diff smart_toolShed/commons/core/launcher/test/Test_Launcher.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/smart_toolShed/commons/core/launcher/test/Test_Launcher.py	Thu Jan 17 10:52:14 2013 -0500
@@ -0,0 +1,333 @@
+from commons.core.utils.FileUtils import FileUtils
+from commons.core.launcher.Launcher import Launcher
+from commons.core.launcher.WriteScript import WriteScript
+from commons.core.stat.Stat import Stat
+from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
+from commons.core.sql.DbFactory import DbFactory
+from commons.core.sql.Job import Job
+import unittest
+import os
+import shutil
+import time
+import stat
+
+#TODO: Test_F_Launcher.py : to execute prepareCommands() and runSingleJob()
+#                            to test runLauncherForMultipleJobs()
+#TODO: check clean of "Test_runSingleJob"
+#TODO: refactoring => choose between "self._queue" or "lResources" to set resources
+class Test_Launcher(unittest.TestCase):
+
+    SARUMAN_NAME = "compute-2-46.local"
+    
+    def setUp(self):
+        self._cDir = os.getcwd()
+        self._tmpDir = self._cDir
+        self._groupid = "test"
+        self._jobTable = "dummyJobTable"
+        self._iDb = DbFactory.createInstance()
+        self._iDb.createTable(self._jobTable, "jobs", overwrite = True)
+        self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable)
+        self._queue = ""
+        self._configFileName = "dummyConfigFile"
+    
+    def tearDown(self):
+        self._iDb.dropTable(self._jobTable)
+        self._iDb.close()
+        FileUtils.removeFilesByPattern('*.e*')
+        FileUtils.removeFilesByPattern('*.o*')
+        FileUtils.removeFilesByPattern('launcherFileTest_BeginRun.py')
+        FileUtils.removeFilesByPattern(self._configFileName)
+        FileUtils.removeFilesByPattern('ClusterLauncher_*')
+        
+    def test__init__wrong_fields_for_job_table(self):
+        self._iDb.dropTable(self._jobTable)
+        sqlCmd = "CREATE TABLE " + self._jobTable 
+        sqlCmd += " ( jobid INT UNSIGNED"
+        sqlCmd += ", jobname VARCHAR(255)"
+        sqlCmd += ", groupid VARCHAR(255)"
+        sqlCmd += ", command TEXT"
+        sqlCmd += ", launcher VARCHAR(1024)"
+        sqlCmd += ", queue VARCHAR(255)"
+        sqlCmd += ", status VARCHAR(255)"
+        sqlCmd += ", time DATETIME"
+        sqlCmd += ", node VARCHAR(255) )"
+        self._iDb.execute(sqlCmd)
+        acronym = "Test__init__"
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym)
+        lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
+        lObsFields = sorted(self._iDb.getFieldList(self._jobTable))
+        self.assertEquals(lExpFields, lObsFields)
+        expJob = Job(queue = self._queue)
+        obsJob = iLauncher.job
+        self.assertEquals(expJob, obsJob)
+        
+    def test__init__withResources(self):
+        queue = "main.q mem_free=3G"
+        acronym = "Test__init__"
+        expQueue = "main.q"
+        explResources = ['mem_free=3G']
+        expJob = Job(queue = expQueue, lResources = explResources)
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", queue, self._groupid, acronym)
+        obsJob = iLauncher.job
+        self.assertEquals(expJob, obsJob)
+
+    def test_createGroupidIfItNotExist(self):
+        acronym = "checkGroupID"
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym)
+        iLauncher.createGroupidIfItNotExist()
+        obsGroupid = iLauncher.job.groupid
+        self.assertEquals(self._groupid, obsGroupid)
+
+    def test_createGroupidIfItNotExist_without_groupid(self):
+        groupid = ""
+        acronym = "checkGroupID"
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, groupid, acronym)
+        iLauncher.createGroupidIfItNotExist()
+        obsGroupid = iLauncher.job.groupid
+        self.assertTrue(obsGroupid != "")
+        
+    def test_beginRun_with_Job_finished_in_Table(self):
+        acronym = "BeginRun"
+        iJob = Job(queue = self._queue)
+        self._jobdb.recordJob(iJob)
+        self._jobdb.changeJobStatus(iJob, "finished")
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym)
+        iLauncher.beginRun()
+        self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0)
+        
+    def test_beginRun_with_Job_unfinished_in_Table(self):
+        acronym = "testU_BeginRun"
+        cmd_start = "log = os.system( \"date;sleep 10;date\" )\n"
+        pyFileName = "%s/launcherFileTest_BeginRun.py" % os.getcwd()
+        if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"):
+            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"])
+        else: 
+            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName)
+        iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir)
+        iWriteScript.run(cmd_start, "", pyFileName)
+        os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
+        self._jobdb.submitJob(iJob)
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym)
+        
+        iLauncher.beginRun()
+        
+        self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 1)
+    
+    def test_getStatsOfExecutionTime(self):
+        acronym = "test_statTime"
+        
+        expLValues = [1000.00000, 1000.00000]
+        expStat = Stat(expLValues) 
+        
+        f = open(acronym +".o1", "w")
+        f.write("executionTime=1000.000000")
+        f.close()
+        f = open(acronym +".o2", "w")
+        f.write("executionTime=1000.000000")
+        f.close()
+        
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym)
+        obsStat = iLauncher.getStatsOfExecutionTime(acronym)
+        
+        self.assertEqual(expStat, obsStat)           
+           
+    def test_endRun(self):
+        acronym = "testU_EndRun"
+        cmd_start = "log = os.system( \"date;sleep 10;date\" )\n"
+        pyFileName = "%s/launcherFileTest_EndRun.py" % os.getcwd()
+        if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"):
+            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"])
+        else: 
+            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName)
+ 
+        iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir)
+        iWriteScript.run(cmd_start, "", pyFileName)
+        os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
+        self._jobdb.submitJob(iJob)
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym)
+        iLauncher.job.groupid = self._groupid
+        iLauncher.endRun()
+        
+        self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0)
+        self.assertTrue(self._jobdb.getCountStatus(self._groupid, "error") == 0)
+        self.assertTrue(self._jobdb.getCountStatus(self._groupid, "waiting") == 0)    
+           
+        os.remove(iJob.launcher)
+
+    def test_clean(self):
+        acronym = "test_clean"
+        f = open("ClusterLauncher" + acronym + ".py", "w")
+        f.close()
+        f = open(acronym + ".o1", "w")
+        f.close()
+        f = open(acronym + ".e1", "w")
+        f.close()
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym)
+        iLauncher.clean(acronym)
+        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym + ".py"))
+
+    def test_clean_without_acronym(self):
+        acronym = ""
+        acronym2 = "toto"
+        f = open("ClusterLauncher" + acronym2 + ".py", "w")
+        f.close()
+        f = open(acronym2 + ".o1", "w")
+        f.close()
+        f = open(acronym2 + ".e1", "w")
+        f.close()
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym2)
+        iLauncher.clean(acronym)
+        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym2 + ".py"))
+        
+    def test_getQueueNameAndResources_queue_no_resource(self):
+        configQueue = "all.q"
+        expQueueName = "all.q"
+        expResources = []
+        iLauncher = Launcher(self._jobdb)
+        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
+        self.assertEquals(expQueueName, obsQueueName)
+        self.assertEquals(expResources, obsResources)
+        
+    def test_getQueueNameAndResources_queue_one_resource(self):
+        configQueue = "test.q 'test=TRUE'"
+        expQueueName = "test.q"
+        expResources = ["test=TRUE"]
+        iLauncher = Launcher(self._jobdb)
+        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
+        self.assertEquals(expQueueName, obsQueueName)
+        self.assertEquals(expResources, obsResources)
+        
+    def test_getQueueNameAndResources_queue_two_resources(self):
+        configQueue = "big.q 's_data=8G s_cpu=96:00:00'"
+        expQueueName = "big.q"
+        expResources = ["s_data=8G", "s_cpu=96:00:00"]
+        iLauncher = Launcher(self._jobdb)
+        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
+        self.assertEquals(expQueueName, obsQueueName)
+        self.assertEquals(expResources, obsResources)
+        
+    def test_getQueueNameAndResources_no_queue_no_resource(self):
+        configQueue = ""
+        expQueueName = ""
+        expResources = []
+        iLauncher = Launcher(self._jobdb)
+        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
+        self.assertEquals(expQueueName, obsQueueName)
+        self.assertEquals(expResources, obsResources)
+        
+    def test_getQueueNameAndResources_no_queue_one_resource(self):
+        configQueue = "s_data=8G"
+        expQueueName = ""
+        expResources = ["s_data=8G"]
+        iLauncher = Launcher(self._jobdb)
+        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
+        self.assertEquals(expQueueName, obsQueueName)
+        self.assertEquals(expResources, obsResources)
+        
+    def test_getQueueNameAndResources_no_queue_two_resource(self):
+        configQueue = "s_data=8G s_cpu=96:00:00"
+        expQueueName = ""
+        expResources = ["s_data=8G", "s_cpu=96:00:00"]
+        iLauncher = Launcher(self._jobdb)
+        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
+        self.assertEquals(expQueueName, obsQueueName)
+        self.assertEquals(expResources, obsResources)      
+
+#   #TODO: test with at least 2 lines in cmd
+    def test_runSingleJob(self):
+        acronym = "Test_runSingleJob"
+        os.mkdir(acronym)
+        os.chdir(acronym)
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym)
+        iLauncher.job.groupid = self._groupid
+        iLauncher.job.jobname = acronym
+        iLauncher.job.queue = self._queue
+        if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"):
+            iLauncher.job.lResources = ["test=TRUE"]
+        cmd = "log = os.system(\"touch 'YuFei'\")\n"
+        iLauncher.runSingleJob(cmd)
+        time.sleep(20)
+        jobStatus = self._jobdb.getJobStatus(iLauncher.job)
+        os.chdir(self._cDir)
+        shutil.rmtree(acronym)
+        self.assertEqual(jobStatus, "finished")
+        
+    def test_runSingleJob_catch_error_wrong_tmpDir(self):
+        acronym = "Test_runSingleJob_catch_error"
+        os.mkdir(acronym)
+        os.chdir(acronym)
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), "%s/toto" % self._tmpDir, "", self._queue, self._groupid, acronym)
+        iLauncher.job.groupid = self._groupid
+        iLauncher.job.jobname = acronym
+        iLauncher.job.queue = self._queue
+        if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"):
+            iLauncher.job.lResources = ["test=TRUE"]
+        cmd = "log = os.system(\"touch 'YuFei'\")\n"
+        iLauncher.runSingleJob(cmd)
+        time.sleep(20)
+        jobStatus = self._jobdb.getJobStatus(iLauncher.job) 
+        os.chdir(self._cDir)
+        shutil.rmtree(acronym)
+        self.assertEqual(jobStatus, "error")
+        
+    def test_runSingleJob_catch_error_wrong_cmd(self):
+        acronym = "Test_runSingleJob_catch_error"
+        os.mkdir(acronym)
+        os.chdir(acronym)
+        iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym)
+        iLauncher.job.groupid = self._groupid
+        iLauncher.job.jobname = acronym
+        iLauncher.job.queue = self._queue
+        if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"):
+            iLauncher.job.lResources = ["test=TRUE"]
+        cmd = "log = os.system(\"truc -i toto\")\n"
+        iLauncher.runSingleJob(cmd)
+        time.sleep(20)
+        jobStatus = self._jobdb.getJobStatus(iLauncher.job) 
+        self._jobdb.cleanJobGroup(self._groupid)
+        os.chdir(self._cDir)
+        shutil.rmtree(acronym)
+        self.assertEqual(jobStatus, "error")
+
+    def test_prepareCommands(self):
+        expCmdStart = "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")\n\tos.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")\n\tlog = os.system(\"touch file\")\n\t" 
+        expCmdFinish = "if os.path.exists(\"yufei.align\"):\n\t\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )\n\t"
+        expCmdSize = "fileSize = 3.2\n\t\t"
+        expCmdCopy = "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")\n\t\tshutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")\n\t\t"
+        
+        lCmdStart = []
+        lCmdStart.append("os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")")
+        lCmdStart.append("os.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")")
+        lCmds = []
+        lCmds.append("log = os.system(\"touch file\")")
+        lCmdFinish = []
+        lCmdFinish.append("if os.path.exists(\"yufei.align\"):")
+        lCmdFinish.append("\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )") 
+        lCmdSize = []
+        lCmdSize.append("fileSize = 3.2")    
+        lCmdCopy = []
+        lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")")
+        lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")")
+
+        iLauncher = Launcher(self._jobdb)
+        obsCmdStart, obsCmdFinish, obsCmdSize, obsCmdCopy = iLauncher.prepareCommands(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy)         
+        
+        self.assertEquals(expCmdStart, obsCmdStart)
+        self.assertEquals(expCmdFinish, obsCmdFinish)      
+        self.assertEquals(expCmdSize, obsCmdSize)
+        self.assertEquals(expCmdCopy, obsCmdCopy)
+        
+    def test_getSystemCommand(self):
+        prg = "touch"
+        lArgs = []
+        lArgs.append("file")
+        expCmd = "log = os.system(\"touch file\")"
+        iLauncher = Launcher(self._jobdb)
+        obsCmd = iLauncher.getSystemCommand(prg, lArgs)
+        self.assertEquals(expCmd, obsCmd)
+
+
+test_suite = unittest.TestSuite()
+test_suite.addTest( unittest.makeSuite( Test_Launcher ) )
+if __name__ == "__main__":
+        unittest.TextTestRunner(verbosity=2).run( test_suite )