6
|
1 import unittest
|
|
2 import sys
|
|
3 import os
|
|
4 import time
|
|
5 #import stat
|
|
6 #import threading
|
|
7 from commons.core.sql.DbMySql import DbMySql
|
|
8 #from commons.core.sql.DbSQLite import DbSQLite
|
|
9 from commons.core.sql.Job import Job
|
|
10 from commons.core.utils.FileUtils import FileUtils
|
|
11 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
|
|
12
|
|
13 #class Test_TableJobAdaptator_SQLite( unittest.TestCase ):
|
|
14 #
|
|
15 # def setUp(self):
|
|
16 # self._jobTableName = "dummyJobTable"
|
|
17 # self._dbName = "test.db"
|
|
18 # self._db = DbSQLite(self._dbName)
|
|
19 # self._iTJA = TableJobAdaptator(self._db, self._jobTableName)
|
|
20 # if not self._db.doesTableExist(self._jobTableName):
|
|
21 # self._db.createJobTable(self._jobTableName)
|
|
22 # self._iJob = self._createJobInstance()
|
|
23 #
|
|
24 # def tearDown(self):
|
|
25 # self._iTJA = None
|
|
26 # self._db.close()
|
|
27 ## self._db.delete()
|
|
28 #
|
|
29 ## def test_recordJob(self):
|
|
30 ## self._iTJA.recordJob(self._iJob)
|
|
31 ## qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = ?"
|
|
32 ## params = (self._iJob.jobid,)
|
|
33 ## self._db.execute(qryParams, params)
|
|
34 ## tObs = self._db.fetchall()[0]
|
|
35 ## tExp =(self._iJob.jobid, self._iJob.groupid, self._iJob.command, self._iJob.launcher, self._iJob.queue, "waiting", "?")
|
|
36 ## self.assertEquals(tExp,tObs)
|
|
37 ##
|
|
38 ## def test_removeJob(self):
|
|
39 ## self._iTJA.recordJob(self._iJob)
|
|
40 ## self._iTJA.removeJob(self._iJob)
|
|
41 ## self.assertTrue(self._db.isEmpty(self._jobTableName))
|
|
42 ##
|
|
43 ## def test_getJobStatus(self):
|
|
44 ## self._iTJA.recordJob(self._iJob)
|
|
45 ## expStatus = "waiting"
|
|
46 ## obsStatus = self._iTJA.getJobStatus(self._iJob)
|
|
47 ## self.assertEquals(expStatus, obsStatus)
|
|
48 ##
|
|
49 ## def test_getJobStatus_no_job(self):
|
|
50 ## expStatus = "unknown"
|
|
51 ## obsStatus = self._iTJA.getJobStatus(self._iJob)
|
|
52 ## self.assertEquals(expStatus, obsStatus)
|
|
53 ##
|
|
54 ## def test_getJobStatus_no_name(self):
|
|
55 ## iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" )
|
|
56 ## expStatus = "unknown"
|
|
57 ## obsStatus = self._iTJA.getJobStatus(iJob)
|
|
58 ## self.assertEquals(expStatus, obsStatus)
|
|
59 ##
|
|
60 ## def test_getJobStatus_two_jobs(self):
|
|
61 ## # Warning : this case will not append, because recordJob() begin by removeJob()
|
|
62 ## sqlCmd = "INSERT INTO %s" % self._iJob.tablename
|
|
63 ## sqlCmd += " VALUES ("
|
|
64 ## sqlCmd += " \"%s\"," % self._iJob.jobid
|
|
65 ## sqlCmd += " \"%s\"," % self._iJob.jobname
|
|
66 ## sqlCmd += " \"%s\"," % self._iJob.groupid
|
|
67 ## sqlCmd += " \"%s\"," % self._iJob.command.replace("\"","\'")
|
|
68 ## sqlCmd += " \"%s\"," % self._iJob.launcher
|
|
69 ## sqlCmd += " \"%s\"," % self._iJob.queue
|
|
70 ## sqlCmd += " \"waiting\","
|
|
71 ## sqlCmd += " \"%s\"," % time.strftime( "%Y-%m-%d %H:%M:%S" )
|
|
72 ## sqlCmd += " \"?\" );"
|
|
73 ## self._db.execute(sqlCmd)
|
|
74 ## self._db.execute(sqlCmd)
|
|
75 ##
|
|
76 ## expError = "expError.txt"
|
|
77 ## expErrorHandler = open(expError, "w")
|
|
78 ## expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
|
|
79 ## expErrorHandler.close()
|
|
80 ## obsError = "obsError.txt"
|
|
81 ## obsErrorHandler = open(obsError, "w")
|
|
82 ## stderrRef = sys.stderr
|
|
83 ## sys.stderr = obsErrorHandler
|
|
84 ##
|
|
85 ## isSysExitRaised = False
|
|
86 ## try:
|
|
87 ## self._iTJA.getJobStatus(self._iJob)
|
|
88 ## except SystemExit:
|
|
89 ## isSysExitRaised = True
|
|
90 ##
|
|
91 ## obsErrorHandler.close()
|
|
92 ##
|
|
93 ## self.assertTrue(isSysExitRaised)
|
|
94 ## self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
|
|
95 ## sys.stderr = stderrRef
|
|
96 ## os.remove(obsError)
|
|
97 ## os.remove(expError)
|
|
98 ##
|
|
99 ## def test_changeJobStatus(self):
|
|
100 ## expStatus = "finished"
|
|
101 ## self._iTJA.recordJob(self._iJob)
|
|
102 ## self._iTJA.changeJobStatus(self._iJob, expStatus)
|
|
103 ## qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =? AND groupid=? AND queue=?"
|
|
104 ## params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue)
|
|
105 ## self._db.execute(qryParams, params)
|
|
106 ## obsStatus = self._db.fetchall()[0][0]
|
|
107 ## self.assertEquals(expStatus, obsStatus)
|
|
108 ## self._iTJA.removeJob(self._iJob)
|
|
109 ##
|
|
110 ## def test_getCountStatus(self):
|
|
111 ## iJob1 = self._createJobInstance()
|
|
112 ## iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
113 ## self._iTJA.recordJob(iJob1)
|
|
114 ## self._iTJA.recordJob(iJob2)
|
|
115 ## expCount = 2
|
|
116 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, iJob1.groupid, "waiting")
|
|
117 ## self.assertEquals(expCount, obsCount)
|
|
118 ##
|
|
119 ## def test_getCountStatus_without_res(self):
|
|
120 ## expCount = 0
|
|
121 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, "groupid", "waiting")
|
|
122 ## self.assertEquals(expCount, obsCount)
|
|
123 ##
|
|
124 ## def test_cleanJobGroup(self):
|
|
125 ## iJob1 = self._createJobInstance()
|
|
126 ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
127 ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
128 ## self._iTJA.recordJob(iJob1)
|
|
129 ## self._iTJA.recordJob(iJob2)
|
|
130 ## self._iTJA.recordJob(iJob3)
|
|
131 ## self._iTJA.cleanJobGroup(self._jobTableName, iJob1.groupid)
|
|
132 ## qryParams = "SELECT count(*) FROM " + self._jobTableName
|
|
133 ## self._db.execute(qryParams)
|
|
134 ## expCount = 1
|
|
135 ## obsCount = self._db.fetchall()[0][0]
|
|
136 ## self.assertEquals(expCount, obsCount)
|
|
137 ##
|
|
138 ## def test_hasUnfinishedJob_one_waiting_one_finished(self):
|
|
139 ## iJob1 = self._createJobInstance()
|
|
140 ## iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
141 ## iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
142 ## self._iTJA.recordJob(iJob1)
|
|
143 ## self._iTJA.recordJob(iJob2)
|
|
144 ## self._iTJA.recordJob(iJob3)
|
|
145 ## self._iTJA.changeJobStatus(iJob2, "finished")
|
|
146 ## expHasGrpIdFinished = True
|
|
147 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
|
|
148 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
|
|
149 ##
|
|
150 ## def test_hasUnfinishedJob_jobTable_doesnt_exist(self):
|
|
151 ## self._db.dropTable(self._jobTableName)
|
|
152 ## expHasGrpIdFinished = False
|
|
153 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, self._iJob.groupid)
|
|
154 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
|
|
155 ##
|
|
156 ## def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self):
|
|
157 ## iJob1 = self._createJobInstance()
|
|
158 ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
159 ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
160 ## self._iTJA.recordJob(iJob1)
|
|
161 ## self._iTJA.recordJob(iJob2)
|
|
162 ## self._iTJA.recordJob(iJob3)
|
|
163 ## self._iTJA.changeJobStatus(iJob1, "finished")
|
|
164 ## self._iTJA.changeJobStatus(iJob2, "finished")
|
|
165 ## expHasGrpIdFinished = False
|
|
166 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
|
|
167 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
|
|
168 ##
|
|
169 ## def test_waitJobGroup_with_finished_job(self):
|
|
170 ## obs = False
|
|
171 ## self._iTJA.recordJob(self._iJob)
|
|
172 ## self._iTJA.changeJobStatus(self._iJob, "finished")
|
|
173 ## try:
|
|
174 ## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0)
|
|
175 ## except SystemExit:
|
|
176 ## obs = True
|
|
177 ## self.assertFalse(obs)
|
|
178 ##
|
|
179 ## def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
|
|
180 ## obs = False
|
|
181 ## self._iTJA.recordJob(self._iJob)
|
|
182 ## self._iTJA.changeJobStatus(self._iJob, "error")
|
|
183 ## try:
|
|
184 ## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0)
|
|
185 ## except SystemExit:
|
|
186 ## obs = True
|
|
187 ## self.assertTrue(obs)
|
|
188 ##
|
|
189 ## def test_setJobIdFromSge(self):
|
|
190 ## self._iTJA.recordJob(self._iJob)
|
|
191 ## self._iTJA.setJobIdFromSge(self._iJob, 1000)
|
|
192 ## qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = ? AND queue = ? AND groupid = ?"
|
|
193 ## params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid)
|
|
194 ## self._db.execute(qryParams, params)
|
|
195 ## tObs = self._db.fetchall()[0]
|
|
196 ## tExp =(1000,)
|
|
197 ## self.assertEquals(tExp,tObs)
|
|
198 ##
|
|
199 ## def test_submitJob_8_fields_for_job_table(self):
|
|
200 ## self._db.dropTable(self._jobTableName)
|
|
201 ## sqlCmd = "CREATE TABLE " + self._jobTableName
|
|
202 ## sqlCmd += " ( jobid INT UNSIGNED"
|
|
203 ## sqlCmd += ", groupid VARCHAR(255)"
|
|
204 ## sqlCmd += ", command TEXT"
|
|
205 ## sqlCmd += ", launcher VARCHAR(1024)"
|
|
206 ## sqlCmd += ", queue VARCHAR(255)"
|
|
207 ## sqlCmd += ", status VARCHAR(255)"
|
|
208 ## sqlCmd += ", time DATETIME"
|
|
209 ## sqlCmd += ", node VARCHAR(255) )"
|
|
210 ## self._db.execute(sqlCmd)
|
|
211 ## self._iTJA.submitJob(self._iJob)
|
|
212 ## expFieldsNb = 9
|
|
213 ## obsFieldsNb = len(self._db.getFieldList(self._jobTableName))
|
|
214 ## self.assertEquals(expFieldsNb, obsFieldsNb)
|
|
215 ## os.remove("jobid.stdout")
|
|
216 ##
|
|
217 ## def test_getNodesListByGroupId(self):
|
|
218 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
|
|
219 ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
|
|
220 ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" )
|
|
221 ## self._insertJob(iJob1)
|
|
222 ## self._insertJob(iJob2)
|
|
223 ## self._insertJob(iJob3)
|
|
224 ## expNodeList = ["node1", "node2"]
|
|
225 ## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid")
|
|
226 ## self.assertEquals(expNodeList, obsNodeList)
|
|
227 ##
|
|
228 ## def test_getNodesListByGroupId_empty_list(self):
|
|
229 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
|
|
230 ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
|
|
231 ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" )
|
|
232 ## self._insertJob(iJob1)
|
|
233 ## self._insertJob(iJob2)
|
|
234 ## self._insertJob(iJob3)
|
|
235 ## expNodeList = []
|
|
236 ## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid3")
|
|
237 ## self.assertEquals(expNodeList, obsNodeList)
|
|
238 ##
|
|
239 ## def test_commitJob(self):
|
|
240 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
|
|
241 ## self._insertJob(iJob1)
|
|
242 ##
|
|
243 ## expJobStatus = "waiting"
|
|
244 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob)
|
|
245 ## self.assertEquals(expJobStatus, obsJobStatus)
|
|
246 ## expJobStatus = "waiting"
|
|
247 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob)
|
|
248 ## self.assertEquals(expJobStatus, obsJobStatus)
|
|
249 ## self._db.close()
|
|
250 ##
|
|
251 ## self._db = DbSQLite(self._dbName)
|
|
252 ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName)
|
|
253 ## expJobStatus = "waiting"
|
|
254 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob)
|
|
255 ## self.assertEquals(expJobStatus, obsJobStatus)
|
|
256 ##
|
|
257 ## def _insertJob(self, iJob):
|
|
258 ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName)
|
|
259 ## self._iTJA.removeJob( iJob )
|
|
260 ## sqlCmd = "INSERT INTO %s" % ( iJob.tablename )
|
|
261 ## sqlCmd += " VALUES ("
|
|
262 ## sqlCmd += " \"%s\"," % ( iJob.jobid )
|
|
263 ## sqlCmd += " \"%s\"," % ( iJob.jobname )
|
|
264 ## sqlCmd += " \"%s\"," % ( iJob.groupid )
|
|
265 ## sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") )
|
|
266 ## sqlCmd += " \"%s\"," % ( iJob.launcher )
|
|
267 ## sqlCmd += " \"%s\"," % ( iJob.queue )
|
|
268 ## sqlCmd += " \"waiting\","
|
|
269 ## sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
|
|
270 ## sqlCmd += " \"%s\" );" % ( iJob.node )
|
|
271 ## self._db.execute( sqlCmd )
|
|
272 #
|
|
273 ## def testRecordJob_in_parallel_with_2_thread(self) :
|
|
274 ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py")
|
|
275 ## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py")
|
|
276 ##
|
|
277 ## db1 = DbSQLite('threadJobTable.db')
|
|
278 ## db1.createJobTable(self._jobTableName)
|
|
279 ##
|
|
280 ## db2 = DbSQLite(self._dbName)
|
|
281 ##
|
|
282 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName)
|
|
283 ## iTJA2 = TableJobAdaptator(db2, self._jobTableName)
|
|
284 ##
|
|
285 ## iRJT1 = RecordJobThread(iTJA1, job1)
|
|
286 ## iRJT2 = RecordJobThread(iTJA2, job2)
|
|
287 ## iRJT1.start()
|
|
288 ## iRJT2.start()
|
|
289 ##
|
|
290 ## while iRJT1.isAlive() or iRJT2.isAlive():
|
|
291 ## time.sleep(5)
|
|
292 ##
|
|
293 ## expJobStatus = "waiting"
|
|
294 ## obsJobStatus1 = iTJA1.getJobStatus(job1)
|
|
295 ## obsJobStatus2 = iTJA2.getJobStatus(job2)
|
|
296 ##
|
|
297 ## self.assertEquals(expJobStatus, obsJobStatus1)
|
|
298 ## self.assertEquals(expJobStatus, obsJobStatus2)
|
|
299 ## db1.db.close()
|
|
300 ## db1.delete()
|
|
301 ##
|
|
302 #
|
|
303 # def test_ThreadRecordJob_sqlite3_connection_object_different_instances(self):
|
|
304 #
|
|
305 ## for i in range(1, 11):
|
|
306 ## job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i)
|
|
307 ## db1 = DbSQLite(self._dbName)
|
|
308 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName)
|
|
309 ## iRJT1 = RecordJobThread(iTJA1, job)
|
|
310 #
|
|
311 # #self._db.createJobTable(self._jobTableName)
|
|
312 #
|
|
313 # for i in range(1, 30) :
|
|
314 # job = "job%s"% i
|
|
315 # db = "db%s"%i
|
|
316 # job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i)
|
|
317 # db = DbSQLite(self._dbName)
|
|
318 # if i == 1 :
|
|
319 # db.createJobTable(self._jobTableName)
|
|
320 # iTJA = TableJobAdaptator(db, self._jobTableName)
|
|
321 # iRJT = RecordJobThread(iTJA, job)
|
|
322 # iRJT.start()
|
|
323 #
|
|
324 # #while iRJT.isAlive() :
|
|
325 # #time.sleep(1)
|
|
326 #
|
|
327 ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py")
|
|
328 ## self._createLauncherFile(job1)
|
|
329 ## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py")
|
|
330 ## self._createLauncherFile(job2)
|
|
331 ##
|
|
332 ## db1 = DbSQLite(self._dbName)
|
|
333 ## db2 = DbSQLite(self._dbName)
|
|
334 ##
|
|
335 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName)
|
|
336 ## iTJA2 = TableJobAdaptator(db2, self._jobTableName)
|
|
337 ##
|
|
338 ##
|
|
339 ## iRJT1 = RecordJobThread(iTJA1, job1)
|
|
340 ## iRJT2 = RecordJobThread(iTJA2, job2)
|
|
341 ##
|
|
342 ## iRJT1.start()
|
|
343 ## iRJT2.start()
|
|
344 ##
|
|
345 ## while iRJT1.isAlive() or iRJT2.isAlive():
|
|
346 ## time.sleep(5)
|
|
347 #
|
|
348 #
|
|
349 ## self.assertNotEquals(iRJT1._iTableJobAdaptator._iDb.db, iRJT2._iTableJobAdaptator._iDb.db)
|
|
350 #
|
|
351 #
|
|
352 # def _createLauncherFile(self, iJob):
|
|
353 # jobFileHandler = open(iJob.launcher , "w")
|
|
354 ## self.cdir
|
|
355 ## self.job
|
|
356 # cDir = os.getcwd()
|
|
357 #
|
|
358 # launcher = "#!/usr/bin/python\n"
|
|
359 # launcher += "import os\n"
|
|
360 # launcher += "import sys\n"
|
|
361 #
|
|
362 # launcher += "print \"system:\", os.uname()\n"
|
|
363 # launcher += "sys.stdout.flush()\n"
|
|
364 #
|
|
365 # newStatus = "running"
|
|
366 # launcher += "from commons.core.sql.Job import Job\n"
|
|
367 # launcher += "from commons.core.sql.DbSQLite import DbSQLite\n"
|
|
368 # launcher += "from commons.core.sql.TableJobAdaptator import TableJobAdaptator\n"
|
|
369 # launcher += "iJob = Job('%s', %s, '%s', '%s')\n" % (iJob.tablename, iJob.jobid, iJob.jobname, iJob.groupid)
|
|
370 # launcher += "iDb = DbSQLite('%s/%s')\n" % (cDir, self._dbName)
|
|
371 # launcher += "iTJA = TableJobAdaptator(iDb, '%s')\n" % self._jobTableName
|
|
372 # launcher += "if not iDb.doesTableExist('%s'):\n" % (iJob.tablename)
|
|
373 # launcher += "\tiDb.createJobTable('%s')\n" % self._jobTableName
|
|
374 #
|
|
375 # launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus
|
|
376 #
|
|
377 # launcher += "print \"LAUNCH: " + iJob.command + "\"\n"
|
|
378 # launcher += "sys.stdout.flush()\n"
|
|
379 # launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n"
|
|
380 # launcher += "if exitStatus != 0:\n"
|
|
381 # launcher += "\tprint \"ERROR: " + iJob.command + " returned exit status '%i'\" % ( exitStatus )\n"
|
|
382 #
|
|
383 # newStatus = "finished"
|
|
384 # launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus
|
|
385 # launcher += "iDb.close()\n"
|
|
386 #
|
|
387 # launcher += "sys.exit(0)\n"
|
|
388 # jobFileHandler.write(launcher)
|
|
389 # jobFileHandler.close()
|
|
390 # os.chmod(iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
|
|
391 #
|
|
392 # def _createJobInstance(self):
|
|
393 # return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" )
|
|
394
|
|
395
|
|
396 class Test_TableJobAdaptator_MySQL( unittest.TestCase ):
|
|
397
|
|
398 def setUp(self):
|
|
399 self._jobTableName = "dummyJobTable"
|
|
400 self._db = DbMySql()
|
|
401 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
|
|
402 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
|
|
403 self._iJob = self._createJobInstance()
|
|
404
|
|
405 def tearDown(self):
|
|
406 self._db.dropTable(self._jobTableName)
|
|
407 self._iTJA = None
|
|
408 self._db.close()
|
|
409
|
|
410 def test_recordJob(self):
|
|
411 self._iTJA.recordJob(self._iJob)
|
|
412 qryParams = "SELECT jobid, jobname, groupid, launcher, queue, resources, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
|
|
413 params = (self._iJob.jobid)
|
|
414 self._db.execute(qryParams, params)
|
|
415 tObs = self._db.fetchall()[0]
|
|
416 tExp =(self._iJob.jobid, self._iJob.jobname, self._iJob.groupid, self._iJob.launcher, self._iJob.queue, "['mem_free=10M']", "waiting", "?")
|
|
417 self.assertEquals(tExp,tObs)
|
|
418
|
|
419 def test_removeJob(self):
|
|
420 self._iTJA.recordJob(self._iJob)
|
|
421 self._iTJA.removeJob(self._iJob)
|
|
422 isTableEmpty = self._db.isEmpty(self._jobTableName)
|
|
423 self.assertTrue(isTableEmpty)
|
|
424
|
|
425 def test_getJobStatus(self):
|
|
426 self._iTJA.recordJob(self._iJob)
|
|
427 expStatus = "waiting"
|
|
428 obsStatus = self._iTJA.getJobStatus(self._iJob)
|
|
429 self.assertEquals(expStatus, obsStatus)
|
|
430
|
|
431 def test_getJobStatus_no_job(self):
|
|
432 expStatus = "unknown"
|
|
433 obsStatus = self._iTJA.getJobStatus(self._iJob)
|
|
434 self.assertEquals(expStatus, obsStatus)
|
|
435
|
|
436 def test_getJobStatus_no_name(self):
|
|
437 iJob = Job(self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources")
|
|
438 expStatus = "unknown"
|
|
439 obsStatus = self._iTJA.getJobStatus(iJob)
|
|
440 self.assertEquals(expStatus, obsStatus)
|
|
441
|
|
442 def test_getJobStatus_two_jobs(self):
|
|
443 # Warning : this case will not append, because recordJob() begin by removeJob()
|
|
444 sqlCmd = "INSERT INTO %s" % self._jobTableName
|
|
445 sqlCmd += " VALUES ("
|
|
446 sqlCmd += " \"%s\"," % self._iJob.jobid
|
|
447 sqlCmd += " \"%s\"," % self._iJob.jobname
|
|
448 sqlCmd += " \"%s\"," % self._iJob.groupid
|
|
449 sqlCmd += " \"%s\"," % self._iJob.launcher
|
|
450 sqlCmd += " \"%s\"," % self._iJob.queue
|
|
451 sqlCmd += " \"%s\"," % self._iJob.lResources
|
|
452 sqlCmd += " \"waiting\","
|
|
453 sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
|
|
454 sqlCmd += " \"?\" );"
|
|
455 self._db.execute(sqlCmd)
|
|
456 self._db.execute(sqlCmd)
|
|
457
|
|
458 expError = "expError.txt"
|
|
459 expErrorHandler = open(expError, "w")
|
|
460 expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
|
|
461 expErrorHandler.close()
|
|
462 obsError = "obsError.txt"
|
|
463 obsErrorHandler = open(obsError, "w")
|
|
464 stderrRef = sys.stderr
|
|
465 sys.stderr = obsErrorHandler
|
|
466
|
|
467 isSysExitRaised = False
|
|
468 try:
|
|
469 self._iTJA.getJobStatus(self._iJob)
|
|
470 except SystemExit:
|
|
471 isSysExitRaised = True
|
|
472 obsErrorHandler.close()
|
|
473 self.assertTrue(isSysExitRaised)
|
|
474 self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
|
|
475 sys.stderr = stderrRef
|
|
476 os.remove(obsError)
|
|
477 os.remove(expError)
|
|
478
|
|
479 def test_changeJobStatus(self):
|
|
480 expStatus = "finished"
|
|
481 self._iTJA.recordJob(self._iJob)
|
|
482 self._iTJA.changeJobStatus(self._iJob, expStatus)
|
|
483 qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
|
|
484 params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue)
|
|
485 self._db.execute(qryParams, params)
|
|
486 obsStatus = self._db.fetchall()[0][0]
|
|
487 self.assertEquals(expStatus, obsStatus)
|
|
488
|
|
489 def test_getCountStatus(self):
|
|
490 iJob1 = self._createJobInstance()
|
|
491 iJob2 = Job(1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
492 self._iTJA.recordJob(iJob1)
|
|
493 self._iTJA.recordJob(iJob2)
|
|
494 expCount = 2
|
|
495 obsCount = self._iTJA.getCountStatus(iJob1.groupid, "waiting")
|
|
496 self.assertEquals(expCount, obsCount)
|
|
497
|
|
498 def test_getCountStatus_without_res(self):
|
|
499 expCount = 0
|
|
500 obsCount = self._iTJA.getCountStatus("groupid", "waiting")
|
|
501 self.assertEquals(expCount, obsCount)
|
|
502
|
|
503 def test_cleanJobGroup(self):
|
|
504 iJob1 = self._createJobInstance()
|
|
505 iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
506 iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
507 self._iTJA.recordJob(iJob1)
|
|
508 self._iTJA.recordJob(iJob2)
|
|
509 self._iTJA.recordJob(iJob3)
|
|
510 self._iTJA.cleanJobGroup(iJob1.groupid)
|
|
511 qryParams = "SELECT count(*) FROM %s" % self._jobTableName
|
|
512 self._db.execute(qryParams)
|
|
513 expCount = 1
|
|
514 obsCount = self._db.fetchall()[0][0]
|
|
515 self.assertEquals(expCount, obsCount)
|
|
516
|
|
517 def test_hasUnfinishedJob_one_waiting_one_finished(self):
|
|
518 iJob1 = self._createJobInstance()
|
|
519 iJob2 = Job(0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
520 iJob3 = Job(0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
521 self._iTJA.recordJob(iJob1)
|
|
522 self._iTJA.recordJob(iJob2)
|
|
523 self._iTJA.recordJob(iJob3)
|
|
524 self._iTJA.changeJobStatus(iJob2, "finished")
|
|
525 expHasGrpIdFinished = True
|
|
526 obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
|
|
527 self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
|
|
528
|
|
529 def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self):
|
|
530 iJob1 = self._createJobInstance()
|
|
531 iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
532 iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
|
|
533 self._iTJA.recordJob(iJob1)
|
|
534 self._iTJA.recordJob(iJob2)
|
|
535 self._iTJA.recordJob(iJob3)
|
|
536 self._iTJA.changeJobStatus(iJob1, "finished")
|
|
537 self._iTJA.changeJobStatus(iJob2, "finished")
|
|
538 expHasGrpIdFinished = False
|
|
539 obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
|
|
540 self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
|
|
541
|
|
542 def test_waitJobGroup_with_finished_job(self):
|
|
543 obs = False
|
|
544 self._iTJA.recordJob(self._iJob)
|
|
545 self._iTJA.changeJobStatus(self._iJob, "finished")
|
|
546 try:
|
|
547 self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0)
|
|
548 except SystemExit:
|
|
549 obs = True
|
|
550 self.assertFalse(obs)
|
|
551
|
|
552 def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
|
|
553 obs = False
|
|
554 self._iTJA.recordJob(self._iJob)
|
|
555 self._iTJA.changeJobStatus(self._iJob, "error")
|
|
556 try:
|
|
557 self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0)
|
|
558 except SystemExit:
|
|
559 obs = True
|
|
560 self.assertTrue(obs)
|
|
561
|
|
562 #TODO: how to test ?!?
|
|
563 # def test_waitJobGroup_with_error_relaunch(self):
|
|
564 # iJob = Job(0, "job1", "groupid", "queue.q", "command", "launcherFile", "node", ["mem_free=10M", "test=TRUE"])
|
|
565 # obs = False
|
|
566 # self._iTJA.recordJob(iJob)
|
|
567 # self._iTJA.changeJobStatus(iJob, "error")
|
|
568 # try:
|
|
569 # self._iTJA.waitJobGroup(iJob.groupid)
|
|
570 # except SystemExit:
|
|
571 # obs = True
|
|
572 # self.assertTrue(obs)
|
|
573
|
|
574 def test_updateJobIdInDB(self):
|
|
575 self._iTJA.recordJob(self._iJob)
|
|
576 self._iTJA.updateJobIdInDB(self._iJob, 1000)
|
|
577 qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
|
|
578 params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid)
|
|
579 self._db.execute(qryParams, params)
|
|
580 tObs = self._db.fetchall()[0]
|
|
581 tExp =(1000,)
|
|
582 self.assertEquals(tExp,tObs)
|
|
583
|
|
584 def test_getNodesListByGroupId(self):
|
|
585 iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
|
|
586 iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
|
|
587 iJob3 = Job(2, "job3", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
|
|
588 iJob4 = Job(3, "job4", "groupid2", "queue", "command", "launcherFile", "node3", "lResources")
|
|
589 self._insertJob(iJob1)
|
|
590 self._insertJob(iJob2)
|
|
591 self._insertJob(iJob3)
|
|
592 self._insertJob(iJob4)
|
|
593 expNodeList = ["node1", "node2"]
|
|
594 obsNodeList = self._iTJA.getNodesListByGroupId("groupid")
|
|
595 self.assertEquals(expNodeList, obsNodeList)
|
|
596
|
|
597 def test_getNodesListByGroupId_empty_list(self):
|
|
598 iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
|
|
599 iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
|
|
600 iJob3 = Job(2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources")
|
|
601 self._insertJob(iJob1)
|
|
602 self._insertJob(iJob2)
|
|
603 self._insertJob(iJob3)
|
|
604 expNodeList = []
|
|
605 obsNodeList = self._iTJA.getNodesListByGroupId("groupid3")
|
|
606 self.assertEquals(expNodeList, obsNodeList)
|
|
607
|
|
608 # TODO test TableJobAdaptator._createJobInstance TableJobAdaptator._createLauncherFile
|
|
609 def _insertJob(self, iJob):
|
|
610 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
|
|
611 self._iTJA.removeJob(iJob)
|
|
612 sqlCmd = "INSERT INTO %s" % self._jobTableName
|
|
613 sqlCmd += " VALUES ("
|
|
614 sqlCmd += " \"%s\"," % iJob.jobid
|
|
615 sqlCmd += " \"%s\"," % iJob.jobname
|
|
616 sqlCmd += " \"%s\"," % iJob.groupid
|
|
617 sqlCmd += " \"%s\"," % iJob.launcher
|
|
618 sqlCmd += " \"%s\"," % iJob.queue
|
|
619 sqlCmd += " \"%s\"," % iJob.lResources
|
|
620 sqlCmd += " \"waiting\","
|
|
621 sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
|
|
622 sqlCmd += " \"%s\" );" % iJob.node
|
|
623 self._db.execute(sqlCmd)
|
|
624
|
|
625 def _createJobInstance(self):
|
|
626 return Job(0, "job1", "groupid", "", "command", "launcherFile", "node", ["mem_free=10M"])
|
|
627
|
|
628 #class RecordJobThread(threading.Thread):
|
|
629 #
|
|
630 # def __init__(self, iTableJobAdaptator, iJob):
|
|
631 # threading.Thread.__init__(self)
|
|
632 # self._iTableJobAdaptator = iTableJobAdaptator
|
|
633 # self._iJob = iJob
|
|
634 #
|
|
635 # def run(self):
|
|
636 # self._iTableJobAdaptator.recordJob(self._iJob)
|
|
637 # #self._iTableJobAdaptator.submitJob(self._iJob)
|
|
638
|
|
639 if __name__ == "__main__":
|
|
640 unittest.main()
|