view smart_toolShed/commons/core/sql/test/Tst_F_RepetJob.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox that manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line source

import os
import time
import sys
import stat
import unittest
import glob
from commons.core.sql.DbMySql import DbMySql
from commons.core.sql.RepetJob import RepetJob
from commons.core.sql.Job import Job

class Test_F_RepetJob(unittest.TestCase):
    """Functional tests for RepetJob (job table handling and job submission).

    Requirements visible from the code:
      * environment variables REPET_HOST, REPET_USER, REPET_PW, REPET_DB and
        REPET_PORT (written to the config file in setUp);
      * environment variable REPET_PATH (used to locate
        bin/srptChangeJobStatus.py in the generated launchers).

    The tests create files in the current working directory (config file,
    launcher scripts, "<jobname>.e*" / "<jobname>.o*" files) and remove them
    with shell wildcards; run them from a scratch directory.
    NOTE(review): the ".e*"/".o*" files are presumably the scheduler's
    stderr/stdout files of each job -- confirm against RepetJob.
    """

    def setUp(self):
        """Create the DB/RepetJob fixtures and write the dummy config file."""
        self._jobTableName = "dummyJobTable"
        self._db = DbMySql()
        self._iRepetJob = RepetJob()
        self._configFileName = "dummyConfigFile"

        # Build the whole content first so that a missing environment
        # variable raises before any file is created or left open.
        lConfigLines = ["[repet_env]\n"]
        for key in ("host", "user", "pw", "db", "port"):
            envValue = os.environ["REPET_%s" % (key.upper())]
            lConfigLines.append("repet_%s: %s\n" % (key, envValue))
        with open(self._configFileName, "w") as configF:
            configF.write("".join(lConfigLines))

    def tearDown(self):
        """Drop the job table, close the DB connection, remove the config file.

        Each step runs even if the previous one raised, so a failing
        dropTable() does not leave the connection open or the file behind.
        """
        self._iRepetJob = None
        try:
            self._db.dropTable(self._jobTableName)
        finally:
            try:
                self._db.close()
            finally:
                os.remove(self._configFileName)

    def test_submitJob_with_multiple_jobs(self):
        """Submit three jobs; each must end 'finished' with .e/.o files."""
        lJobs = []
        try:
            for name in ("job1", "job2", "job3"):
                iJob = self._createJobInstance(name)
                self._createLauncherFile(iJob)
                lJobs.append(iJob)

            for iJob in lJobs:
                self._iRepetJob.submitJob(iJob, maxNbWaitingJobs=3, checkInterval=5, verbose=0)

            # Fixed delay giving the submitted jobs time to complete.
            time.sleep(70)

            expJobStatus = "finished"
            lObsJobStatus = [self._iRepetJob.getJobStatus(iJob) for iJob in lJobs]
            for obsJobStatus in lObsJobStatus:
                self.assertEqual(expJobStatus, obsJobStatus)

            # Must be evaluated before the files are removed below.
            lHasBothFiles = [self._hasErrorAndOutputFiles(iJob.jobname) for iJob in lJobs]
        finally:
            self._removeLauncherAndJobOutputFiles()

        for hasBothFiles in lHasBothFiles:
            self.assertTrue(hasBothFiles)

    def test_submitJob_job_already_submitted(self):
        """Submitting a job that is already recorded must raise SystemExit."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        self._iRepetJob.recordJob(iJob)

        self.assertRaises(SystemExit, self._iRepetJob.submitJob, iJob)

    def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
        """A 'running' job that was never submitted must abort waitJobGroup.

        Expects SystemExit and checks the first line written to stderr.
        """
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        obsError = "obsError.txt"
        try:
            self._createLauncherFile(iJob)
            self._iRepetJob.recordJob(iJob)
            self._iRepetJob.changeJobStatus(iJob, "running", "method")

            expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % ( iJob.jobid )

            isSysExitRaised = False
            stderrRef = sys.stderr
            with open(obsError, "w") as obsErrorHandler:
                sys.stderr = obsErrorHandler
                try:
                    self._iRepetJob.waitJobGroup(self._jobTableName, iJob.groupid, timeOutPerJob=3)
                except SystemExit:
                    isSysExitRaised = True
                finally:
                    # Always give stderr back, whatever waitJobGroup raised,
                    # so later tests do not write into a closed file.
                    sys.stderr = stderrRef

            with open(obsError, "r") as obsErrorHandler:
                obsMsg = obsErrorHandler.readline()
        finally:
            if os.path.exists(obsError):
                os.remove(obsError)
            self._removeLauncherFiles()

        self.assertTrue(isSysExitRaised)
        self.assertEqual(expMsg, obsMsg)

    def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
        """A job in 'error' status must end 'finished' after waitJobGroup.

        waitJobGroup is called with positional arguments 0 and 2.
        NOTE(review): per the test name the 2 is presumably the maximum
        number of relaunches -- confirm against RepetJob.waitJobGroup.
        """
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        try:
            self._createLauncherFile(iJob)

            self._iRepetJob.recordJob(iJob)
            self._iRepetJob.changeJobStatus(iJob, "error", "method")

            self._iRepetJob.waitJobGroup(self._jobTableName, iJob.groupid, 0, 2)

            time.sleep(10)

            expJobStatus = "finished"
            obsJobStatus = self._iRepetJob.getJobStatus(iJob)
            self.assertEqual(expJobStatus, obsJobStatus)

            # Must be evaluated before the files are removed below.
            hasBothFiles = self._hasErrorAndOutputFiles(iJob.jobname)

            self._iRepetJob.removeJob(iJob)
        finally:
            self._removeLauncherAndJobOutputFiles()

        self.assertTrue(hasBothFiles)

    def test_isJobStillHandledBySge_True(self):
        """A freshly submitted job must be reported as still handled."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        try:
            self._createLauncherFile(iJob)
            self._iRepetJob.submitJob(iJob)

            isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname)
        finally:
            self._removeLauncherFiles()

        self.assertTrue(isJobHandledBySge)

    def test_isJobStillHandledBySge_False(self):
        """A job only recorded in the table must be reported as not handled."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        try:
            self._createLauncherFile(iJob)
            self._iRepetJob.recordJob(iJob)

            isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname)
        finally:
            self._removeLauncherFiles()

        self.assertFalse(isJobHandledBySge)

    def _hasErrorAndOutputFiles(self, jobName):
        """Return True if both '<jobName>.e*' and '<jobName>.o*' files exist
        in the current working directory."""
        lErrorFiles = glob.glob(jobName + ".e" + "*")
        lOutputFiles = glob.glob(jobName + ".o" + "*")
        return len(lErrorFiles) != 0 and len(lOutputFiles) != 0

    def _removeLauncherFiles(self):
        """Remove the generated launcher scripts from the working directory."""
        os.system("rm launcherFileTest*.py")

    def _removeLauncherAndJobOutputFiles(self):
        """Remove launcher scripts and every '*.e*' / '*.o*' file.

        NOTE(review): the wildcards match any file of the working directory
        containing '.e' or '.o', not only files produced by these tests.
        """
        os.system("rm launcherFileTest*.py *.e* *.o*")

    def _createJobInstance(self, name):
        """Return a Job named `name` on the test table, running
        'date;sleep 5;date' through './launcherFileTest_<name>.py'."""
        return Job(self._jobTableName, 0, name, "test", "", "date;sleep 5;date", "./launcherFileTest_"+ name +".py")

    def _buildChangeStatusCmd(self, iJob, newStatus):
        """Return the srptChangeJobStatus.py command line that sets the
        status of `iJob` to `newStatus` using the test config file.

        Raises KeyError if REPET_PATH is not set in the environment.
        """
        lCmdParts = [
            "%s/bin/srptChangeJobStatus.py" % (os.environ["REPET_PATH"]),
            " -t %s" % ( iJob.tablename ),
            " -n %s" % ( iJob.jobname ),
            " -g %s" % ( iJob.groupid ),
        ]
        # The queue option is only passed when a queue is set on the job.
        if iJob.queue != "":
            lCmdParts.append(" -q %s" % ( iJob.queue ))
        lCmdParts.append(" -s %s" % ( newStatus ))
        lCmdParts.append(" -c %s" % ( self._configFileName ))
        lCmdParts.append(" -v 1")
        return "".join(lCmdParts)

    def _createLauncherFile(self, iJob):
        """Write the executable launcher script of `iJob` to iJob.launcher.

        The generated script (Python 2 syntax, '#!/usr/bin/python') sets the
        job status to 'running', runs iJob.command, prints an error line on a
        non-zero exit status, sets the status to 'finished' and exits with 0.
        """
        # Commands are built before the file is opened, so a missing
        # REPET_PATH does not leave an empty launcher or an open handle.
        runningCmd = self._buildChangeStatusCmd(iJob, "running")
        finishedCmd = self._buildChangeStatusCmd(iJob, "finished")

        lLauncherLines = [
            "#!/usr/bin/python\n",
            "import os\n",
            "import sys\n",
            "print \"system:\", os.uname()\n",
            "sys.stdout.flush()\n",
            "os.system( \"" + runningCmd + "\" )\n",
            "print \"LAUNCH: " + iJob.command + "\"\n",
            "sys.stdout.flush()\n",
            "exitStatus = os.system (\"" + iJob.command + "\")\n",
            "if exitStatus != 0:\n",
            # '%i' is formatted by the generated script, not here.
            "\tprint \"ERROR: " + iJob.command + " returned exit status '%i'\" % ( exitStatus )\n",
            "os.system( \"" + finishedCmd + "\" )\n",
            "sys.exit(0)\n",
        ]

        with open(iJob.launcher, "w") as jobFileHandler:
            jobFileHandler.write("".join(lLauncherLines))
        # Owner read/write/execute so the launcher can be run directly.
        os.chmod( iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC )

# Run every test case of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()