diff smart_toolShed/commons/core/sql/test/Test_F_TableJobAdaptator.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/smart_toolShed/commons/core/sql/test/Test_F_TableJobAdaptator.py	Thu Jan 17 10:52:14 2013 -0500
@@ -0,0 +1,185 @@
+from commons.core.launcher.WriteScript import WriteScript
+from commons.core.sql.Job import Job
+from commons.core.sql.DbFactory import DbFactory
+from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
+import sys
+import stat
+import os
+import time
+import unittest
+import glob
+
+class Test_F_TableJobAdaptator(unittest.TestCase):
+
+    def setUp(self):
+        self._jobTableName = "dummyJobTable"
+        self._db = DbFactory.createInstance()
+        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
+
+    def tearDown(self):
+        self._db.dropTable(self._jobTableName)
+        self._db.close()
+    
+    def test_submitJob_with_multiple_jobs(self):
+        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
+        job1 = _createJobInstance("job1")
+        _createLauncherFile(job1, self._iTJA)
+        job2 = _createJobInstance("job2")
+        _createLauncherFile(job2, self._iTJA)
+        job3 = _createJobInstance("job3")
+        _createLauncherFile(job3, self._iTJA)
+        
+        self._iTJA.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
+        self._iTJA.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
+        self._iTJA.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
+
+        time.sleep(120)
+        
+        expJobStatus = "finished"
+        obsJobStatus1 = self._iTJA.getJobStatus(job1)
+        obsJobStatus2 = self._iTJA.getJobStatus(job2)
+        obsJobStatus3 = self._iTJA.getJobStatus(job3)
+        
+        self.assertEquals(expJobStatus, obsJobStatus1)
+        self.assertEquals(expJobStatus, obsJobStatus2)
+        self.assertEquals(expJobStatus, obsJobStatus3)
+        
+        expErrorFilePrefix1 = job1.jobname + ".e" 
+        expOutputFilePrefix1 = job1.jobname + ".o"
+        expErrorFilePrefix2 = job2.jobname + ".e" 
+        expOutputFilePrefix2 = job2.jobname + ".o"
+        expErrorFilePrefix3 = job3.jobname + ".e" 
+        expOutputFilePrefix3 = job3.jobname + ".o"
+        
+        lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
+        lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
+        lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*")
+        lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*")
+        lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*")
+        lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*")
+        
+        isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) 
+        isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
+        isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0) 
+        isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0)
+        isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0) 
+        isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0)
+        
+        os.system("rm launcherFileTest*.py *.e* *.o*")
+        self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
+        self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2)
+        self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3)
+
+    def test_submitJob_job_already_submitted(self):
+        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
+        iJob = _createJobInstance("job")
+        self._iTJA.recordJob(iJob)
+        
+        isSysExitRaised = False
+        try:
+            self._iTJA.submitJob(iJob)
+        except SystemExit:
+            isSysExitRaised = True
+        self.assertTrue(isSysExitRaised)
+    
+    def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
+        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
+        iJob = _createJobInstance("job")
+        _createLauncherFile(iJob, self._iTJA)
+        
+        self._iTJA.recordJob(iJob)
+        self._iTJA.changeJobStatus(iJob, "error")
+        
+        self._iTJA.waitJobGroup(iJob.groupid, 0, 2)
+        
+        time.sleep(120)
+        
+        expJobStatus = "finished"
+        obsJobStatus1 = self._iTJA.getJobStatus(iJob)
+        
+        self.assertEquals(expJobStatus, obsJobStatus1)
+        
+        expErrorFilePrefix1 = iJob.jobname + ".e" 
+        expOutputFilePrefix1 = iJob.jobname + ".o"
+        
+        lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
+        lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
+        
+        isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) 
+        isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
+        
+        self._iTJA.removeJob(iJob) 
+        os.system("rm launcherFileTest*.py *.e* *.o*")
+        self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
+
+class Test_F_TableJobAdaptator_SGE(unittest.TestCase):
+
+    def setUp(self):
+        if os.environ["REPET_JOB_MANAGER"].lower() != "sge":
+            print "ERROR: jobs manager is not SGE: REPET_JOB_MANAGER = %s." % os.environ["REPET_JOB_MANAGER"]
+            sys.exit(0)
+        self._jobTableName = "dummyJobTable"
+        self._db = DbFactory.createInstance()
+        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
+        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
+        self._iJob = _createJobInstance("job")
+        _createLauncherFile(self._iJob, self._iTJA)
+
+    def tearDown(self):
+        self._db.dropTable(self._jobTableName)
+        self._db.close()
+
+    def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
+        self._iTJA.recordJob(self._iJob)
+        self._iTJA.changeJobStatus(self._iJob, "running")
+        
+        expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % self._iJob.jobid
+        
+        obsError = "obsError.txt"
+        obsErrorHandler = open(obsError, "w")
+        stderrRef = sys.stderr
+        sys.stderr = obsErrorHandler
+        
+        isSysExitRaised = False
+        try:
+            self._iTJA.waitJobGroup(self._iJob.groupid, timeOutPerJob = 3)
+        except SystemExit:
+            isSysExitRaised = True
+           
+        obsErrorHandler.close()
+        
+        obsErrorHandler = open(obsError, "r")
+        obsMsg = obsErrorHandler.readline()
+        obsErrorHandler.close()
+       
+        sys.stderr = stderrRef
+        os.remove(obsError)
+        os.system("rm launcherFileTest*.py")
+        self.assertTrue(isSysExitRaised)
+        self.assertEquals(expMsg, obsMsg)
+         
+    def test_isJobStillHandledBySge_True(self):
+        self._iTJA.submitJob(self._iJob)
+        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
+        os.system("rm launcherFileTest*.py")
+        self.assertTrue(isJobHandledBySge)
+
+    def test_isJobStillHandledBySge_False(self):
+        self._iTJA.recordJob(self._iJob)
+        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
+        os.system("rm launcherFileTest*.py")
+        self.assertFalse(isJobHandledBySge)
+
+def _createJobInstance(name):
+    lResources = []
+    if os.environ.get("HOSTNAME") == "compute-2-46.local":
+        lResources.append("test=TRUE")
+    return Job(0, name, "test", "", "log = os.system(\"date;sleep 5;date\")", "%s/launcherFileTest_%s.py" % (os.getcwd(), name), lResources=lResources)
+
+def _createLauncherFile(iJob, iTJA):
+    iWriteScript = WriteScript(iJob, iTJA, os.getcwd(), os.getcwd())
+    iWriteScript.run(iJob.command, "", iJob.launcher)
+    os.chmod(iJob.launcher, stat.S_IRWXU+stat.S_IRWXG+stat.S_IRWXO)
+        
+if __name__ == "__main__":
+    unittest.main()