Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/launcher/test/Test_Launcher.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/launcher/test/Test_Launcher.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,333 @@ +from commons.core.utils.FileUtils import FileUtils +from commons.core.launcher.Launcher import Launcher +from commons.core.launcher.WriteScript import WriteScript +from commons.core.stat.Stat import Stat +from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory +from commons.core.sql.DbFactory import DbFactory +from commons.core.sql.Job import Job +import unittest +import os +import shutil +import time +import stat + +#TODO: Test_F_Launcher.py : to execute prepareCommands() and runSingleJob() +# to test runLauncherForMultipleJobs() +#TODO: check clean of "Test_runSingleJob" +#TODO: refactoring => choose between "self._queue" or "lResources" to set resources +class Test_Launcher(unittest.TestCase): + + SARUMAN_NAME = "compute-2-46.local" + + def setUp(self): + self._cDir = os.getcwd() + self._tmpDir = self._cDir + self._groupid = "test" + self._jobTable = "dummyJobTable" + self._iDb = DbFactory.createInstance() + self._iDb.createTable(self._jobTable, "jobs", overwrite = True) + self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable) + self._queue = "" + self._configFileName = "dummyConfigFile" + + def tearDown(self): + self._iDb.dropTable(self._jobTable) + self._iDb.close() + FileUtils.removeFilesByPattern('*.e*') + FileUtils.removeFilesByPattern('*.o*') + FileUtils.removeFilesByPattern('launcherFileTest_BeginRun.py') + FileUtils.removeFilesByPattern(self._configFileName) + FileUtils.removeFilesByPattern('ClusterLauncher_*') + + def test__init__wrong_fields_for_job_table(self): + self._iDb.dropTable(self._jobTable) + sqlCmd = "CREATE TABLE " + self._jobTable + sqlCmd += " ( jobid INT UNSIGNED" + sqlCmd += ", jobname VARCHAR(255)" + sqlCmd += ", groupid VARCHAR(255)" + sqlCmd += ", command TEXT" + sqlCmd += ", launcher VARCHAR(1024)" + sqlCmd += ", queue VARCHAR(255)" + sqlCmd += ", status VARCHAR(255)" + sqlCmd += ", time DATETIME" + sqlCmd += ", node VARCHAR(255) )" + self._iDb.execute(sqlCmd) + acronym = "Test__init__" + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) + lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"]) + lObsFields = sorted(self._iDb.getFieldList(self._jobTable)) + self.assertEquals(lExpFields, lObsFields) + expJob = Job(queue = self._queue) + obsJob = iLauncher.job + self.assertEquals(expJob, obsJob) + + def test__init__withResources(self): + queue = "main.q mem_free=3G" + acronym = "Test__init__" + expQueue = "main.q" + explResources = ['mem_free=3G'] + expJob = Job(queue = expQueue, lResources = explResources) + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", queue, self._groupid, acronym) + obsJob = iLauncher.job + self.assertEquals(expJob, obsJob) + + def test_createGroupidIfItNotExist(self): + acronym = "checkGroupID" + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) + iLauncher.createGroupidIfItNotExist() + obsGroupid = iLauncher.job.groupid + self.assertEquals(self._groupid, obsGroupid) + + def test_createGroupidIfItNotExist_without_groupid(self): + groupid = "" + acronym = "checkGroupID" + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, groupid, acronym) + iLauncher.createGroupidIfItNotExist() + obsGroupid = iLauncher.job.groupid + self.assertTrue(obsGroupid != "") + + def test_beginRun_with_Job_finished_in_Table(self): + acronym = "BeginRun" + iJob = Job(queue = self._queue) + self._jobdb.recordJob(iJob) + self._jobdb.changeJobStatus(iJob, "finished") + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) + iLauncher.beginRun() + self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0) + + def test_beginRun_with_Job_unfinished_in_Table(self): + acronym = "testU_BeginRun" + cmd_start = "log = os.system( \"date;sleep 10;date\" )\n" + pyFileName = "%s/launcherFileTest_BeginRun.py" % os.getcwd() + if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): + iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"]) + else: + iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName) + iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir) + iWriteScript.run(cmd_start, "", pyFileName) + os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) + self._jobdb.submitJob(iJob) + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) + + iLauncher.beginRun() + + self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 1) + + def test_getStatsOfExecutionTime(self): + acronym = "test_statTime" + + expLValues = [1000.00000, 1000.00000] + expStat = Stat(expLValues) + + f = open(acronym +".o1", "w") + f.write("executionTime=1000.000000") + f.close() + f = open(acronym +".o2", "w") + f.write("executionTime=1000.000000") + f.close() + + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) + obsStat = iLauncher.getStatsOfExecutionTime(acronym) + + self.assertEqual(expStat, obsStat) + + def test_endRun(self): + acronym = "testU_EndRun" + cmd_start = "log = os.system( \"date;sleep 10;date\" )\n" + pyFileName = "%s/launcherFileTest_EndRun.py" % os.getcwd() + if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): + iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"]) + else: + iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName) + + iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir) + iWriteScript.run(cmd_start, "", pyFileName) + os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) + self._jobdb.submitJob(iJob) + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) + iLauncher.job.groupid = self._groupid + iLauncher.endRun() + + self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0) + self.assertTrue(self._jobdb.getCountStatus(self._groupid, "error") == 0) + self.assertTrue(self._jobdb.getCountStatus(self._groupid, "waiting") == 0) + + os.remove(iJob.launcher) + + def test_clean(self): + acronym = "test_clean" + f = open("ClusterLauncher" + acronym + ".py", "w") + f.close() + f = open(acronym + ".o1", "w") + f.close() + f = open(acronym + ".e1", "w") + f.close() + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) + iLauncher.clean(acronym) + self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym + ".py")) + + def test_clean_without_acronym(self): + acronym = "" + acronym2 = "toto" + f = open("ClusterLauncher" + acronym2 + ".py", "w") + f.close() + f = open(acronym2 + ".o1", "w") + f.close() + f = open(acronym2 + ".e1", "w") + f.close() + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym2) + iLauncher.clean(acronym) + self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym2 + ".py")) + + def test_getQueueNameAndResources_queue_no_resource(self): + configQueue = "all.q" + expQueueName = "all.q" + expResources = [] + iLauncher = Launcher(self._jobdb) + obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) + self.assertEquals(expQueueName, obsQueueName) + self.assertEquals(expResources, obsResources) + + def test_getQueueNameAndResources_queue_one_resource(self): + configQueue = "test.q 'test=TRUE'" + expQueueName = "test.q" + expResources = ["test=TRUE"] + iLauncher = Launcher(self._jobdb) + obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) + self.assertEquals(expQueueName, obsQueueName) + self.assertEquals(expResources, obsResources) + + def test_getQueueNameAndResources_queue_two_resources(self): + configQueue = "big.q 's_data=8G s_cpu=96:00:00'" + expQueueName = "big.q" + expResources = ["s_data=8G", "s_cpu=96:00:00"] + iLauncher = Launcher(self._jobdb) + obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) + self.assertEquals(expQueueName, obsQueueName) + self.assertEquals(expResources, obsResources) + + def test_getQueueNameAndResources_no_queue_no_resource(self): + configQueue = "" + expQueueName = "" + expResources = [] + iLauncher = Launcher(self._jobdb) + obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) + self.assertEquals(expQueueName, obsQueueName) + self.assertEquals(expResources, obsResources) + + def test_getQueueNameAndResources_no_queue_one_resource(self): + configQueue = "s_data=8G" + expQueueName = "" + expResources = ["s_data=8G"] + iLauncher = Launcher(self._jobdb) + obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) + self.assertEquals(expQueueName, obsQueueName) + self.assertEquals(expResources, obsResources) + + def test_getQueueNameAndResources_no_queue_two_resource(self): + configQueue = "s_data=8G s_cpu=96:00:00" + expQueueName = "" + expResources = ["s_data=8G", "s_cpu=96:00:00"] + iLauncher = Launcher(self._jobdb) + obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) + self.assertEquals(expQueueName, obsQueueName) + self.assertEquals(expResources, obsResources) + +# #TODO: test with at least 2 lines in cmd + def test_runSingleJob(self): + acronym = "Test_runSingleJob" + os.mkdir(acronym) + os.chdir(acronym) + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym) + iLauncher.job.groupid = self._groupid + iLauncher.job.jobname = acronym + iLauncher.job.queue = self._queue + if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): + iLauncher.job.lResources = ["test=TRUE"] + cmd = "log = os.system(\"touch 'YuFei'\")\n" + iLauncher.runSingleJob(cmd) + time.sleep(20) + jobStatus = self._jobdb.getJobStatus(iLauncher.job) + os.chdir(self._cDir) + shutil.rmtree(acronym) + self.assertEqual(jobStatus, "finished") + + def test_runSingleJob_catch_error_wrong_tmpDir(self): + acronym = "Test_runSingleJob_catch_error" + os.mkdir(acronym) + os.chdir(acronym) + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), "%s/toto" % self._tmpDir, "", self._queue, self._groupid, acronym) + iLauncher.job.groupid = self._groupid + iLauncher.job.jobname = acronym + iLauncher.job.queue = self._queue + if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): + iLauncher.job.lResources = ["test=TRUE"] + cmd = "log = os.system(\"touch 'YuFei'\")\n" + iLauncher.runSingleJob(cmd) + time.sleep(20) + jobStatus = self._jobdb.getJobStatus(iLauncher.job) + os.chdir(self._cDir) + shutil.rmtree(acronym) + self.assertEqual(jobStatus, "error") + + def test_runSingleJob_catch_error_wrong_cmd(self): + acronym = "Test_runSingleJob_catch_error" + os.mkdir(acronym) + os.chdir(acronym) + iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym) + iLauncher.job.groupid = self._groupid + iLauncher.job.jobname = acronym + iLauncher.job.queue = self._queue + if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): + iLauncher.job.lResources = ["test=TRUE"] + cmd = "log = os.system(\"truc -i toto\")\n" + iLauncher.runSingleJob(cmd) + time.sleep(20) + jobStatus = self._jobdb.getJobStatus(iLauncher.job) + self._jobdb.cleanJobGroup(self._groupid) + os.chdir(self._cDir) + shutil.rmtree(acronym) + self.assertEqual(jobStatus, "error") + + def test_prepareCommands(self): + expCmdStart = "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")\n\tos.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")\n\tlog = os.system(\"touch file\")\n\t" + expCmdFinish = "if os.path.exists(\"yufei.align\"):\n\t\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )\n\t" + expCmdSize = "fileSize = 3.2\n\t\t" + expCmdCopy = "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")\n\t\tshutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")\n\t\t" + + lCmdStart = [] + lCmdStart.append("os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")") + lCmdStart.append("os.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")") + lCmds = [] + lCmds.append("log = os.system(\"touch file\")") + lCmdFinish = [] + lCmdFinish.append("if os.path.exists(\"yufei.align\"):") + lCmdFinish.append("\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )") + lCmdSize = [] + lCmdSize.append("fileSize = 3.2") + lCmdCopy = [] + lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")") + lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")") + + iLauncher = Launcher(self._jobdb) + obsCmdStart, obsCmdFinish, obsCmdSize, obsCmdCopy = iLauncher.prepareCommands(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy) + + self.assertEquals(expCmdStart, obsCmdStart) + self.assertEquals(expCmdFinish, obsCmdFinish) + self.assertEquals(expCmdSize, obsCmdSize) + self.assertEquals(expCmdCopy, obsCmdCopy) + + def test_getSystemCommand(self): + prg = "touch" + lArgs = [] + lArgs.append("file") + expCmd = "log = os.system(\"touch file\")" + iLauncher = Launcher(self._jobdb) + obsCmd = iLauncher.getSystemCommand(prg, lArgs) + self.assertEquals(expCmd, obsCmd) + + +test_suite = unittest.TestSuite() +test_suite.addTest( unittest.makeSuite( Test_Launcher ) ) +if __name__ == "__main__": + unittest.TextTestRunner(verbosity=2).run( test_suite )