Mercurial > repos > yufei-luo > s_mart
comparison commons/core/sql/test/Test_TableJobAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:ea3082881bf8 | 6:769e306b7933 |
---|---|
1 import unittest | |
2 import sys | |
3 import os | |
4 import time | |
5 #import stat | |
6 #import threading | |
7 from commons.core.sql.DbMySql import DbMySql | |
8 #from commons.core.sql.DbSQLite import DbSQLite | |
9 from commons.core.sql.Job import Job | |
10 from commons.core.utils.FileUtils import FileUtils | |
11 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory | |
12 | |
13 #class Test_TableJobAdaptator_SQLite( unittest.TestCase ): | |
14 # | |
15 # def setUp(self): | |
16 # self._jobTableName = "dummyJobTable" | |
17 # self._dbName = "test.db" | |
18 # self._db = DbSQLite(self._dbName) | |
19 # self._iTJA = TableJobAdaptator(self._db, self._jobTableName) | |
20 # if not self._db.doesTableExist(self._jobTableName): | |
21 # self._db.createJobTable(self._jobTableName) | |
22 # self._iJob = self._createJobInstance() | |
23 # | |
24 # def tearDown(self): | |
25 # self._iTJA = None | |
26 # self._db.close() | |
27 ## self._db.delete() | |
28 # | |
29 ## def test_recordJob(self): | |
30 ## self._iTJA.recordJob(self._iJob) | |
31 ## qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = ?" | |
32 ## params = (self._iJob.jobid,) | |
33 ## self._db.execute(qryParams, params) | |
34 ## tObs = self._db.fetchall()[0] | |
35 ## tExp =(self._iJob.jobid, self._iJob.groupid, self._iJob.command, self._iJob.launcher, self._iJob.queue, "waiting", "?") | |
36 ## self.assertEquals(tExp,tObs) | |
37 ## | |
38 ## def test_removeJob(self): | |
39 ## self._iTJA.recordJob(self._iJob) | |
40 ## self._iTJA.removeJob(self._iJob) | |
41 ## self.assertTrue(self._db.isEmpty(self._jobTableName)) | |
42 ## | |
43 ## def test_getJobStatus(self): | |
44 ## self._iTJA.recordJob(self._iJob) | |
45 ## expStatus = "waiting" | |
46 ## obsStatus = self._iTJA.getJobStatus(self._iJob) | |
47 ## self.assertEquals(expStatus, obsStatus) | |
48 ## | |
49 ## def test_getJobStatus_no_job(self): | |
50 ## expStatus = "unknown" | |
51 ## obsStatus = self._iTJA.getJobStatus(self._iJob) | |
52 ## self.assertEquals(expStatus, obsStatus) | |
53 ## | |
54 ## def test_getJobStatus_no_name(self): | |
55 ## iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) | |
56 ## expStatus = "unknown" | |
57 ## obsStatus = self._iTJA.getJobStatus(iJob) | |
58 ## self.assertEquals(expStatus, obsStatus) | |
59 ## | |
60 ## def test_getJobStatus_two_jobs(self): | |
61 ## # Warning : this case will not append, because recordJob() begin by removeJob() | |
62 ## sqlCmd = "INSERT INTO %s" % self._iJob.tablename | |
63 ## sqlCmd += " VALUES (" | |
64 ## sqlCmd += " \"%s\"," % self._iJob.jobid | |
65 ## sqlCmd += " \"%s\"," % self._iJob.jobname | |
66 ## sqlCmd += " \"%s\"," % self._iJob.groupid | |
67 ## sqlCmd += " \"%s\"," % self._iJob.command.replace("\"","\'") | |
68 ## sqlCmd += " \"%s\"," % self._iJob.launcher | |
69 ## sqlCmd += " \"%s\"," % self._iJob.queue | |
70 ## sqlCmd += " \"waiting\"," | |
71 ## sqlCmd += " \"%s\"," % time.strftime( "%Y-%m-%d %H:%M:%S" ) | |
72 ## sqlCmd += " \"?\" );" | |
73 ## self._db.execute(sqlCmd) | |
74 ## self._db.execute(sqlCmd) | |
75 ## | |
76 ## expError = "expError.txt" | |
77 ## expErrorHandler = open(expError, "w") | |
78 ## expErrorHandler.write("ERROR while getting job status: non-unique jobs\n") | |
79 ## expErrorHandler.close() | |
80 ## obsError = "obsError.txt" | |
81 ## obsErrorHandler = open(obsError, "w") | |
82 ## stderrRef = sys.stderr | |
83 ## sys.stderr = obsErrorHandler | |
84 ## | |
85 ## isSysExitRaised = False | |
86 ## try: | |
87 ## self._iTJA.getJobStatus(self._iJob) | |
88 ## except SystemExit: | |
89 ## isSysExitRaised = True | |
90 ## | |
91 ## obsErrorHandler.close() | |
92 ## | |
93 ## self.assertTrue(isSysExitRaised) | |
94 ## self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError)) | |
95 ## sys.stderr = stderrRef | |
96 ## os.remove(obsError) | |
97 ## os.remove(expError) | |
98 ## | |
99 ## def test_changeJobStatus(self): | |
100 ## expStatus = "finished" | |
101 ## self._iTJA.recordJob(self._iJob) | |
102 ## self._iTJA.changeJobStatus(self._iJob, expStatus) | |
103 ## qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =? AND groupid=? AND queue=?" | |
104 ## params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue) | |
105 ## self._db.execute(qryParams, params) | |
106 ## obsStatus = self._db.fetchall()[0][0] | |
107 ## self.assertEquals(expStatus, obsStatus) | |
108 ## self._iTJA.removeJob(self._iJob) | |
109 ## | |
110 ## def test_getCountStatus(self): | |
111 ## iJob1 = self._createJobInstance() | |
112 ## iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
113 ## self._iTJA.recordJob(iJob1) | |
114 ## self._iTJA.recordJob(iJob2) | |
115 ## expCount = 2 | |
116 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, iJob1.groupid, "waiting") | |
117 ## self.assertEquals(expCount, obsCount) | |
118 ## | |
119 ## def test_getCountStatus_without_res(self): | |
120 ## expCount = 0 | |
121 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, "groupid", "waiting") | |
122 ## self.assertEquals(expCount, obsCount) | |
123 ## | |
124 ## def test_cleanJobGroup(self): | |
125 ## iJob1 = self._createJobInstance() | |
126 ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
127 ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
128 ## self._iTJA.recordJob(iJob1) | |
129 ## self._iTJA.recordJob(iJob2) | |
130 ## self._iTJA.recordJob(iJob3) | |
131 ## self._iTJA.cleanJobGroup(self._jobTableName, iJob1.groupid) | |
132 ## qryParams = "SELECT count(*) FROM " + self._jobTableName | |
133 ## self._db.execute(qryParams) | |
134 ## expCount = 1 | |
135 ## obsCount = self._db.fetchall()[0][0] | |
136 ## self.assertEquals(expCount, obsCount) | |
137 ## | |
138 ## def test_hasUnfinishedJob_one_waiting_one_finished(self): | |
139 ## iJob1 = self._createJobInstance() | |
140 ## iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
141 ## iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
142 ## self._iTJA.recordJob(iJob1) | |
143 ## self._iTJA.recordJob(iJob2) | |
144 ## self._iTJA.recordJob(iJob3) | |
145 ## self._iTJA.changeJobStatus(iJob2, "finished") | |
146 ## expHasGrpIdFinished = True | |
147 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid) | |
148 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) | |
149 ## | |
150 ## def test_hasUnfinishedJob_jobTable_doesnt_exist(self): | |
151 ## self._db.dropTable(self._jobTableName) | |
152 ## expHasGrpIdFinished = False | |
153 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, self._iJob.groupid) | |
154 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) | |
155 ## | |
156 ## def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self): | |
157 ## iJob1 = self._createJobInstance() | |
158 ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
159 ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
160 ## self._iTJA.recordJob(iJob1) | |
161 ## self._iTJA.recordJob(iJob2) | |
162 ## self._iTJA.recordJob(iJob3) | |
163 ## self._iTJA.changeJobStatus(iJob1, "finished") | |
164 ## self._iTJA.changeJobStatus(iJob2, "finished") | |
165 ## expHasGrpIdFinished = False | |
166 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid) | |
167 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) | |
168 ## | |
169 ## def test_waitJobGroup_with_finished_job(self): | |
170 ## obs = False | |
171 ## self._iTJA.recordJob(self._iJob) | |
172 ## self._iTJA.changeJobStatus(self._iJob, "finished") | |
173 ## try: | |
174 ## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0) | |
175 ## except SystemExit: | |
176 ## obs = True | |
177 ## self.assertFalse(obs) | |
178 ## | |
179 ## def test_waitJobGroup_with_error_job_maxRelaunch_zero(self): | |
180 ## obs = False | |
181 ## self._iTJA.recordJob(self._iJob) | |
182 ## self._iTJA.changeJobStatus(self._iJob, "error") | |
183 ## try: | |
184 ## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0) | |
185 ## except SystemExit: | |
186 ## obs = True | |
187 ## self.assertTrue(obs) | |
188 ## | |
189 ## def test_setJobIdFromSge(self): | |
190 ## self._iTJA.recordJob(self._iJob) | |
191 ## self._iTJA.setJobIdFromSge(self._iJob, 1000) | |
192 ## qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = ? AND queue = ? AND groupid = ?" | |
193 ## params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid) | |
194 ## self._db.execute(qryParams, params) | |
195 ## tObs = self._db.fetchall()[0] | |
196 ## tExp =(1000,) | |
197 ## self.assertEquals(tExp,tObs) | |
198 ## | |
199 ## def test_submitJob_8_fields_for_job_table(self): | |
200 ## self._db.dropTable(self._jobTableName) | |
201 ## sqlCmd = "CREATE TABLE " + self._jobTableName | |
202 ## sqlCmd += " ( jobid INT UNSIGNED" | |
203 ## sqlCmd += ", groupid VARCHAR(255)" | |
204 ## sqlCmd += ", command TEXT" | |
205 ## sqlCmd += ", launcher VARCHAR(1024)" | |
206 ## sqlCmd += ", queue VARCHAR(255)" | |
207 ## sqlCmd += ", status VARCHAR(255)" | |
208 ## sqlCmd += ", time DATETIME" | |
209 ## sqlCmd += ", node VARCHAR(255) )" | |
210 ## self._db.execute(sqlCmd) | |
211 ## self._iTJA.submitJob(self._iJob) | |
212 ## expFieldsNb = 9 | |
213 ## obsFieldsNb = len(self._db.getFieldList(self._jobTableName)) | |
214 ## self.assertEquals(expFieldsNb, obsFieldsNb) | |
215 ## os.remove("jobid.stdout") | |
216 ## | |
217 ## def test_getNodesListByGroupId(self): | |
218 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) | |
219 ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) | |
220 ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" ) | |
221 ## self._insertJob(iJob1) | |
222 ## self._insertJob(iJob2) | |
223 ## self._insertJob(iJob3) | |
224 ## expNodeList = ["node1", "node2"] | |
225 ## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid") | |
226 ## self.assertEquals(expNodeList, obsNodeList) | |
227 ## | |
228 ## def test_getNodesListByGroupId_empty_list(self): | |
229 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) | |
230 ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) | |
231 ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" ) | |
232 ## self._insertJob(iJob1) | |
233 ## self._insertJob(iJob2) | |
234 ## self._insertJob(iJob3) | |
235 ## expNodeList = [] | |
236 ## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid3") | |
237 ## self.assertEquals(expNodeList, obsNodeList) | |
238 ## | |
239 ## def test_commitJob(self): | |
240 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) | |
241 ## self._insertJob(iJob1) | |
242 ## | |
243 ## expJobStatus = "waiting" | |
244 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob) | |
245 ## self.assertEquals(expJobStatus, obsJobStatus) | |
246 ## expJobStatus = "waiting" | |
247 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob) | |
248 ## self.assertEquals(expJobStatus, obsJobStatus) | |
249 ## self._db.close() | |
250 ## | |
251 ## self._db = DbSQLite(self._dbName) | |
252 ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName) | |
253 ## expJobStatus = "waiting" | |
254 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob) | |
255 ## self.assertEquals(expJobStatus, obsJobStatus) | |
256 ## | |
257 ## def _insertJob(self, iJob): | |
258 ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName) | |
259 ## self._iTJA.removeJob( iJob ) | |
260 ## sqlCmd = "INSERT INTO %s" % ( iJob.tablename ) | |
261 ## sqlCmd += " VALUES (" | |
262 ## sqlCmd += " \"%s\"," % ( iJob.jobid ) | |
263 ## sqlCmd += " \"%s\"," % ( iJob.jobname ) | |
264 ## sqlCmd += " \"%s\"," % ( iJob.groupid ) | |
265 ## sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") ) | |
266 ## sqlCmd += " \"%s\"," % ( iJob.launcher ) | |
267 ## sqlCmd += " \"%s\"," % ( iJob.queue ) | |
268 ## sqlCmd += " \"waiting\"," | |
269 ## sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) ) | |
270 ## sqlCmd += " \"%s\" );" % ( iJob.node ) | |
271 ## self._db.execute( sqlCmd ) | |
272 # | |
273 ## def testRecordJob_in_parallel_with_2_thread(self) : | |
274 ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py") | |
275 ## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py") | |
276 ## | |
277 ## db1 = DbSQLite('threadJobTable.db') | |
278 ## db1.createJobTable(self._jobTableName) | |
279 ## | |
280 ## db2 = DbSQLite(self._dbName) | |
281 ## | |
282 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName) | |
283 ## iTJA2 = TableJobAdaptator(db2, self._jobTableName) | |
284 ## | |
285 ## iRJT1 = RecordJobThread(iTJA1, job1) | |
286 ## iRJT2 = RecordJobThread(iTJA2, job2) | |
287 ## iRJT1.start() | |
288 ## iRJT2.start() | |
289 ## | |
290 ## while iRJT1.isAlive() or iRJT2.isAlive(): | |
291 ## time.sleep(5) | |
292 ## | |
293 ## expJobStatus = "waiting" | |
294 ## obsJobStatus1 = iTJA1.getJobStatus(job1) | |
295 ## obsJobStatus2 = iTJA2.getJobStatus(job2) | |
296 ## | |
297 ## self.assertEquals(expJobStatus, obsJobStatus1) | |
298 ## self.assertEquals(expJobStatus, obsJobStatus2) | |
299 ## db1.db.close() | |
300 ## db1.delete() | |
301 ## | |
302 # | |
303 # def test_ThreadRecordJob_sqlite3_connection_object_different_instances(self): | |
304 # | |
305 ## for i in range(1, 11): | |
306 ## job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i) | |
307 ## db1 = DbSQLite(self._dbName) | |
308 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName) | |
309 ## iRJT1 = RecordJobThread(iTJA1, job) | |
310 # | |
311 # #self._db.createJobTable(self._jobTableName) | |
312 # | |
313 # for i in range(1, 30) : | |
314 # job = "job%s"% i | |
315 # db = "db%s"%i | |
316 # job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i) | |
317 # db = DbSQLite(self._dbName) | |
318 # if i == 1 : | |
319 # db.createJobTable(self._jobTableName) | |
320 # iTJA = TableJobAdaptator(db, self._jobTableName) | |
321 # iRJT = RecordJobThread(iTJA, job) | |
322 # iRJT.start() | |
323 # | |
324 # #while iRJT.isAlive() : | |
325 # #time.sleep(1) | |
326 # | |
327 ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py") | |
328 ## self._createLauncherFile(job1) | |
329 ## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py") | |
330 ## self._createLauncherFile(job2) | |
331 ## | |
332 ## db1 = DbSQLite(self._dbName) | |
333 ## db2 = DbSQLite(self._dbName) | |
334 ## | |
335 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName) | |
336 ## iTJA2 = TableJobAdaptator(db2, self._jobTableName) | |
337 ## | |
338 ## | |
339 ## iRJT1 = RecordJobThread(iTJA1, job1) | |
340 ## iRJT2 = RecordJobThread(iTJA2, job2) | |
341 ## | |
342 ## iRJT1.start() | |
343 ## iRJT2.start() | |
344 ## | |
345 ## while iRJT1.isAlive() or iRJT2.isAlive(): | |
346 ## time.sleep(5) | |
347 # | |
348 # | |
349 ## self.assertNotEquals(iRJT1._iTableJobAdaptator._iDb.db, iRJT2._iTableJobAdaptator._iDb.db) | |
350 # | |
351 # | |
352 # def _createLauncherFile(self, iJob): | |
353 # jobFileHandler = open(iJob.launcher , "w") | |
354 ## self.cdir | |
355 ## self.job | |
356 # cDir = os.getcwd() | |
357 # | |
358 # launcher = "#!/usr/bin/python\n" | |
359 # launcher += "import os\n" | |
360 # launcher += "import sys\n" | |
361 # | |
362 # launcher += "print \"system:\", os.uname()\n" | |
363 # launcher += "sys.stdout.flush()\n" | |
364 # | |
365 # newStatus = "running" | |
366 # launcher += "from commons.core.sql.Job import Job\n" | |
367 # launcher += "from commons.core.sql.DbSQLite import DbSQLite\n" | |
368 # launcher += "from commons.core.sql.TableJobAdaptator import TableJobAdaptator\n" | |
369 # launcher += "iJob = Job('%s', %s, '%s', '%s')\n" % (iJob.tablename, iJob.jobid, iJob.jobname, iJob.groupid) | |
370 # launcher += "iDb = DbSQLite('%s/%s')\n" % (cDir, self._dbName) | |
371 # launcher += "iTJA = TableJobAdaptator(iDb, '%s')\n" % self._jobTableName | |
372 # launcher += "if not iDb.doesTableExist('%s'):\n" % (iJob.tablename) | |
373 # launcher += "\tiDb.createJobTable('%s')\n" % self._jobTableName | |
374 # | |
375 # launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus | |
376 # | |
377 # launcher += "print \"LAUNCH: " + iJob.command + "\"\n" | |
378 # launcher += "sys.stdout.flush()\n" | |
379 # launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n" | |
380 # launcher += "if exitStatus != 0:\n" | |
381 # launcher += "\tprint \"ERROR: " + iJob.command + " returned exit status '%i'\" % ( exitStatus )\n" | |
382 # | |
383 # newStatus = "finished" | |
384 # launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus | |
385 # launcher += "iDb.close()\n" | |
386 # | |
387 # launcher += "sys.exit(0)\n" | |
388 # jobFileHandler.write(launcher) | |
389 # jobFileHandler.close() | |
390 # os.chmod(iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) | |
391 # | |
392 # def _createJobInstance(self): | |
393 # return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) | |
394 | |
395 | |
class Test_TableJobAdaptator_MySQL( unittest.TestCase ):
    """Tests of the job-table adaptator created by TableJobAdaptatorFactory
    on top of a DbMySql connection.

    Each test works on a scratch table named "dummyJobTable" that is
    (re)created in setUp() and dropped in tearDown(), so the tests need a
    database reachable through DbMySql().
    """

    def setUp(self):
        """Open the database, build the adaptator and create an empty job table."""
        self._jobTableName = "dummyJobTable"
        self._db = DbMySql()
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        self._iJob = self._createJobInstance()

    def tearDown(self):
        """Drop the scratch table and always close the database connection."""
        try:
            self._db.dropTable(self._jobTableName)
        finally:
            # Close the connection even when the table could not be dropped.
            self._iTJA = None
            self._db.close()

    def test_recordJob(self):
        """A recorded job is stored with status "waiting" and node "?"."""
        self._iTJA.recordJob(self._iJob)
        qryParams = "SELECT jobid, jobname, groupid, launcher, queue, resources, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
        # The trailing comma makes this a real 1-tuple, like the parameter
        # tuples used by the other parameterised queries of this class.
        params = (self._iJob.jobid,)
        self._db.execute(qryParams, params)
        tObs = self._db.fetchall()[0]
        tExp = (self._iJob.jobid, self._iJob.jobname, self._iJob.groupid, self._iJob.launcher, self._iJob.queue, "['mem_free=10M']", "waiting", "?")
        self.assertEqual(tExp, tObs)

    def test_removeJob(self):
        """Removing the only recorded job leaves the table empty."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.removeJob(self._iJob)
        isTableEmpty = self._db.isEmpty(self._jobTableName)
        self.assertTrue(isTableEmpty)

    def test_getJobStatus(self):
        """A freshly recorded job has the status "waiting"."""
        self._iTJA.recordJob(self._iJob)
        expStatus = "waiting"
        obsStatus = self._iTJA.getJobStatus(self._iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_job(self):
        """A job that was never recorded has the status "unknown"."""
        expStatus = "unknown"
        obsStatus = self._iTJA.getJobStatus(self._iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_name(self):
        """An unrecorded job built with non-default arguments has the status "unknown"."""
        iJob = Job(self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources")
        expStatus = "unknown"
        obsStatus = self._iTJA.getJobStatus(iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_two_jobs(self):
        """Two identical rows make getJobStatus() write an error and exit."""
        # Warning: per the original author's note, this situation is not
        # expected to happen through recordJob(), which begins by calling
        # removeJob(); the duplicate rows are therefore inserted by hand.
        sqlCmd = self._buildInsertSql(self._iJob, "?")
        self._db.execute(sqlCmd)
        self._db.execute(sqlCmd)

        expError = "expError.txt"
        obsError = "obsError.txt"
        isSysExitRaised = False
        stderrRef = sys.stderr
        try:
            with open(expError, "w") as expErrorHandler:
                expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
            with open(obsError, "w") as obsErrorHandler:
                # Capture what the adaptator writes on stderr into a file.
                sys.stderr = obsErrorHandler
                try:
                    self._iTJA.getJobStatus(self._iJob)
                except SystemExit:
                    isSysExitRaised = True
                finally:
                    # Restore stderr before asserting, so that a failing
                    # assertion cannot leave it bound to a closed file.
                    sys.stderr = stderrRef
            self.assertTrue(isSysExitRaised)
            self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
        finally:
            # Remove the scratch files whether or not the assertions passed.
            for fileName in (expError, obsError):
                if os.path.exists(fileName):
                    os.remove(fileName)

    def test_changeJobStatus(self):
        """changeJobStatus() updates the status column of the recorded job."""
        expStatus = "finished"
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, expStatus)
        qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
        params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue)
        self._db.execute(qryParams, params)
        obsStatus = self._db.fetchall()[0][0]
        self.assertEqual(expStatus, obsStatus)

    def test_getCountStatus(self):
        """Two waiting jobs of the same group are both counted."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        self._iTJA.recordJob(iJob1)
        self._iTJA.recordJob(iJob2)
        expCount = 2
        obsCount = self._iTJA.getCountStatus(iJob1.groupid, "waiting")
        self.assertEqual(expCount, obsCount)

    def test_getCountStatus_without_res(self):
        """The count is zero when the table holds no job."""
        expCount = 0
        obsCount = self._iTJA.getCountStatus("groupid", "waiting")
        self.assertEqual(expCount, obsCount)

    def test_cleanJobGroup(self):
        """cleanJobGroup() removes the jobs of one group and keeps the others."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        self._iTJA.recordJob(iJob1)
        self._iTJA.recordJob(iJob2)
        self._iTJA.recordJob(iJob3)
        self._iTJA.cleanJobGroup(iJob1.groupid)
        qryParams = "SELECT count(*) FROM %s" % self._jobTableName
        self._db.execute(qryParams)
        # Only iJob3, which belongs to another group, must remain.
        expCount = 1
        obsCount = self._db.fetchall()[0][0]
        self.assertEqual(expCount, obsCount)

    def test_hasUnfinishedJob_one_waiting_one_finished(self):
        """A group with one waiting and one finished job still has unfinished work."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        self._iTJA.recordJob(iJob1)
        self._iTJA.recordJob(iJob2)
        self._iTJA.recordJob(iJob3)
        self._iTJA.changeJobStatus(iJob2, "finished")
        expHasGrpIdFinished = True
        obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
        self.assertEqual(expHasGrpIdFinished, obsHasGrpIdFinished)

    def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self):
        """A group whose jobs are all finished has no unfinished work."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        self._iTJA.recordJob(iJob1)
        self._iTJA.recordJob(iJob2)
        self._iTJA.recordJob(iJob3)
        self._iTJA.changeJobStatus(iJob1, "finished")
        self._iTJA.changeJobStatus(iJob2, "finished")
        expHasGrpIdFinished = False
        obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
        self.assertEqual(expHasGrpIdFinished, obsHasGrpIdFinished)

    def test_waitJobGroup_with_finished_job(self):
        """waitJobGroup() does not exit when the only job is finished."""
        obs = False
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, "finished")
        try:
            self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0)
        except SystemExit:
            obs = True
        self.assertFalse(obs)

    def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
        """waitJobGroup() exits when a job is in error (called with 0, 0 as extra arguments)."""
        obs = False
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, "error")
        try:
            self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0)
        except SystemExit:
            obs = True
        self.assertTrue(obs)

    #TODO: how to test the relaunch of a job in error?
#    def test_waitJobGroup_with_error_relaunch(self):
#        iJob = Job(0, "job1", "groupid", "queue.q", "command", "launcherFile", "node", ["mem_free=10M", "test=TRUE"])
#        obs = False
#        self._iTJA.recordJob(iJob)
#        self._iTJA.changeJobStatus(iJob, "error")
#        try:
#            self._iTJA.waitJobGroup(iJob.groupid)
#        except SystemExit:
#            obs = True
#        self.assertTrue(obs)

    def test_updateJobIdInDB(self):
        """updateJobIdInDB() replaces the job identifier stored in the table."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.updateJobIdInDB(self._iJob, 1000)
        qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
        params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid)
        self._db.execute(qryParams, params)
        tObs = self._db.fetchall()[0]
        tExp = (1000,)
        self.assertEqual(tExp, tObs)

    def test_getNodesListByGroupId(self):
        """Nodes of a group are listed once each, other groups being ignored."""
        iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
        iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
        iJob3 = Job(2, "job3", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
        iJob4 = Job(3, "job4", "groupid2", "queue", "command", "launcherFile", "node3", "lResources")
        for iJob in (iJob1, iJob2, iJob3, iJob4):
            self._insertJob(iJob)
        # "node2" is used by two jobs of the group but is expected only once.
        expNodeList = ["node1", "node2"]
        obsNodeList = self._iTJA.getNodesListByGroupId("groupid")
        self.assertEqual(expNodeList, obsNodeList)

    def test_getNodesListByGroupId_empty_list(self):
        """An unknown group identifier gives an empty node list."""
        iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
        iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
        iJob3 = Job(2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources")
        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)
        expNodeList = []
        obsNodeList = self._iTJA.getNodesListByGroupId("groupid3")
        self.assertEqual(expNodeList, obsNodeList)

    # TODO test TableJobAdaptator._createJobInstance TableJobAdaptator._createLauncherFile
    def _insertJob(self, iJob):
        """Insert 'iJob' directly in the job table with status "waiting",
        keeping the node name carried by the job.

        The adaptator is re-created and any previous row of the job is
        removed before the insertion.
        """
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
        self._iTJA.removeJob(iJob)
        self._db.execute(self._buildInsertSql(iJob, iJob.node))

    def _buildInsertSql(self, iJob, node):
        """Return the INSERT statement storing 'iJob' as a "waiting" job,
        time-stamped with the current local time and attached to 'node'.
        """
        lValues = [iJob.jobid,
                   iJob.jobname,
                   iJob.groupid,
                   iJob.launcher,
                   iJob.queue,
                   iJob.lResources,
                   "waiting",
                   time.strftime("%Y-%m-%d %H:%M:%S")]
        sqlCmd = "INSERT INTO %s" % self._jobTableName
        sqlCmd += " VALUES ("
        sqlCmd += "".join([" \"%s\"," % value for value in lValues])
        sqlCmd += " \"%s\" );" % node
        return sqlCmd

    def _createJobInstance(self):
        """Return the reference job shared by most tests."""
        return Job(0, "job1", "groupid", "", "command", "launcherFile", "node", ["mem_free=10M"])
628 #class RecordJobThread(threading.Thread): | |
629 # | |
630 # def __init__(self, iTableJobAdaptator, iJob): | |
631 # threading.Thread.__init__(self) | |
632 # self._iTableJobAdaptator = iTableJobAdaptator | |
633 # self._iJob = iJob | |
634 # | |
635 # def run(self): | |
636 # self._iTableJobAdaptator.recordJob(self._iJob) | |
637 # #self._iTableJobAdaptator.submitJob(self._iJob) | |
638 | |
# Run every test case of this module when the file is executed as a script.
639 if __name__ == "__main__": | |
640 unittest.main() |