Mercurial > repos > yufei-luo > s_mart
view commons/core/sql/test/Tst_RepetJob.py @ 13:03045debed6e
Uploaded
author | m-zytnicki |
---|---|
date | Wed, 17 Apr 2013 10:39:35 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
import unittest
import sys
import os
import time

from commons.core.sql.DbMySql import DbMySql
from commons.core.sql.Job import Job
from commons.core.sql.RepetJob import RepetJob
from commons.core.utils.FileUtils import FileUtils


#TODO: to remove... => replace all RepetJob() by TableJobAdaptator()...
class Test_RepetJob( unittest.TestCase ):
    """Tests of RepetJob, checked through a separate DbMySql connection.

    Every test works on the table named by ``self._jobTableName``.
    ``tearDown()`` drops that table, so a failing test cannot leave it behind
    and influence the next one.

    Test methods are described with ``#`` comments rather than docstrings so
    that the names reported by the unittest runner stay unchanged.
    """

    def setUp(self):
        """Open the checking connection and create the object under test."""
        self._jobTableName = "dummyJobTable"
        self._db = DbMySql()
        self._iRepetJob = RepetJob()

    def tearDown(self):
        """Drop the job table, then release the connection.

        dropTable() is also called on a not-yet-created table in
        test_submitJob_8_fields_for_job_table, so it is expected to tolerate
        a missing table.
        """
        try:
            self._db.dropTable(self._jobTableName)
        finally:
            # Always release the connection, even if the drop failed.
            self._iRepetJob = None
            self._db.close()

    def _createJobInstance(self):
        """Return the reference Job (id 0, name "job1", group "groupid")."""
        return Job( self._jobTableName, 0, "job1", "groupid", "queue",
                    "command", "launcherFile", "node", "lResources" )

    def _buildInsertCommand(self, iJob, node):
        """Return the raw INSERT statement storing iJob as a "waiting" job.

        @param iJob: job whose tablename, jobid, jobname, groupid, command,
                     launcher and queue attributes are written
        @param node: value stored in the last (node) column
        @return: SQL command as a string
        """
        lValues = [ iJob.jobid,
                    iJob.jobname,
                    iJob.groupid,
                    # double quotes would end the quoted SQL value
                    iJob.command.replace("\"", "\'"),
                    iJob.launcher,
                    iJob.queue,
                    "waiting",
                    time.strftime( "%Y-%m-%d %H:%M:%S" ),
                    node ]
        sqlCmd = "INSERT INTO %s" % ( iJob.tablename )
        sqlCmd += " VALUES ("
        sqlCmd += ",".join( [ " \"%s\"" % ( value, ) for value in lValues ] )
        sqlCmd += " );"
        return sqlCmd

    def _insertJob(self, iJob):
        """Insert iJob with a raw INSERT (after removing any previous copy)."""
        self._iRepetJob.removeJob( iJob )
        sqlCmd = self._buildInsertCommand( iJob, iJob.node )
        self._iRepetJob.execute( sqlCmd )

    def test_createJobTable_is_table_created(self):
        # createTable() must make the table visible to another connection.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        isTableCreated = self._db.doesTableExist(self._jobTableName)
        self.assertTrue(isTableCreated)

    def test_createJobTable_field_list(self):
        # The "jobs" table type must expose these nine fields, in this order.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        obsLField = self._db.getFieldList(self._jobTableName)
        expLField = ["jobid", "jobname", "groupid", "command", "launcher",
                     "queue", "status", "time", "node"]
        self.assertEqual(expLField, obsLField)

    def test_recordJob(self):
        # A recorded job is expected with status "waiting" and node "?".
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
        # One-element tuple: without the trailing comma "(x)" is just x.
        params = (iJob.jobid,)
        self._db.execute(qryParams, params)
        tObs = self._db.fetchall()[0]
        tExp = (iJob.jobid, iJob.groupid, iJob.command, iJob.launcher,
                iJob.queue, "waiting", "?")
        self.assertEqual(tExp, tObs)

    def test_removeJob(self):
        # Removing the only recorded job must leave the table empty.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.removeJob(iJob)
        isTableEmpty = self._db.isEmpty(self._jobTableName)
        self.assertTrue(isTableEmpty)

    def test_getJobStatus(self):
        # A freshly recorded job is reported as "waiting".
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        expStatus = "waiting"
        obsStatus = self._iRepetJob.getJobStatus(iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_unknown(self):
        # A job that was never recorded is reported as "unknown".
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_name(self):
        # Same as above for an unrecorded job with an empty name and id 20.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = Job( self._jobTableName, 20, "", "groupid", "queue",
                    "command", "launcherFile", "node", "lResources" )
        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_non_unique_job(self):
        # Warning: this case cannot happen through recordJob(), because
        # recordJob() begins by calling removeJob(). The duplicate rows are
        # therefore inserted with raw SQL.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        sqlCmd = self._buildInsertCommand( iJob, "?" )
        self._db.execute( sqlCmd )
        self._db.execute( sqlCmd )

        expError = "expError.txt"
        obsError = "obsError.txt"
        stderrRef = sys.stderr
        isSysExitRaised = False
        try:
            with open(expError, "w") as expErrorHandler:
                expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")

            with open(obsError, "w") as obsErrorHandler:
                # Capture what getJobStatus() writes on stderr.
                sys.stderr = obsErrorHandler
                try:
                    self._iRepetJob.getJobStatus(iJob)
                except SystemExit:
                    isSysExitRaised = True
                finally:
                    # Restore stderr before the file is closed and before any
                    # assertion can fail.
                    sys.stderr = stderrRef

            self.assertTrue(isSysExitRaised)
            self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
        finally:
            # Remove the temporary files whatever the outcome.
            for fileName in (expError, obsError):
                if os.path.exists(fileName):
                    os.remove(fileName)

    def test_updateInfoTable(self):
        # updateInfoTable() must add a (name, file) row to info_tables.
        self._iRepetJob.updateInfoTable(self._jobTableName, "dummyInfo")
        qryParams = "SELECT name, file FROM info_tables WHERE name=%s AND file=%s"
        params = (self._jobTableName, "dummyInfo")
        self._db.execute(qryParams, params)
        tObs = self._db.fetchall()[0]
        tExp = (self._jobTableName, "dummyInfo")
        self.assertEqual(tExp, tObs)

    def test_changeJobStatus(self):
        # The new status must be readable back from the table.
        expStatus = "finished"
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, expStatus, "method")
        qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
        params = (iJob.jobid, iJob.groupid, iJob.queue)
        self._db.execute(qryParams, params)
        obsStatus = self._db.fetchall()[0][0]
        self.assertEqual(expStatus, obsStatus)
        self._iRepetJob.removeJob(iJob)

    def test_getCountStatus(self):
        # Two recorded jobs of the same group are both counted as "waiting".
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2",
                    "command2", "launcherFile2", "node2", "lResources2")
        self._iRepetJob.recordJob(iJob1)
        self._iRepetJob.recordJob(iJob2)
        expCount = 2
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, iJob1.groupid, "waiting")
        self.assertEqual(expCount, obsCount)

    def test_getCountStatus_without_res(self):
        # An empty table yields a count of 0.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        expCount = 0
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, "groupid", "waiting")
        self.assertEqual(expCount, obsCount)

    def test_cleanJobGroup(self):
        # After cleaning the group of iJob1, one row is expected to remain.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # NOTE(review): the two Job(...) calls below pass one positional
        # argument fewer than the other calls in this file (no job name after
        # the id?) -- confirm against Job.__init__ before relying on them.
        iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2",
                    "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2",
                    "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)
        self._iRepetJob.cleanJobGroup(self._jobTableName, iJob1.groupid)
        qryParams = "SELECT count(*) FROM " + self._jobTableName
        self._db.execute(qryParams)
        expCount = 1
        obsCount = self._db.fetchall()[0][0]
        self.assertEqual(expCount, obsCount)
        self._iRepetJob.removeJob(iJob3)

    def test_hasUnfinishedJob(self):
        # Only iJob2 is finished, so the group of iJob1 still has unfinished
        # work.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid,
                    "queue2", "command2", "launcherFile2", "node2",
                    "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2",
                    "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")
        expHasUnfinished = True
        obsHasUnfinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
        self.assertEqual(expHasUnfinished, obsHasUnfinished)
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.removeJob(iJob)

    def test_hasUnfinishedJob_JobTableNotExists(self):
        # Without any job table the answer is expected to be False.
        iJob1 = self._createJobInstance()
        expHasUnfinished = False
        obsHasUnfinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
        self.assertEqual(expHasUnfinished, obsHasUnfinished)

    def test_hasUnfinishedJob_AllJobsFinished(self):
        # iJob1 and iJob2 are both set to "finished"; False is expected for
        # the group of iJob1.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # NOTE(review): the two Job(...) calls below pass one positional
        # argument fewer than the other calls in this file (no job name after
        # the id?) -- confirm against Job.__init__ before relying on them.
        iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2",
                    "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2",
                    "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob1, "finished", "method")
        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")
        expHasUnfinished = False
        obsHasUnfinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
        self.assertEqual(expHasUnfinished, obsHasUnfinished)
        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.removeJob(iJob)

    def test_waitJobGroup_with_finished_job(self):
        # With the only job already finished, waitJobGroup() is expected to
        # return None.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "finished", "method")
        obs = self._iRepetJob.waitJobGroup(self._jobTableName, iJob.groupid, 0)
        exp = None
        self.assertEqual(exp, obs)
        self._iRepetJob.removeJob(iJob)

    def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
        # A job in "error" with the two trailing arguments set to 0 must make
        # waitJobGroup() exit.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "error", "method")
        self.assertRaises(SystemExit, self._iRepetJob.waitJobGroup,
                          self._jobTableName, iJob.groupid, 0, 0)
        self._iRepetJob.removeJob(iJob)

    def test_setJobIdFromSge(self):
        # The new job id must be stored for the matching name/queue/group.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.setJobIdFromSge(iJob, 1000)
        qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
        params = (iJob.jobname, iJob.queue, iJob.groupid)
        self._db.execute(qryParams, params)
        tObs = self._db.fetchall()[0]
        tExp = (1000,)
        self.assertEqual(tExp, tObs)

    def test_submitJob_8_fields_for_job_table(self):
        # Start from a job table with only 8 columns (no jobname); after
        # submitJob() the table is expected to expose 9 fields.
        iJob = self._createJobInstance()
        self._db.dropTable(self._jobTableName)
        lColumns = [ "jobid INT UNSIGNED",
                     "groupid VARCHAR(255)",
                     "command TEXT",
                     "launcher VARCHAR(1024)",
                     "queue VARCHAR(255)",
                     "status VARCHAR(255)",
                     "time DATETIME",
                     "node VARCHAR(255)" ]
        sqlCmd = "CREATE TABLE " + self._jobTableName
        sqlCmd += " ( " + ", ".join(lColumns) + " )"
        self._db.execute(sqlCmd)
        self._iRepetJob.submitJob(iJob)
        expFieldsNb = 9
        obsFieldsNb = len(self._iRepetJob.getFieldList(self._jobTableName))
        self.assertEqual(expFieldsNb, obsFieldsNb)

    def test_getNodesListByGroupId(self):
        # Only the nodes of the two jobs belonging to "groupid" are listed.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue",
                     "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue",
                     "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue",
                     "command", "launcherFile", "node3", "lResources" )
        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)
        expNodeList = ["node1", "node2"]
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid")
        self.assertEqual(expNodeList, obsNodeList)

    def test_getNodesListByGroupId_empty_list(self):
        # No stored job belongs to "groupid3", so the list must be empty.
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue",
                     "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue",
                     "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue",
                     "command", "launcherFile", "node3", "lResources" )
        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)
        expNodeList = []
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid3")
        self.assertEqual(expNodeList, obsNodeList)


if __name__ == "__main__":
    unittest.main()