view smart_toolShed/commons/core/launcher/test/Test_WriteScript.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line source

from commons.core.utils.FileUtils import FileUtils
from commons.core.launcher.WriteScript import WriteScript
from commons.core.sql.Job import Job
from commons.core.sql.DbFactory import DbFactory
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
import unittest
import os
import shutil
import time
import threading

class Test_WriteScript(unittest.TestCase):

    def setUp(self):
        self._testDir = os.getcwd()
        self._acronym = "dummyAcronym"
        self._jobTable = "dummyJobsTable"
        self._iDb = DbFactory.createInstance()
        self._iDb.createTable(self._jobTable, "jobs", overwrite = True)
        self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable)
        self._job = Job()
        self._job.groupid = "groupid"
        self._job.jobname = self._acronym
        self._job.launcher = "ClusterLauncher"
        self._jobdb.recordJob(self._job)
        self._dummyScratch = "dummyScratch"
        os.mkdir(self._dummyScratch)
        os.chdir(self._dummyScratch)
        self._tmpDir = os.getcwd()
        self._iScriptWriter = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir)
        
    def tearDown(self):
        self._iDb.dropTable(self._jobTable)
        self._iDb.close()
        if FileUtils.isRessourceExists(self._dummyScratch):
            shutil.rmtree(self._dummyScratch)

    def test_run(self):
        isScriptAsRun = False
        fileToCreate = 'dummyFile'
        cmdStart = "log = os.system( \"touch %s\" )\n" % fileToCreate
        cmdFinish = "os.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir)
        pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)       
        
        self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName)
        os.system("python %s" % pyFileName)

        os.chdir(self._testDir)
        if FileUtils.isRessourceExists(fileToCreate):
            os.remove(fileToCreate)
            isScriptAsRun = True
        expJobStatus = "finished"    
        obsJobStatus = self._jobdb.getJobStatus(self._job)
            
        self.assertTrue(isScriptAsRun)
        self.assertEquals(expJobStatus, obsJobStatus)
        
    def test_run_with_cmdSize_and_cmdCopy(self):
        isScriptAsRun = False
        fileToCreate = 'dummyFile'
        fileSize = 0.5
        cmdSize = "fileSize = %f\n" % fileSize
        cmdCopy = "os.system(\"touch bank.fa\")\n"
        cmdStart = "log = os.system(\"touch %s\")\n" % fileToCreate
        cmdFinish = "shutil.move(\"%s\", \"%s\")" % (fileToCreate, self._testDir)
        pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)       
        
        iWriteScript = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir, True)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        os.system("python %s" % pyFileName)

        os.chdir(self._testDir)
        if FileUtils.isRessourceExists(fileToCreate):
            os.remove(fileToCreate)
            isScriptAsRun = True
        expJobStatus = "finished"    
        obsJobStatus = self._jobdb.getJobStatus(self._job)
            
        self.assertTrue(isScriptAsRun)
        self.assertEquals(expJobStatus, obsJobStatus)

#TODO: how to test ?
#    def test_run_2_jobs_trying_to_create_same_groupIdDir(self):
#        fileToCreate1 = 'dummyFile1'
#        fileToCreate2 = 'dummyFile2'
#        flagFileOSError = "osErrorRaised"
#        
#        fileSize = 0.5
#        cmd_checkSize = ""
#        cmd_checkSize += "if not os.path.exists( \"%s\" ):\n" % self._job.groupid
#        cmd_checkSize += "\tfileSize = %f\n" % fileSize
#        
#        cmd_checkGroupidDir1 = ""
#        cmd_checkGroupidDir1 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\ttry:\n"
#        cmd_checkGroupidDir1 += "\t\ttime.sleep(10)\n"
#        cmd_checkGroupidDir1 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\texcept OSError, e :\n"
#        cmd_checkGroupidDir1 += "\t\tos.system(\"touch %s\")\n" % flagFileOSError
#        cmd_checkGroupidDir1 += "\t\tif e.args[0] != 17:\n"
#        cmd_checkGroupidDir1 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\tos.system(\"touch bank.fa\")\n" #cp
#        cmd_checkGroupidDir1 += "else:\n"
#        cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        
#        cmdStart1 = "log = os.system(\"touch %s\")\n" % fileToCreate1
#        cmdFinish1 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate1, self._testDir)
#        pyFileName1 = "%s/ClusterLauncher1_job1.py" % os.getcwd()
#       
#        cmd_checkGroupidDir2 = ""
#        cmd_checkGroupidDir2 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\ttry:\n"
#        cmd_checkGroupidDir2 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\texcept OSError, e :\n"
#        cmd_checkGroupidDir2 += "\t\tif e.args[0] != 17:\n"
#        cmd_checkGroupidDir2 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\tos.system(\"touch bank.fa\")\n" #cp
#        cmd_checkGroupidDir2 += "else:\n"
#        cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        
#        cmdStart2 = "log = os.system(\"touch %s\")\n" % fileToCreate2
#        cmdFinish2 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate2, self._testDir)
#        pyFileName2 = "%s/ClusterLauncher2_job2.py" % os.getcwd()
#            
#        job1 = Job(self._jobTable, jobname = "job1", groupid = self._job.groupid)
#        self._jobdb.recordJob(job1)
#        job2 = Job(self._jobTable, jobname = "job2", groupid = self._job.groupid)
#        self._jobdb.recordJob(job2)
#        iScriptWriter1 = WriteScript(job1, self._jobdb, self._testDir, self._tmpDir)
#        iScriptWriter1.run(cmdStart1, cmdFinish1, pyFileName1, cmd_checkSize, cmd_checkGroupidDir1)
#        iScriptWriter2 = WriteScript(job2, self._jobdb, self._testDir, self._tmpDir)
#        iScriptWriter2.run(cmdStart2, cmdFinish2, pyFileName2, cmd_checkSize, cmd_checkGroupidDir2)
#    
#        iCFT1 = CreateFileThread(pyFileName1)
#        iCFT2 = CreateFileThread(pyFileName2)
#        iCFT1.start()
#        iCFT2.start()
#        while iCFT1.isAlive() or iCFT2.isAlive():
#            time.sleep(5)
#        self.assertTrue(FileUtils.isRessourceExists(flagFileOSError))
#        os.chdir(self._testDir)
#        
#        if FileUtils.isRessourceExists(fileToCreate1):
#            os.remove(fileToCreate1)
#            
#        if FileUtils.isRessourceExists(fileToCreate2):            
#            os.remove(fileToCreate2)
    
    def test_run_2_lines_in_cmd_start(self):
        isScriptAsRun = False
        fileToCreate = 'dummyFile'
        
        cmdStart = "log = 0\n\t"
        cmdStart += "if True:\n\t"
        cmdStart += "\tos.system( \"touch dummyFile\" )\n"
        cmdFinish = "os.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir)
        pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)       
        
        self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName)
        os.system("python %s" % pyFileName)
        
        os.chdir(self._testDir)
        if FileUtils.isRessourceExists(fileToCreate):
            os.remove(fileToCreate)
            isScriptAsRun = True
        self.assertTrue(isScriptAsRun)

    def test_run_2_lines_in_cmd_finish(self):
        isScriptAsRun = False
        fileToCreate = 'dummyFile'
        
        cmdStart = "log = 0\n\t"
        cmdStart += "if True:\n\t"
        cmdStart += "\tos.system( \"touch dummyFile\" )\n"
        cmdFinish = "if True:\n\t"
        cmdFinish += "\tos.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir)
        pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)       
        
        self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName)
        os.system("python %s" % pyFileName)
        
        os.chdir(self._testDir)
        if FileUtils.isRessourceExists(fileToCreate):
            os.remove(fileToCreate)
            isScriptAsRun = True
        self.assertTrue(isScriptAsRun)
        
    def test_fillTemplate_with_JobScriptTemplate(self):
        os.chdir("..")
        d = {
             "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
             "jobTableName" : "dummyJobsTable",
             "groupId" : "groupid",
             "jobName" : "job1",
             "launcher" : "ClusterLauncher",
             "time" : "20110505-105353",
             "repet_path" : "/home/user/workspace/repet_pipe",
             "repet_host" : "pisano",
             "repet_user" : "user",
             "repet_pw" : "user",
             "repet_db" : "repet_user",
             "repet_port" : "3306",
             "cmdStart" : "log = os.system(\"touch dummyFile1\")",
             "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
             "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/"
             }
        expFileName = "expFiles/expJobScriptTemplate.py"
        obsFileName = "obsFile.py"
        
        iWS = WriteScript()
        iWS.fillTemplate(obsFileName, d)
        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
        os.remove(obsFileName)
        
    def test_fillTemplate_with_JobScriptTemplate_2_lines_in_cmd_start(self):
        os.chdir("..")
        d = {
             "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
             "jobTableName" : "dummyJobsTable",
             "groupId" : "groupid",
             "jobName" : "job1",
             "launcher" : "ClusterLauncher",
             "time" : "20110505-105353",
             "repet_path" : "/home/user/workspace/repet_pipe",
             "repet_host" : "pisano",
             "repet_user" : "user",
             "repet_pw" : "user",
             "repet_db" : "repet_user",
             "repet_port" : "3306",
             "cmdStart" : "print \"Hello Yufei\"\n\tlog = os.system(\"touch dummyFile1\")",
             "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
             "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/"
             }
        expFileName = "expFiles/expJobScriptTemplate_cmdWith2Lines.py"
        obsFileName = "obsFile.py"
        
        iWS = WriteScript()
        iWS.fillTemplate(obsFileName, d)
        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
        os.remove(obsFileName)
        
    def test_fillTemplate_with_JobScriptWithFilesCopyTemplate(self):
        os.chdir("..")
        d = {
             "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
             "jobTableName" : "dummyJobsTable",
             "groupId" : "groupid",
             "jobName" : "job1",
             "launcher" : "ClusterLauncher",
             "time" : "20110505-105353",
             "repet_path" : "/home/user/workspace/repet_pipe",
             "repet_host" : "pisano",
             "repet_user" : "user",
             "repet_pw" : "user",
             "repet_db" : "repet_user",
             "repet_port" : "3306",
             "cmdStart" : "log = os.system(\"touch dummyFile1\")",
             "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
             "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/",
             "cmdSize" : "fileSize = 0.500000",
             "cmdCopy" : "os.system(\"touch bank.fa\")"
             }
        expFileName = "expFiles/expJobScriptWithFilesCopyTemplate.py"
        obsFileName = "obsFile.py"
        
        iWS = WriteScript(chooseTemplateWithCopy = True)
        iWS.fillTemplate(obsFileName, d)
        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
        os.remove(obsFileName)

    def test_fillTemplate_with_JobScriptTemplateLight(self):
        os.chdir("..")
        d = {
             "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
             "jobTableName" : "dummyJobsTable",
             "groupId" : "groupid",
             "jobName" : "job1",
             "launcher" : "ClusterLauncher",
             "time" : "20110505-105353",
             "repet_path" : "/home/user/workspace/repet_pipe",
             "cmdStart" : "log = os.system(\"touch dummyFile1\")",
             "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
             "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/",
             "cmdSize" : "fileSize = 0.500000",
             "cmdCopy" : "os.system(\"touch bank.fa\")"
             }
        expFileName = "expFiles/expJobScriptTemplateLight.py"
        obsFileName = "obs.py"
        
        iWS = WriteScript(chooseTemplateLight = True)
        iWS.fillTemplate(obsFileName, d)
        self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
        os.remove(obsFileName)
        
    def test_createJobScriptDict(self):
        os.chdir("..")
        cmd_start = "log = os.system(\"touch dummyFile1\")"
        cmd_finish = "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")"
        cmd_size = ""
        cmd_copy = ""
        expDict = {
             "tmpDir" : self._tmpDir,
             "jobTableName" : self._jobTable,
             "groupId" : self._job.groupid,
             "jobName" : self._acronym,
             "launcher" : self._job.launcher,
             "time" : time.strftime("%Y%m%d-%H%M%S"),
             "repet_path" : os.environ["REPET_PATH"],
             "repet_host" : os.environ["REPET_HOST"],
             "repet_user" : os.environ["REPET_USER"],
             "repet_pw" : os.environ["REPET_PW"],
             "repet_db" : os.environ["REPET_DB"],
             "repet_port" : os.environ["REPET_PORT"],
             "cmdStart" : cmd_start,
             "cmdFinish" : cmd_finish,
             "cDir" : self._testDir,
             "cmdSize" : cmd_size,
             "cmdCopy" : cmd_copy
             }
        obsDict = self._iScriptWriter.createJobScriptDict(cmd_start, cmd_finish, cmd_size, cmd_copy)
        self.assertEquals(expDict, obsDict)
        
    def test_createJobScriptDict_with_cmdSize_and_cmdCopy(self):
        os.chdir("..")
        cmd_start = "log = os.system(\"touch dummyFile1\")"
        cmd_finish = "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")"
        cmd_size = "fileSize = 0.500000"
        cmd_copy = "os.system(\"touch bank.fa\")"
        expDict = {
             "tmpDir" : self._tmpDir,
             "jobTableName" : self._jobTable,
             "groupId" : self._job.groupid,
             "jobName" : self._acronym,
             "launcher" : self._job.launcher,
             "time" : time.strftime("%Y%m%d-%H%M%S"),
             "repet_path" : os.environ["REPET_PATH"],
             "repet_host" : os.environ["REPET_HOST"],
             "repet_user" : os.environ["REPET_USER"],
             "repet_pw" : os.environ["REPET_PW"],
             "repet_db" : os.environ["REPET_DB"],
             "repet_port" : os.environ["REPET_PORT"],
             "cmdStart" : cmd_start,
             "cmdFinish" : cmd_finish,
             "cDir" : self._testDir,
             "cmdSize" : cmd_size,
             "cmdCopy" : cmd_copy
             }
        obsDict = self._iScriptWriter.createJobScriptDict(cmd_start, cmd_finish, cmd_size, cmd_copy)
        self.assertEquals(expDict, obsDict)
        
class CreateFileThread(threading.Thread):

    def __init__(self, pyFileName):
        threading.Thread.__init__(self)
        self._pyFileName = pyFileName
        
    def run(self):
        os.system("python %s" % self._pyFileName)

test_suite = unittest.TestSuite()
test_suite.addTest( unittest.makeSuite( Test_WriteScript ) )
if __name__ == "__main__":
        unittest.TextTestRunner(verbosity=2).run( test_suite )