# view commons/core/sql/test/Tst_RepetJob.py @ 6:769e306b7933
#
# Change the repository level.
# author yufei-luo
# date Fri, 18 Jan 2013 04:54:14 -0500
# parents
# children
# line wrap: on
# line source

import unittest
import sys
import os
import time
from commons.core.sql.DbMySql import DbMySql
from commons.core.sql.Job import Job
from commons.core.sql.RepetJob import RepetJob
from commons.core.utils.FileUtils import FileUtils

# TODO: to be removed => replace all RepetJob() by TableJobAdaptator().
class Test_RepetJob( unittest.TestCase ):
    """Database-backed tests for the job-table operations of RepetJob.

    Every test works on the table named by ``self._jobTableName`` through two
    objects built in setUp(): a DbMySql instance used to inspect the results,
    and the RepetJob instance under test. tearDown() drops the table, so a
    failing assertion cannot leak a table or rows into the next test.
    """

    def setUp(self):
        """Create the DbMySql helper and the RepetJob instance under test."""
        self._jobTableName = "dummyJobTable"
        self._db = DbMySql()
        self._iRepetJob = RepetJob()

    def tearDown(self):
        """Drop the job table, then close the helper connection.

        Cleaning up here instead of at the end of each test keeps the tests
        independent of their execution order, even when an assertion fails.
        """
        # NOTE(review): RepetJob exposes execute()/getFieldList() like DbMySql;
        # if it also owns a connection it may need an explicit close() - confirm.
        self._iRepetJob = None
        try:
            # Some tests never create the table; dropTable() was already called
            # on a missing table by the original tests, so this is assumed safe.
            self._db.dropTable(self._jobTableName)
        finally:
            self._db.close()

    def _createJobInstance(self):
        """Return the default Job (jobid 0, name "job1", group "groupid")."""
        return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" )

    def _buildInsertCmd(self, iJob, node):
        """Return a raw INSERT statement for ``iJob`` with status "waiting".

        @param iJob: job whose fields fill the row (its ``tablename`` is the target table)
        @param node: value written in the last (node) column
        """
        lValues = [ iJob.jobid,
                    iJob.jobname,
                    iJob.groupid,
                    # double quotes would end the SQL string literal early
                    iJob.command.replace("\"", "\'"),
                    iJob.launcher,
                    iJob.queue,
                    "waiting",
                    time.strftime( "%Y-%m-%d %H:%M:%S" ),
                    node ]
        quotedValues = ", ".join( [ "\"%s\"" % ( value ) for value in lValues ] )
        return "INSERT INTO %s VALUES ( %s );" % ( iJob.tablename, quotedValues )

    def _insertJob(self, iJob):
        """Insert ``iJob`` with a raw INSERT, after removing any previous record of it."""
        self._iRepetJob.removeJob( iJob )
        self._iRepetJob.execute( self._buildInsertCmd( iJob, iJob.node ) )

    def test_createJobTable_is_table_created(self):
        """createTable() must make the job table exist."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        isTableCreated = self._db.doesTableExist(self._jobTableName)

        self.assertTrue(isTableCreated)

    def test_createJobTable_field_list(self):
        """createTable() must produce the nine expected columns, in order."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        expLField = ["jobid", "jobname", "groupid", "command", "launcher", "queue", "status", "time", "node"]
        obsLField = self._db.getFieldList(self._jobTableName)

        self.assertEqual(expLField, obsLField)

    def test_recordJob(self):
        """A recorded job is stored with status "waiting" and node "?"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
        # the trailing comma matters: "(x)" is just x, not a 1-tuple
        params = (iJob.jobid,)
        self._db.execute(qryParams, params)

        tExp = (iJob.jobid, iJob.groupid, iJob.command, iJob.launcher, iJob.queue, "waiting", "?")
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_removeJob(self):
        """Removing the only recorded job leaves the table empty."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        self._iRepetJob.removeJob(iJob)

        self.assertTrue(self._db.isEmpty(self._jobTableName))

    def test_getJobStatus(self):
        """A freshly recorded job has the status "waiting"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        expStatus = "waiting"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_unknown(self):
        """A job that was never recorded has the status "unknown"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()

        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_name(self):
        """An unrecorded job with an empty name has the status "unknown"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" )

        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_non_unique_job(self):
        """Two identical rows make getJobStatus() exit and report on stderr."""
        # Warning: this case will not happen in practice, because recordJob() begins with removeJob()
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        # Bypass recordJob()/_insertJob() on purpose so the row can be duplicated.
        sqlCmd = self._buildInsertCmd( iJob, "?" )
        self._db.execute( sqlCmd )
        self._db.execute( sqlCmd )

        expError = "expError.txt"
        obsError = "obsError.txt"
        stderrRef = sys.stderr
        isSysExitRaised = False
        try:
            with open(expError, "w") as expErrorHandler:
                expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")

            with open(obsError, "w") as obsErrorHandler:
                # Capture what the tested method writes on stderr.
                sys.stderr = obsErrorHandler
                try:
                    self._iRepetJob.getJobStatus(iJob)
                except SystemExit:
                    isSysExitRaised = True
                finally:
                    # Restore stderr before asserting so failures are reported normally.
                    sys.stderr = stderrRef

            self.assertTrue(isSysExitRaised)
            self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
        finally:
            for fileName in (expError, obsError):
                if os.path.exists(fileName):
                    os.remove(fileName)

    def test_updateInfoTable(self):
        """updateInfoTable() stores the (name, file) pair in info_tables."""
        self._iRepetJob.updateInfoTable(self._jobTableName, "dummyInfo")

        qryParams = "SELECT name, file FROM  info_tables WHERE name=%s AND file=%s"
        params = (self._jobTableName, "dummyInfo")
        self._db.execute(qryParams, params)

        tExp = (self._jobTableName, "dummyInfo")
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_changeJobStatus(self):
        """changeJobStatus() writes the new status in the job's row."""
        expStatus = "finished"

        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, expStatus, "method")

        qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
        params = (iJob.jobid, iJob.groupid, iJob.queue)
        self._db.execute(qryParams, params)

        obsStatus = self._db.fetchall()[0][0]

        self.assertEqual(expStatus, obsStatus)

    def test_getCountStatus(self):
        """Two recorded jobs of one group are both counted as "waiting"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        self._iRepetJob.recordJob(iJob1)
        self._iRepetJob.recordJob(iJob2)

        expCount = 2
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, iJob1.groupid, "waiting")

        self.assertEqual(expCount, obsCount)

    def test_getCountStatus_without_res(self):
        """The count is 0 when the table holds no job."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        expCount = 0
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, "groupid", "waiting")

        self.assertEqual(expCount, obsCount)

    def test_cleanJobGroup(self):
        """cleanJobGroup() removes the jobs of one group and keeps the others."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # Same 9-argument form as the other Job() calls of this class: the job
        # name must be given, otherwise the group id lands in the name slot.
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)

        self._iRepetJob.cleanJobGroup(self._jobTableName, iJob1.groupid)

        self._db.execute("SELECT count(*) FROM " + self._jobTableName)
        # only iJob3 belongs to another group
        expCount = 1
        obsCount = self._db.fetchall()[0][0]

        self.assertEqual(expCount, obsCount)

    def test_hasUnfinishedJob(self):
        """A group with one finished and one waiting job still has unfinished jobs."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")

        expHasUnfinishedJob = True
        obsHasUnfinishedJob = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasUnfinishedJob, obsHasUnfinishedJob)

    def test_hasUnfinishedJob_JobTableNotExists(self):
        """hasUnfinishedJob() is False when the job table does not exist."""
        # Make the precondition explicit instead of relying on earlier cleanup.
        self._db.dropTable(self._jobTableName)
        iJob1 = self._createJobInstance()

        expHasUnfinishedJob = False
        obsHasUnfinishedJob = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasUnfinishedJob, obsHasUnfinishedJob)

    def test_hasUnfinishedJob_AllJobsFinished(self):
        """A group whose jobs are all finished has no unfinished job."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # 9-argument form (with the job name) so iJob2 really shares iJob1's group.
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)
        # iJob3 stays "waiting" but belongs to another group.
        self._iRepetJob.changeJobStatus(iJob1, "finished", "method")
        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")

        expHasUnfinishedJob = False
        obsHasUnfinishedJob = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasUnfinishedJob, obsHasUnfinishedJob)

    def test_waitJobGroup_with_finished_job(self):
        """waitJobGroup() returns None when the only job of the group is finished."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "finished", "method")

        exp = None
        obs = self._iRepetJob.waitJobGroup(self._jobTableName, iJob.groupid, 0)

        self.assertEqual(exp, obs)

    def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
        """waitJobGroup() exits when a job is in error and the 4th argument is 0."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "error", "method")

        self.assertRaises(SystemExit, self._iRepetJob.waitJobGroup, self._jobTableName, iJob.groupid, 0, 0)

    def test_setJobIdFromSge(self):
        """setJobIdFromSge() replaces the stored jobid with the given one."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        self._iRepetJob.setJobIdFromSge(iJob, 1000)

        qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
        params = (iJob.jobname, iJob.queue, iJob.groupid)
        self._db.execute(qryParams, params)

        tExp = (1000,)
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_submitJob_8_fields_for_job_table(self):
        """submitJob() on an 8-column table (no jobname) must leave 9 columns."""
        iJob = self._createJobInstance()
        # Start from a table without the "jobname" column.
        self._db.dropTable(self._jobTableName)
        lColumns = [ "jobid INT UNSIGNED",
                     "groupid VARCHAR(255)",
                     "command TEXT",
                     "launcher VARCHAR(1024)",
                     "queue VARCHAR(255)",
                     "status VARCHAR(255)",
                     "time DATETIME",
                     "node VARCHAR(255)" ]
        sqlCmd = "CREATE TABLE " + self._jobTableName + " ( " + ", ".join( lColumns ) + " )"
        self._db.execute(sqlCmd)

        self._iRepetJob.submitJob(iJob)

        expFieldsNb = 9
        obsFieldsNb = len(self._iRepetJob.getFieldList(self._jobTableName))

        self.assertEqual(expFieldsNb, obsFieldsNb)

    def test_getNodesListByGroupId(self):
        """Only the nodes of the jobs in the requested group are returned."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" )
        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)

        expNodeList = ["node1", "node2"]
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid")

        self.assertEqual(expNodeList, obsNodeList)

    def test_getNodesListByGroupId_empty_list(self):
        """A group id matching no job gives an empty node list."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" )
        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)

        expNodeList = []
        # "groupid3" is a prefix of "groupid32" but matches no recorded group
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid3")

        self.assertEqual(expNodeList, obsNodeList)
        
# Run every test case of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()