view smart_toolShed/commons/core/launcher/test/Test_Launcher.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line source

from commons.core.utils.FileUtils import FileUtils
from commons.core.launcher.Launcher import Launcher
from commons.core.launcher.WriteScript import WriteScript
from commons.core.stat.Stat import Stat
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
from commons.core.sql.DbFactory import DbFactory
from commons.core.sql.Job import Job
import unittest
import os
import shutil
import time
import stat

#TODO: Test_F_Launcher.py : to execute prepareCommands() and runSingleJob()
#                            to test runLauncherForMultipleJobs()
#TODO: check clean of "Test_runSingleJob"
#TODO: refactoring => choose between "self._queue" or "lResources" to set resources
class Test_Launcher(unittest.TestCase):
    """Unit tests for the Launcher class.

    Each test works on a scratch job table (created in setUp, dropped in
    tearDown) and writes its files in the current working directory.
    Several tests submit real jobs through the job adaptator and therefore
    sleep or block until those jobs have had time to run.
    """

    # Host on which submitted jobs are given the extra "test=TRUE" resource.
    SARUMAN_NAME = "compute-2-46.local"

    def setUp(self):
        """Create the scratch job table and the default launcher settings."""
        self._cDir = os.getcwd()
        # The current directory doubles as the temporary directory.
        self._tmpDir = self._cDir
        self._groupid = "test"
        self._jobTable = "dummyJobTable"
        self._iDb = DbFactory.createInstance()
        self._iDb.createTable(self._jobTable, "jobs", overwrite = True)
        self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable)
        self._queue = ""
        self._configFileName = "dummyConfigFile"

    def tearDown(self):
        """Drop the scratch job table and remove files left in the working directory."""
        self._iDb.dropTable(self._jobTable)
        self._iDb.close()
        # NOTE(review): '*.e*' and '*.o*' are broad patterns; they match any
        # file of the working directory containing ".e" or ".o" in its name.
        lPatterns = ['*.e*', '*.o*', 'launcherFileTest_BeginRun.py', self._configFileName, 'ClusterLauncher_*']
        for pattern in lPatterns:
            FileUtils.removeFilesByPattern(pattern)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _newLauncher(self, acronym, queue = None, groupid = None, cDir = None, tmpDir = None):
        """Build a Launcher on the scratch job table.

        Any argument left to None falls back to the value prepared in setUp.
        An empty string is a legitimate value and is passed through as is.
        """
        if queue is None:
            queue = self._queue
        if groupid is None:
            groupid = self._groupid
        if cDir is None:
            cDir = self._cDir
        if tmpDir is None:
            tmpDir = self._tmpDir
        return Launcher(self._jobdb, os.getcwd(), "", "", cDir, tmpDir, "", queue, groupid, acronym)

    def _isOnSaruman(self):
        """Tell whether the HOSTNAME environment variable equals SARUMAN_NAME."""
        return Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME")

    def _createEmptyFile(self, fileName):
        """Create (or truncate) fileName, leaving it empty."""
        with open(fileName, "w"):
            pass

    def _submitSleepJob(self, acronym, pyFileName):
        """Write the launcher script of a ~10 second job, submit it and return the Job."""
        cmd_start = "log = os.system( \"date;sleep 10;date\" )\n"
        if self._isOnSaruman():
            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"])
        else:
            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName)
        iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir)
        iWriteScript.run(cmd_start, "", pyFileName)
        # The generated script must be executable by its owner.
        os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
        self._jobdb.submitJob(iJob)
        return iJob

    def _runSingleJobAndGetStatus(self, acronym, cmd, tmpDir = None, cleanJobGroup = False):
        """Run cmd through Launcher.runSingleJob() in a sandbox directory.

        A directory named after acronym is created in the working directory
        and used as current directory while the job runs. The working
        directory is restored and the sandbox removed even when the run
        raises, so that a failure cannot disturb the following tests.

        Returns the job status read from the job table 20 seconds after
        runSingleJob() returned.
        """
        os.mkdir(acronym)
        try:
            os.chdir(acronym)
            iLauncher = self._newLauncher(acronym, cDir = os.getcwd(), tmpDir = tmpDir)
            iLauncher.job.groupid = self._groupid
            iLauncher.job.jobname = acronym
            iLauncher.job.queue = self._queue
            if self._isOnSaruman():
                iLauncher.job.lResources = ["test=TRUE"]
            iLauncher.runSingleJob(cmd)
            # Fixed delay before reading the status from the job table.
            time.sleep(20)
            jobStatus = self._jobdb.getJobStatus(iLauncher.job)
            if cleanJobGroup:
                self._jobdb.cleanJobGroup(self._groupid)
        finally:
            os.chdir(self._cDir)
            shutil.rmtree(acronym)
        return jobStatus

    def _checkQueueNameAndResources(self, configQueue, expQueueName, expResources):
        """Assert that getQueueNameAndResources(configQueue) returns the expected pair."""
        iLauncher = Launcher(self._jobdb)
        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
        self.assertEqual(expQueueName, obsQueueName)
        self.assertEqual(expResources, obsResources)

    # ------------------------------------------------------------------
    # constructor
    # ------------------------------------------------------------------

    def test__init__wrong_fields_for_job_table(self):
        """A job table with a 'command' column and no 'resources' column must end up with the expected fields."""
        self._iDb.dropTable(self._jobTable)
        # Table layout holding a "command" column and lacking "resources".
        lColumns = ["jobid INT UNSIGNED",
                    "jobname VARCHAR(255)",
                    "groupid VARCHAR(255)",
                    "command TEXT",
                    "launcher VARCHAR(1024)",
                    "queue VARCHAR(255)",
                    "status VARCHAR(255)",
                    "time DATETIME",
                    "node VARCHAR(255)"]
        sqlCmd = "CREATE TABLE " + self._jobTable + " ( " + ", ".join(lColumns) + " )"
        self._iDb.execute(sqlCmd)
        acronym = "Test__init__"
        iLauncher = self._newLauncher(acronym)
        lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
        lObsFields = sorted(self._iDb.getFieldList(self._jobTable))
        self.assertEqual(lExpFields, lObsFields)
        expJob = Job(queue = self._queue)
        obsJob = iLauncher.job
        self.assertEqual(expJob, obsJob)

    def test__init__withResources(self):
        """A queue string 'name resource' must be split into queue name and resource list."""
        queue = "main.q mem_free=3G"
        acronym = "Test__init__"
        expJob = Job(queue = "main.q", lResources = ['mem_free=3G'])
        iLauncher = self._newLauncher(acronym, queue = queue)
        self.assertEqual(expJob, iLauncher.job)

    # ------------------------------------------------------------------
    # createGroupidIfItNotExist
    # ------------------------------------------------------------------

    def test_createGroupidIfItNotExist(self):
        """A group id given to the constructor must be kept."""
        iLauncher = self._newLauncher("checkGroupID")
        iLauncher.createGroupidIfItNotExist()
        self.assertEqual(self._groupid, iLauncher.job.groupid)

    def test_createGroupidIfItNotExist_without_groupid(self):
        """An empty group id must be replaced by a non-empty one."""
        iLauncher = self._newLauncher("checkGroupID", groupid = "")
        iLauncher.createGroupidIfItNotExist()
        self.assertNotEqual(iLauncher.job.groupid, "")

    # ------------------------------------------------------------------
    # beginRun / endRun
    # ------------------------------------------------------------------

    def test_beginRun_with_Job_finished_in_Table(self):
        """After beginRun(), no job of the test group may be counted as 'finished'."""
        acronym = "BeginRun"
        # NOTE(review): this job is created without a group id, so it may not
        # belong to self._groupid at all - confirm the assertion is not vacuous.
        iJob = Job(queue = self._queue)
        self._jobdb.recordJob(iJob)
        self._jobdb.changeJobStatus(iJob, "finished")
        iLauncher = self._newLauncher(acronym)
        iLauncher.beginRun()
        self.assertEqual(self._jobdb.getCountStatus(self._groupid, "finished"), 0)

    def test_beginRun_with_Job_unfinished_in_Table(self):
        """After beginRun(), the job submitted beforehand must be counted as 'finished'."""
        acronym = "testU_BeginRun"
        pyFileName = "%s/launcherFileTest_BeginRun.py" % os.getcwd()
        self._submitSleepJob(acronym, pyFileName)
        iLauncher = self._newLauncher(acronym)

        # presumably beginRun() waits for the pending job of the group - verify against Launcher
        iLauncher.beginRun()

        self.assertEqual(self._jobdb.getCountStatus(self._groupid, "finished"), 1)

    def test_getStatsOfExecutionTime(self):
        """Execution times written in the '<acronym>.o*' files must be gathered in a Stat."""
        acronym = "test_statTime"
        expStat = Stat([1000.00000, 1000.00000])

        for suffix in (".o1", ".o2"):
            with open(acronym + suffix, "w") as f:
                f.write("executionTime=1000.000000")

        iLauncher = self._newLauncher(acronym)
        obsStat = iLauncher.getStatsOfExecutionTime(acronym)

        self.assertEqual(expStat, obsStat)

    def test_endRun(self):
        """After endRun(), no job of the group may be 'finished', 'error' or 'waiting'."""
        acronym = "testU_EndRun"
        pyFileName = "%s/launcherFileTest_EndRun.py" % os.getcwd()
        iJob = self._submitSleepJob(acronym, pyFileName)
        try:
            iLauncher = self._newLauncher(acronym)
            iLauncher.job.groupid = self._groupid
            iLauncher.endRun()

            for status in ("finished", "error", "waiting"):
                self.assertEqual(self._jobdb.getCountStatus(self._groupid, status), 0)
        finally:
            # tearDown does not know this script: remove it even on failure.
            os.remove(iJob.launcher)

    # ------------------------------------------------------------------
    # clean
    # ------------------------------------------------------------------

    def test_clean(self):
        """clean(acronym) must remove the 'ClusterLauncher<acronym>.py' script."""
        acronym = "test_clean"
        for fileName in ("ClusterLauncher" + acronym + ".py", acronym + ".o1", acronym + ".e1"):
            self._createEmptyFile(fileName)
        iLauncher = self._newLauncher(acronym)
        iLauncher.clean(acronym)
        # Only the launcher script is checked; the .o1/.e1 files are not asserted on.
        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym + ".py"))

    def test_clean_without_acronym(self):
        """clean('') on a launcher built with acronym 'toto' must remove 'ClusterLaunchertoto.py'."""
        acronym = ""
        acronym2 = "toto"
        for fileName in ("ClusterLauncher" + acronym2 + ".py", acronym2 + ".o1", acronym2 + ".e1"):
            self._createEmptyFile(fileName)
        iLauncher = self._newLauncher(acronym2)
        iLauncher.clean(acronym)
        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym2 + ".py"))

    # ------------------------------------------------------------------
    # getQueueNameAndResources
    # ------------------------------------------------------------------

    def test_getQueueNameAndResources_queue_no_resource(self):
        """A bare queue name yields that name and no resource."""
        self._checkQueueNameAndResources("all.q", "all.q", [])

    def test_getQueueNameAndResources_queue_one_resource(self):
        """A queue name followed by one quoted resource."""
        self._checkQueueNameAndResources("test.q 'test=TRUE'", "test.q", ["test=TRUE"])

    def test_getQueueNameAndResources_queue_two_resources(self):
        """A queue name followed by two resources inside one pair of quotes."""
        self._checkQueueNameAndResources("big.q 's_data=8G s_cpu=96:00:00'", "big.q", ["s_data=8G", "s_cpu=96:00:00"])

    def test_getQueueNameAndResources_no_queue_no_resource(self):
        """An empty configuration yields an empty queue name and no resource."""
        self._checkQueueNameAndResources("", "", [])

    def test_getQueueNameAndResources_no_queue_one_resource(self):
        """A single resource without queue name."""
        self._checkQueueNameAndResources("s_data=8G", "", ["s_data=8G"])

    def test_getQueueNameAndResources_no_queue_two_resource(self):
        """Two resources without queue name."""
        self._checkQueueNameAndResources("s_data=8G s_cpu=96:00:00", "", ["s_data=8G", "s_cpu=96:00:00"])

    # ------------------------------------------------------------------
    # runSingleJob
    # ------------------------------------------------------------------

#   #TODO: test with at least 2 lines in cmd
    def test_runSingleJob(self):
        """A valid command run with valid directories must end as 'finished'."""
        cmd = "log = os.system(\"touch 'YuFei'\")\n"
        jobStatus = self._runSingleJobAndGetStatus("Test_runSingleJob", cmd)
        self.assertEqual(jobStatus, "finished")

    def test_runSingleJob_catch_error_wrong_tmpDir(self):
        """A temporary directory '<tmpDir>/toto' must make the job end as 'error'."""
        cmd = "log = os.system(\"touch 'YuFei'\")\n"
        wrongTmpDir = "%s/toto" % self._tmpDir
        jobStatus = self._runSingleJobAndGetStatus("Test_runSingleJob_catch_error", cmd, tmpDir = wrongTmpDir)
        self.assertEqual(jobStatus, "error")

    def test_runSingleJob_catch_error_wrong_cmd(self):
        """The command 'truc -i toto' must make the job end as 'error'."""
        cmd = "log = os.system(\"truc -i toto\")\n"
        jobStatus = self._runSingleJobAndGetStatus("Test_runSingleJob_catch_error", cmd, cleanJobGroup = True)
        self.assertEqual(jobStatus, "error")

    # ------------------------------------------------------------------
    # command helpers
    # ------------------------------------------------------------------

    def test_prepareCommands(self):
        """Command lists must be joined into the indented strings expected in the script."""
        expCmdStart = "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")\n\tos.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")\n\tlog = os.system(\"touch file\")\n\t"
        expCmdFinish = "if os.path.exists(\"yufei.align\"):\n\t\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )\n\t"
        expCmdSize = "fileSize = 3.2\n\t\t"
        expCmdCopy = "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")\n\t\tshutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")\n\t\t"

        lCmdStart = ["os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")",
                     "os.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")"]
        lCmds = ["log = os.system(\"touch file\")"]
        lCmdFinish = ["if os.path.exists(\"yufei.align\"):",
                      "\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )"]
        lCmdSize = ["fileSize = 3.2"]
        lCmdCopy = ["shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")",
                    "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")"]

        iLauncher = Launcher(self._jobdb)
        obsCmdStart, obsCmdFinish, obsCmdSize, obsCmdCopy = iLauncher.prepareCommands(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy)

        self.assertEqual(expCmdStart, obsCmdStart)
        self.assertEqual(expCmdFinish, obsCmdFinish)
        self.assertEqual(expCmdSize, obsCmdSize)
        self.assertEqual(expCmdCopy, obsCmdCopy)

    def test_getSystemCommand(self):
        """A program and its arguments must be wrapped in a 'log = os.system(...)' line."""
        prg = "touch"
        lArgs = ["file"]
        expCmd = "log = os.system(\"touch file\")"
        iLauncher = Launcher(self._jobdb)
        obsCmd = iLauncher.getSystemCommand(prg, lArgs)
        self.assertEqual(expCmd, obsCmd)


# Module-level suite holding every test of Test_Launcher, so that other
# modules can import and aggregate it. unittest.makeSuite() is deprecated
# (removed in Python 3.13); the default loader builds the same suite.
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(Test_Launcher))
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(test_suite)