view commons/core/sql/test/Test_F_TableJobAdaptator.py @ 31:0ab839023fe4

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 14:33:21 -0400
parents 769e306b7933
children
line wrap: on
line source

from commons.core.launcher.WriteScript import WriteScript
from commons.core.sql.Job import Job
from commons.core.sql.DbFactory import DbFactory
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
import sys
import stat
import os
import time
import unittest
import glob

class Test_F_TableJobAdaptator(unittest.TestCase):

    def setUp(self):
        self._jobTableName = "dummyJobTable"
        self._db = DbFactory.createInstance()
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)

    def tearDown(self):
        self._db.dropTable(self._jobTableName)
        self._db.close()
    
    def test_submitJob_with_multiple_jobs(self):
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        job1 = _createJobInstance("job1")
        _createLauncherFile(job1, self._iTJA)
        job2 = _createJobInstance("job2")
        _createLauncherFile(job2, self._iTJA)
        job3 = _createJobInstance("job3")
        _createLauncherFile(job3, self._iTJA)
        
        self._iTJA.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
        self._iTJA.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
        self._iTJA.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )

        time.sleep(120)
        
        expJobStatus = "finished"
        obsJobStatus1 = self._iTJA.getJobStatus(job1)
        obsJobStatus2 = self._iTJA.getJobStatus(job2)
        obsJobStatus3 = self._iTJA.getJobStatus(job3)
        
        self.assertEquals(expJobStatus, obsJobStatus1)
        self.assertEquals(expJobStatus, obsJobStatus2)
        self.assertEquals(expJobStatus, obsJobStatus3)
        
        expErrorFilePrefix1 = job1.jobname + ".e" 
        expOutputFilePrefix1 = job1.jobname + ".o"
        expErrorFilePrefix2 = job2.jobname + ".e" 
        expOutputFilePrefix2 = job2.jobname + ".o"
        expErrorFilePrefix3 = job3.jobname + ".e" 
        expOutputFilePrefix3 = job3.jobname + ".o"
        
        lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
        lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
        lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*")
        lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*")
        lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*")
        lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*")
        
        isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) 
        isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
        isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0) 
        isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0)
        isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0) 
        isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0)
        
        os.system("rm launcherFileTest*.py *.e* *.o*")
        self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
        self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2)
        self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3)

    def test_submitJob_job_already_submitted(self):
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        iJob = _createJobInstance("job")
        self._iTJA.recordJob(iJob)
        
        isSysExitRaised = False
        try:
            self._iTJA.submitJob(iJob)
        except SystemExit:
            isSysExitRaised = True
        self.assertTrue(isSysExitRaised)
    
    def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        iJob = _createJobInstance("job")
        _createLauncherFile(iJob, self._iTJA)
        
        self._iTJA.recordJob(iJob)
        self._iTJA.changeJobStatus(iJob, "error")
        
        self._iTJA.waitJobGroup(iJob.groupid, 0, 2)
        
        time.sleep(120)
        
        expJobStatus = "finished"
        obsJobStatus1 = self._iTJA.getJobStatus(iJob)
        
        self.assertEquals(expJobStatus, obsJobStatus1)
        
        expErrorFilePrefix1 = iJob.jobname + ".e" 
        expOutputFilePrefix1 = iJob.jobname + ".o"
        
        lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
        lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
        
        isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) 
        isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
        
        self._iTJA.removeJob(iJob) 
        os.system("rm launcherFileTest*.py *.e* *.o*")
        self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)

class Test_F_TableJobAdaptator_SGE(unittest.TestCase):

    def setUp(self):
        if os.environ["REPET_JOB_MANAGER"].lower() != "sge":
            print "ERROR: jobs manager is not SGE: REPET_JOB_MANAGER = %s." % os.environ["REPET_JOB_MANAGER"]
            sys.exit(0)
        self._jobTableName = "dummyJobTable"
        self._db = DbFactory.createInstance()
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
        self._iJob = _createJobInstance("job")
        _createLauncherFile(self._iJob, self._iTJA)

    def tearDown(self):
        self._db.dropTable(self._jobTableName)
        self._db.close()

    def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, "running")
        
        expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % self._iJob.jobid
        
        obsError = "obsError.txt"
        obsErrorHandler = open(obsError, "w")
        stderrRef = sys.stderr
        sys.stderr = obsErrorHandler
        
        isSysExitRaised = False
        try:
            self._iTJA.waitJobGroup(self._iJob.groupid, timeOutPerJob = 3)
        except SystemExit:
            isSysExitRaised = True
           
        obsErrorHandler.close()
        
        obsErrorHandler = open(obsError, "r")
        obsMsg = obsErrorHandler.readline()
        obsErrorHandler.close()
       
        sys.stderr = stderrRef
        os.remove(obsError)
        os.system("rm launcherFileTest*.py")
        self.assertTrue(isSysExitRaised)
        self.assertEquals(expMsg, obsMsg)
         
    def test_isJobStillHandledBySge_True(self):
        self._iTJA.submitJob(self._iJob)
        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
        os.system("rm launcherFileTest*.py")
        self.assertTrue(isJobHandledBySge)

    def test_isJobStillHandledBySge_False(self):
        self._iTJA.recordJob(self._iJob)
        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
        os.system("rm launcherFileTest*.py")
        self.assertFalse(isJobHandledBySge)

def _createJobInstance(name):
    lResources = []
    if os.environ.get("HOSTNAME") == "compute-2-46.local":
        lResources.append("test=TRUE")
    return Job(0, name, "test", "", "log = os.system(\"date;sleep 5;date\")", "%s/launcherFileTest_%s.py" % (os.getcwd(), name), lResources=lResources)

def _createLauncherFile(iJob, iTJA):
    iWriteScript = WriteScript(iJob, iTJA, os.getcwd(), os.getcwd())
    iWriteScript.run(iJob.command, "", iJob.launcher)
    os.chmod(iJob.launcher, stat.S_IRWXU+stat.S_IRWXG+stat.S_IRWXO)
        
if __name__ == "__main__":
    unittest.main()