Mercurial > repos > yufei-luo > s_mart
diff commons/core/sql/test/Test_TableJobAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/sql/test/Test_TableJobAdaptator.py Fri Jan 18 04:54:14 2013 -0500 @@ -0,0 +1,640 @@ +import unittest +import sys +import os +import time +#import stat +#import threading +from commons.core.sql.DbMySql import DbMySql +#from commons.core.sql.DbSQLite import DbSQLite +from commons.core.sql.Job import Job +from commons.core.utils.FileUtils import FileUtils +from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory + +#class Test_TableJobAdaptator_SQLite( unittest.TestCase ): +# +# def setUp(self): +# self._jobTableName = "dummyJobTable" +# self._dbName = "test.db" +# self._db = DbSQLite(self._dbName) +# self._iTJA = TableJobAdaptator(self._db, self._jobTableName) +# if not self._db.doesTableExist(self._jobTableName): +# self._db.createJobTable(self._jobTableName) +# self._iJob = self._createJobInstance() +# +# def tearDown(self): +# self._iTJA = None +# self._db.close() +## self._db.delete() +# +## def test_recordJob(self): +## self._iTJA.recordJob(self._iJob) +## qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = ?" +## params = (self._iJob.jobid,) +## self._db.execute(qryParams, params) +## tObs = self._db.fetchall()[0] +## tExp =(self._iJob.jobid, self._iJob.groupid, self._iJob.command, self._iJob.launcher, self._iJob.queue, "waiting", "?") +## self.assertEquals(tExp,tObs) +## +## def test_removeJob(self): +## self._iTJA.recordJob(self._iJob) +## self._iTJA.removeJob(self._iJob) +## self.assertTrue(self._db.isEmpty(self._jobTableName)) +## +## def test_getJobStatus(self): +## self._iTJA.recordJob(self._iJob) +## expStatus = "waiting" +## obsStatus = self._iTJA.getJobStatus(self._iJob) +## self.assertEquals(expStatus, obsStatus) +## +## def test_getJobStatus_no_job(self): +## expStatus = "unknown" +## obsStatus = self._iTJA.getJobStatus(self._iJob) +## self.assertEquals(expStatus, obsStatus) +## +## def test_getJobStatus_no_name(self): +## iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) +## expStatus = "unknown" +## obsStatus = self._iTJA.getJobStatus(iJob) +## self.assertEquals(expStatus, obsStatus) +## +## def test_getJobStatus_two_jobs(self): +## # Warning : this case will not append, because recordJob() begin by removeJob() +## sqlCmd = "INSERT INTO %s" % self._iJob.tablename +## sqlCmd += " VALUES (" +## sqlCmd += " \"%s\"," % self._iJob.jobid +## sqlCmd += " \"%s\"," % self._iJob.jobname +## sqlCmd += " \"%s\"," % self._iJob.groupid +## sqlCmd += " \"%s\"," % self._iJob.command.replace("\"","\'") +## sqlCmd += " \"%s\"," % self._iJob.launcher +## sqlCmd += " \"%s\"," % self._iJob.queue +## sqlCmd += " \"waiting\"," +## sqlCmd += " \"%s\"," % time.strftime( "%Y-%m-%d %H:%M:%S" ) +## sqlCmd += " \"?\" );" +## self._db.execute(sqlCmd) +## self._db.execute(sqlCmd) +## +## expError = "expError.txt" +## expErrorHandler = open(expError, "w") +## expErrorHandler.write("ERROR while getting job status: non-unique jobs\n") +## expErrorHandler.close() +## obsError = "obsError.txt" +## obsErrorHandler = open(obsError, "w") +## stderrRef = sys.stderr +## sys.stderr = obsErrorHandler +## +## isSysExitRaised = False +## try: +## self._iTJA.getJobStatus(self._iJob) +## except SystemExit: +## isSysExitRaised = True +## +## obsErrorHandler.close() +## +## self.assertTrue(isSysExitRaised) +## self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError)) +## sys.stderr = stderrRef +## os.remove(obsError) +## os.remove(expError) +## +## def test_changeJobStatus(self): +## expStatus = "finished" +## self._iTJA.recordJob(self._iJob) +## self._iTJA.changeJobStatus(self._iJob, expStatus) +## qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =? AND groupid=? AND queue=?" +## params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue) +## self._db.execute(qryParams, params) +## obsStatus = self._db.fetchall()[0][0] +## self.assertEquals(expStatus, obsStatus) +## self._iTJA.removeJob(self._iJob) +## +## def test_getCountStatus(self): +## iJob1 = self._createJobInstance() +## iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2") +## self._iTJA.recordJob(iJob1) +## self._iTJA.recordJob(iJob2) +## expCount = 2 +## obsCount = self._iTJA.getCountStatus(self._jobTableName, iJob1.groupid, "waiting") +## self.assertEquals(expCount, obsCount) +## +## def test_getCountStatus_without_res(self): +## expCount = 0 +## obsCount = self._iTJA.getCountStatus(self._jobTableName, "groupid", "waiting") +## self.assertEquals(expCount, obsCount) +## +## def test_cleanJobGroup(self): +## iJob1 = self._createJobInstance() +## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") +## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") +## self._iTJA.recordJob(iJob1) +## self._iTJA.recordJob(iJob2) +## self._iTJA.recordJob(iJob3) +## self._iTJA.cleanJobGroup(self._jobTableName, iJob1.groupid) +## qryParams = "SELECT count(*) FROM " + self._jobTableName +## self._db.execute(qryParams) +## expCount = 1 +## obsCount = self._db.fetchall()[0][0] +## self.assertEquals(expCount, obsCount) +## +## def test_hasUnfinishedJob_one_waiting_one_finished(self): +## iJob1 = self._createJobInstance() +## iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") +## iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") +## self._iTJA.recordJob(iJob1) +## self._iTJA.recordJob(iJob2) +## self._iTJA.recordJob(iJob3) +## self._iTJA.changeJobStatus(iJob2, "finished") +## expHasGrpIdFinished = True +## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid) +## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) +## +## def test_hasUnfinishedJob_jobTable_doesnt_exist(self): +## self._db.dropTable(self._jobTableName) +## expHasGrpIdFinished = False +## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, self._iJob.groupid) +## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) +## +## def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self): +## iJob1 = self._createJobInstance() +## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") +## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") +## self._iTJA.recordJob(iJob1) +## self._iTJA.recordJob(iJob2) +## self._iTJA.recordJob(iJob3) +## self._iTJA.changeJobStatus(iJob1, "finished") +## self._iTJA.changeJobStatus(iJob2, "finished") +## expHasGrpIdFinished = False +## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid) +## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) +## +## def test_waitJobGroup_with_finished_job(self): +## obs = False +## self._iTJA.recordJob(self._iJob) +## self._iTJA.changeJobStatus(self._iJob, "finished") +## try: +## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0) +## except SystemExit: +## obs = True +## self.assertFalse(obs) +## +## def test_waitJobGroup_with_error_job_maxRelaunch_zero(self): +## obs = False +## self._iTJA.recordJob(self._iJob) +## self._iTJA.changeJobStatus(self._iJob, "error") +## try: +## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0) +## except SystemExit: +## obs = True +## self.assertTrue(obs) +## +## def test_setJobIdFromSge(self): +## self._iTJA.recordJob(self._iJob) +## self._iTJA.setJobIdFromSge(self._iJob, 1000) +## qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = ? AND queue = ? AND groupid = ?" +## params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid) +## self._db.execute(qryParams, params) +## tObs = self._db.fetchall()[0] +## tExp =(1000,) +## self.assertEquals(tExp,tObs) +## +## def test_submitJob_8_fields_for_job_table(self): +## self._db.dropTable(self._jobTableName) +## sqlCmd = "CREATE TABLE " + self._jobTableName +## sqlCmd += " ( jobid INT UNSIGNED" +## sqlCmd += ", groupid VARCHAR(255)" +## sqlCmd += ", command TEXT" +## sqlCmd += ", launcher VARCHAR(1024)" +## sqlCmd += ", queue VARCHAR(255)" +## sqlCmd += ", status VARCHAR(255)" +## sqlCmd += ", time DATETIME" +## sqlCmd += ", node VARCHAR(255) )" +## self._db.execute(sqlCmd) +## self._iTJA.submitJob(self._iJob) +## expFieldsNb = 9 +## obsFieldsNb = len(self._db.getFieldList(self._jobTableName)) +## self.assertEquals(expFieldsNb, obsFieldsNb) +## os.remove("jobid.stdout") +## +## def test_getNodesListByGroupId(self): +## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) +## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) +## iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" ) +## self._insertJob(iJob1) +## self._insertJob(iJob2) +## self._insertJob(iJob3) +## expNodeList = ["node1", "node2"] +## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid") +## self.assertEquals(expNodeList, obsNodeList) +## +## def test_getNodesListByGroupId_empty_list(self): +## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) +## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) +## iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" ) +## self._insertJob(iJob1) +## self._insertJob(iJob2) +## self._insertJob(iJob3) +## expNodeList = [] +## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid3") +## self.assertEquals(expNodeList, obsNodeList) +## +## def test_commitJob(self): +## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) +## self._insertJob(iJob1) +## +## expJobStatus = "waiting" +## obsJobStatus = self._iTJA.getJobStatus(self._iJob) +## self.assertEquals(expJobStatus, obsJobStatus) +## expJobStatus = "waiting" +## obsJobStatus = self._iTJA.getJobStatus(self._iJob) +## self.assertEquals(expJobStatus, obsJobStatus) +## self._db.close() +## +## self._db = DbSQLite(self._dbName) +## self._iTJA = TableJobAdaptator(self._db, self._jobTableName) +## expJobStatus = "waiting" +## obsJobStatus = self._iTJA.getJobStatus(self._iJob) +## self.assertEquals(expJobStatus, obsJobStatus) +## +## def _insertJob(self, iJob): +## self._iTJA = TableJobAdaptator(self._db, self._jobTableName) +## self._iTJA.removeJob( iJob ) +## sqlCmd = "INSERT INTO %s" % ( iJob.tablename ) +## sqlCmd += " VALUES (" +## sqlCmd += " \"%s\"," % ( iJob.jobid ) +## sqlCmd += " \"%s\"," % ( iJob.jobname ) +## sqlCmd += " \"%s\"," % ( iJob.groupid ) +## sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") ) +## sqlCmd += " \"%s\"," % ( iJob.launcher ) +## sqlCmd += " \"%s\"," % ( iJob.queue ) +## sqlCmd += " \"waiting\"," +## sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) ) +## sqlCmd += " \"%s\" );" % ( iJob.node ) +## self._db.execute( sqlCmd ) +# +## def testRecordJob_in_parallel_with_2_thread(self) : +## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py") +## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py") +## +## db1 = DbSQLite('threadJobTable.db') +## db1.createJobTable(self._jobTableName) +## +## db2 = DbSQLite(self._dbName) +## +## iTJA1 = TableJobAdaptator(db1, self._jobTableName) +## iTJA2 = TableJobAdaptator(db2, self._jobTableName) +## +## iRJT1 = RecordJobThread(iTJA1, job1) +## iRJT2 = RecordJobThread(iTJA2, job2) +## iRJT1.start() +## iRJT2.start() +## +## while iRJT1.isAlive() or iRJT2.isAlive(): +## time.sleep(5) +## +## expJobStatus = "waiting" +## obsJobStatus1 = iTJA1.getJobStatus(job1) +## obsJobStatus2 = iTJA2.getJobStatus(job2) +## +## self.assertEquals(expJobStatus, obsJobStatus1) +## self.assertEquals(expJobStatus, obsJobStatus2) +## db1.db.close() +## db1.delete() +## +# +# def test_ThreadRecordJob_sqlite3_connection_object_different_instances(self): +# +## for i in range(1, 11): +## job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i) +## db1 = DbSQLite(self._dbName) +## iTJA1 = TableJobAdaptator(db1, self._jobTableName) +## iRJT1 = RecordJobThread(iTJA1, job) +# +# #self._db.createJobTable(self._jobTableName) +# +# for i in range(1, 30) : +# job = "job%s"% i +# db = "db%s"%i +# job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i) +# db = DbSQLite(self._dbName) +# if i == 1 : +# db.createJobTable(self._jobTableName) +# iTJA = TableJobAdaptator(db, self._jobTableName) +# iRJT = RecordJobThread(iTJA, job) +# iRJT.start() +# +# #while iRJT.isAlive() : +# #time.sleep(1) +# +## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py") +## self._createLauncherFile(job1) +## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py") +## self._createLauncherFile(job2) +## +## db1 = DbSQLite(self._dbName) +## db2 = DbSQLite(self._dbName) +## +## iTJA1 = TableJobAdaptator(db1, self._jobTableName) +## iTJA2 = TableJobAdaptator(db2, self._jobTableName) +## +## +## iRJT1 = RecordJobThread(iTJA1, job1) +## iRJT2 = RecordJobThread(iTJA2, job2) +## +## iRJT1.start() +## iRJT2.start() +## +## while iRJT1.isAlive() or iRJT2.isAlive(): +## time.sleep(5) +# +# +## self.assertNotEquals(iRJT1._iTableJobAdaptator._iDb.db, iRJT2._iTableJobAdaptator._iDb.db) +# +# +# def _createLauncherFile(self, iJob): +# jobFileHandler = open(iJob.launcher , "w") +## self.cdir +## self.job +# cDir = os.getcwd() +# +# launcher = "#!/usr/bin/python\n" +# launcher += "import os\n" +# launcher += "import sys\n" +# +# launcher += "print \"system:\", os.uname()\n" +# launcher += "sys.stdout.flush()\n" +# +# newStatus = "running" +# launcher += "from commons.core.sql.Job import Job\n" +# launcher += "from commons.core.sql.DbSQLite import DbSQLite\n" +# launcher += "from commons.core.sql.TableJobAdaptator import TableJobAdaptator\n" +# launcher += "iJob = Job('%s', %s, '%s', '%s')\n" % (iJob.tablename, iJob.jobid, iJob.jobname, iJob.groupid) +# launcher += "iDb = DbSQLite('%s/%s')\n" % (cDir, self._dbName) +# launcher += "iTJA = TableJobAdaptator(iDb, '%s')\n" % self._jobTableName +# launcher += "if not iDb.doesTableExist('%s'):\n" % (iJob.tablename) +# launcher += "\tiDb.createJobTable('%s')\n" % self._jobTableName +# +# launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus +# +# launcher += "print \"LAUNCH: " + iJob.command + "\"\n" +# launcher += "sys.stdout.flush()\n" +# launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n" +# launcher += "if exitStatus != 0:\n" +# launcher += "\tprint \"ERROR: " + iJob.command + " returned exit status '%i'\" % ( exitStatus )\n" +# +# newStatus = "finished" +# launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus +# launcher += "iDb.close()\n" +# +# launcher += "sys.exit(0)\n" +# jobFileHandler.write(launcher) +# jobFileHandler.close() +# os.chmod(iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) +# +# def _createJobInstance(self): +# return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) + + +class Test_TableJobAdaptator_MySQL( unittest.TestCase ): + + def setUp(self): + self._jobTableName = "dummyJobTable" + self._db = DbMySql() + self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName) + self._db.createTable(self._jobTableName, "jobs", overwrite = True) + self._iJob = self._createJobInstance() + + def tearDown(self): + self._db.dropTable(self._jobTableName) + self._iTJA = None + self._db.close() + + def test_recordJob(self): + self._iTJA.recordJob(self._iJob) + qryParams = "SELECT jobid, jobname, groupid, launcher, queue, resources, status, node FROM " + self._jobTableName + " WHERE jobid = %s" + params = (self._iJob.jobid) + self._db.execute(qryParams, params) + tObs = self._db.fetchall()[0] + tExp =(self._iJob.jobid, self._iJob.jobname, self._iJob.groupid, self._iJob.launcher, self._iJob.queue, "['mem_free=10M']", "waiting", "?") + self.assertEquals(tExp,tObs) + + def test_removeJob(self): + self._iTJA.recordJob(self._iJob) + self._iTJA.removeJob(self._iJob) + isTableEmpty = self._db.isEmpty(self._jobTableName) + self.assertTrue(isTableEmpty) + + def test_getJobStatus(self): + self._iTJA.recordJob(self._iJob) + expStatus = "waiting" + obsStatus = self._iTJA.getJobStatus(self._iJob) + self.assertEquals(expStatus, obsStatus) + + def test_getJobStatus_no_job(self): + expStatus = "unknown" + obsStatus = self._iTJA.getJobStatus(self._iJob) + self.assertEquals(expStatus, obsStatus) + + def test_getJobStatus_no_name(self): + iJob = Job(self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources") + expStatus = "unknown" + obsStatus = self._iTJA.getJobStatus(iJob) + self.assertEquals(expStatus, obsStatus) + + def test_getJobStatus_two_jobs(self): + # Warning : this case will not append, because recordJob() begin by removeJob() + sqlCmd = "INSERT INTO %s" % self._jobTableName + sqlCmd += " VALUES (" + sqlCmd += " \"%s\"," % self._iJob.jobid + sqlCmd += " \"%s\"," % self._iJob.jobname + sqlCmd += " \"%s\"," % self._iJob.groupid + sqlCmd += " \"%s\"," % self._iJob.launcher + sqlCmd += " \"%s\"," % self._iJob.queue + sqlCmd += " \"%s\"," % self._iJob.lResources + sqlCmd += " \"waiting\"," + sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S") + sqlCmd += " \"?\" );" + self._db.execute(sqlCmd) + self._db.execute(sqlCmd) + + expError = "expError.txt" + expErrorHandler = open(expError, "w") + expErrorHandler.write("ERROR while getting job status: non-unique jobs\n") + expErrorHandler.close() + obsError = "obsError.txt" + obsErrorHandler = open(obsError, "w") + stderrRef = sys.stderr + sys.stderr = obsErrorHandler + + isSysExitRaised = False + try: + self._iTJA.getJobStatus(self._iJob) + except SystemExit: + isSysExitRaised = True + obsErrorHandler.close() + self.assertTrue(isSysExitRaised) + self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError)) + sys.stderr = stderrRef + os.remove(obsError) + os.remove(expError) + + def test_changeJobStatus(self): + expStatus = "finished" + self._iTJA.recordJob(self._iJob) + self._iTJA.changeJobStatus(self._iJob, expStatus) + qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s" + params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue) + self._db.execute(qryParams, params) + obsStatus = self._db.fetchall()[0][0] + self.assertEquals(expStatus, obsStatus) + + def test_getCountStatus(self): + iJob1 = self._createJobInstance() + iJob2 = Job(1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2") + self._iTJA.recordJob(iJob1) + self._iTJA.recordJob(iJob2) + expCount = 2 + obsCount = self._iTJA.getCountStatus(iJob1.groupid, "waiting") + self.assertEquals(expCount, obsCount) + + def test_getCountStatus_without_res(self): + expCount = 0 + obsCount = self._iTJA.getCountStatus("groupid", "waiting") + self.assertEquals(expCount, obsCount) + + def test_cleanJobGroup(self): + iJob1 = self._createJobInstance() + iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") + iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") + self._iTJA.recordJob(iJob1) + self._iTJA.recordJob(iJob2) + self._iTJA.recordJob(iJob3) + self._iTJA.cleanJobGroup(iJob1.groupid) + qryParams = "SELECT count(*) FROM %s" % self._jobTableName + self._db.execute(qryParams) + expCount = 1 + obsCount = self._db.fetchall()[0][0] + self.assertEquals(expCount, obsCount) + + def test_hasUnfinishedJob_one_waiting_one_finished(self): + iJob1 = self._createJobInstance() + iJob2 = Job(0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") + iJob3 = Job(0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") + self._iTJA.recordJob(iJob1) + self._iTJA.recordJob(iJob2) + self._iTJA.recordJob(iJob3) + self._iTJA.changeJobStatus(iJob2, "finished") + expHasGrpIdFinished = True + obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid) + self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) + + def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self): + iJob1 = self._createJobInstance() + iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") + iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") + self._iTJA.recordJob(iJob1) + self._iTJA.recordJob(iJob2) + self._iTJA.recordJob(iJob3) + self._iTJA.changeJobStatus(iJob1, "finished") + self._iTJA.changeJobStatus(iJob2, "finished") + expHasGrpIdFinished = False + obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid) + self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) + + def test_waitJobGroup_with_finished_job(self): + obs = False + self._iTJA.recordJob(self._iJob) + self._iTJA.changeJobStatus(self._iJob, "finished") + try: + self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0) + except SystemExit: + obs = True + self.assertFalse(obs) + + def test_waitJobGroup_with_error_job_maxRelaunch_zero(self): + obs = False + self._iTJA.recordJob(self._iJob) + self._iTJA.changeJobStatus(self._iJob, "error") + try: + self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0) + except SystemExit: + obs = True + self.assertTrue(obs) + + #TODO: how to test ?!? +# def test_waitJobGroup_with_error_relaunch(self): +# iJob = Job(0, "job1", "groupid", "queue.q", "command", "launcherFile", "node", ["mem_free=10M", "test=TRUE"]) +# obs = False +# self._iTJA.recordJob(iJob) +# self._iTJA.changeJobStatus(iJob, "error") +# try: +# self._iTJA.waitJobGroup(iJob.groupid) +# except SystemExit: +# obs = True +# self.assertTrue(obs) + + def test_updateJobIdInDB(self): + self._iTJA.recordJob(self._iJob) + self._iTJA.updateJobIdInDB(self._iJob, 1000) + qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s" + params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid) + self._db.execute(qryParams, params) + tObs = self._db.fetchall()[0] + tExp =(1000,) + self.assertEquals(tExp,tObs) + + def test_getNodesListByGroupId(self): + iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources") + iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources") + iJob3 = Job(2, "job3", "groupid", "queue", "command", "launcherFile", "node2", "lResources") + iJob4 = Job(3, "job4", "groupid2", "queue", "command", "launcherFile", "node3", "lResources") + self._insertJob(iJob1) + self._insertJob(iJob2) + self._insertJob(iJob3) + self._insertJob(iJob4) + expNodeList = ["node1", "node2"] + obsNodeList = self._iTJA.getNodesListByGroupId("groupid") + self.assertEquals(expNodeList, obsNodeList) + + def test_getNodesListByGroupId_empty_list(self): + iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources") + iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources") + iJob3 = Job(2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources") + self._insertJob(iJob1) + self._insertJob(iJob2) + self._insertJob(iJob3) + expNodeList = [] + obsNodeList = self._iTJA.getNodesListByGroupId("groupid3") + self.assertEquals(expNodeList, obsNodeList) + +# TODO test TableJobAdaptator._createJobInstance TableJobAdaptator._createLauncherFile + def _insertJob(self, iJob): + self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName) + self._iTJA.removeJob(iJob) + sqlCmd = "INSERT INTO %s" % self._jobTableName + sqlCmd += " VALUES (" + sqlCmd += " \"%s\"," % iJob.jobid + sqlCmd += " \"%s\"," % iJob.jobname + sqlCmd += " \"%s\"," % iJob.groupid + sqlCmd += " \"%s\"," % iJob.launcher + sqlCmd += " \"%s\"," % iJob.queue + sqlCmd += " \"%s\"," % iJob.lResources + sqlCmd += " \"waiting\"," + sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S") + sqlCmd += " \"%s\" );" % iJob.node + self._db.execute(sqlCmd) + + def _createJobInstance(self): + return Job(0, "job1", "groupid", "", "command", "launcherFile", "node", ["mem_free=10M"]) + +#class RecordJobThread(threading.Thread): +# +# def __init__(self, iTableJobAdaptator, iJob): +# threading.Thread.__init__(self) +# self._iTableJobAdaptator = iTableJobAdaptator +# self._iJob = iJob +# +# def run(self): +# self._iTableJobAdaptator.recordJob(self._iJob) +# #self._iTableJobAdaptator.submitJob(self._iJob) + +if __name__ == "__main__": + unittest.main()