Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/sql/test/Test_F_TableJobAdaptator.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/sql/test/Test_F_TableJobAdaptator.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,185 @@ +from commons.core.launcher.WriteScript import WriteScript +from commons.core.sql.Job import Job +from commons.core.sql.DbFactory import DbFactory +from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory +import sys +import stat +import os +import time +import unittest +import glob + +class Test_F_TableJobAdaptator(unittest.TestCase): + + def setUp(self): + self._jobTableName = "dummyJobTable" + self._db = DbFactory.createInstance() + self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName) + + def tearDown(self): + self._db.dropTable(self._jobTableName) + self._db.close() + + def test_submitJob_with_multiple_jobs(self): + self._db.createTable(self._jobTableName, "jobs", overwrite = True) + job1 = _createJobInstance("job1") + _createLauncherFile(job1, self._iTJA) + job2 = _createJobInstance("job2") + _createLauncherFile(job2, self._iTJA) + job3 = _createJobInstance("job3") + _createLauncherFile(job3, self._iTJA) + + self._iTJA.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) + self._iTJA.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) + self._iTJA.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) + + time.sleep(120) + + expJobStatus = "finished" + obsJobStatus1 = self._iTJA.getJobStatus(job1) + obsJobStatus2 = self._iTJA.getJobStatus(job2) + obsJobStatus3 = self._iTJA.getJobStatus(job3) + + self.assertEquals(expJobStatus, obsJobStatus1) + self.assertEquals(expJobStatus, obsJobStatus2) + self.assertEquals(expJobStatus, obsJobStatus3) + + expErrorFilePrefix1 = job1.jobname + ".e" + expOutputFilePrefix1 = job1.jobname + ".o" + expErrorFilePrefix2 = job2.jobname + ".e" + expOutputFilePrefix2 = job2.jobname + ".o" + expErrorFilePrefix3 = job3.jobname + ".e" + expOutputFilePrefix3 = job3.jobname + ".o" + + lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") + lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") + lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*") + lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*") + lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*") + lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*") + + isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) + isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) + isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0) + isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0) + isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0) + isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0) + + os.system("rm launcherFileTest*.py *.e* *.o*") + self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) + self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2) + self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3) + + def test_submitJob_job_already_submitted(self): + self._db.createTable(self._jobTableName, "jobs", overwrite = True) + iJob = _createJobInstance("job") + self._iTJA.recordJob(iJob) + + isSysExitRaised = False + try: + self._iTJA.submitJob(iJob) + except SystemExit: + isSysExitRaised = True + self.assertTrue(isSysExitRaised) + + def test_waitJobGroup_with_error_job_maxRelaunch_two(self): + self._db.createTable(self._jobTableName, "jobs", overwrite = True) + iJob = _createJobInstance("job") + _createLauncherFile(iJob, self._iTJA) + + self._iTJA.recordJob(iJob) + self._iTJA.changeJobStatus(iJob, "error") + + self._iTJA.waitJobGroup(iJob.groupid, 0, 2) + + time.sleep(120) + + expJobStatus = "finished" + obsJobStatus1 = self._iTJA.getJobStatus(iJob) + + self.assertEquals(expJobStatus, obsJobStatus1) + + expErrorFilePrefix1 = iJob.jobname + ".e" + expOutputFilePrefix1 = iJob.jobname + ".o" + + lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") + lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") + + isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) + isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) + + self._iTJA.removeJob(iJob) + os.system("rm launcherFileTest*.py *.e* *.o*") + self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) + +class Test_F_TableJobAdaptator_SGE(unittest.TestCase): + + def setUp(self): + if os.environ["REPET_JOB_MANAGER"].lower() != "sge": + print "ERROR: jobs manager is not SGE: REPET_JOB_MANAGER = %s." % os.environ["REPET_JOB_MANAGER"] + sys.exit(0) + self._jobTableName = "dummyJobTable" + self._db = DbFactory.createInstance() + self._db.createTable(self._jobTableName, "jobs", overwrite = True) + self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName) + self._iJob = _createJobInstance("job") + _createLauncherFile(self._iJob, self._iTJA) + + def tearDown(self): + self._db.dropTable(self._jobTableName) + self._db.close() + + def test_waitJobGroup_with_several_nbTimeOut_waiting(self): + self._iTJA.recordJob(self._iJob) + self._iTJA.changeJobStatus(self._iJob, "running") + + expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % self._iJob.jobid + + obsError = "obsError.txt" + obsErrorHandler = open(obsError, "w") + stderrRef = sys.stderr + sys.stderr = obsErrorHandler + + isSysExitRaised = False + try: + self._iTJA.waitJobGroup(self._iJob.groupid, timeOutPerJob = 3) + except SystemExit: + isSysExitRaised = True + + obsErrorHandler.close() + + obsErrorHandler = open(obsError, "r") + obsMsg = obsErrorHandler.readline() + obsErrorHandler.close() + + sys.stderr = stderrRef + os.remove(obsError) + os.system("rm launcherFileTest*.py") + self.assertTrue(isSysExitRaised) + self.assertEquals(expMsg, obsMsg) + + def test_isJobStillHandledBySge_True(self): + self._iTJA.submitJob(self._iJob) + isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname) + os.system("rm launcherFileTest*.py") + self.assertTrue(isJobHandledBySge) + + def test_isJobStillHandledBySge_False(self): + self._iTJA.recordJob(self._iJob) + isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname) + os.system("rm launcherFileTest*.py") + self.assertFalse(isJobHandledBySge) + +def _createJobInstance(name): + lResources = [] + if os.environ.get("HOSTNAME") == "compute-2-46.local": + lResources.append("test=TRUE") + return Job(0, name, "test", "", "log = os.system(\"date;sleep 5;date\")", "%s/launcherFileTest_%s.py" % (os.getcwd(), name), lResources=lResources) + +def _createLauncherFile(iJob, iTJA): + iWriteScript = WriteScript(iJob, iTJA, os.getcwd(), os.getcwd()) + iWriteScript.run(iJob.command, "", iJob.launcher) + os.chmod(iJob.launcher, stat.S_IRWXU+stat.S_IRWXG+stat.S_IRWXO) + +if __name__ == "__main__": + unittest.main()