6
|
1 import unittest
|
|
2 import sys
|
|
3 import os
|
|
4 import time
|
|
5 from commons.core.sql.DbMySql import DbMySql
|
|
6 from commons.core.sql.Job import Job
|
|
7 from commons.core.sql.RepetJob import RepetJob
|
|
8 from commons.core.utils.FileUtils import FileUtils
|
|
9
|
|
# TODO: to be removed => replace all uses of RepetJob() by TableJobAdaptator().
|
|
class Test_RepetJob( unittest.TestCase ):
    """Integration tests for RepetJob, run against a live MySQL database.

    Each test works on the table named by ``self._jobTableName``.  The table
    is dropped in tearDown() so that a failing assertion cannot leave it
    behind for the next test.
    """

    def setUp(self):
        """Open a database connection and create the RepetJob under test."""
        self._jobTableName = "dummyJobTable"
        self._db = DbMySql()
        self._iRepetJob = RepetJob()

    def tearDown(self):
        """Drop the job table and close the database connection."""
        self._iRepetJob = None
        try:
            # Centralised clean-up: runs even when a test fails part-way.
            # Several tests call dropTable() on a table that was never
            # created, so it is assumed to tolerate a missing table.
            self._db.dropTable(self._jobTableName)
        finally:
            self._db.close()

    def _createJobInstance(self):
        """Return the default Job used by most tests.

        Positional order: table name, job id, job name, group id, queue,
        command, launcher file, node, resources.
        """
        return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" )

    def _buildInsertCmd(self, iJob, node):
        """Return the raw INSERT statement storing iJob as 'waiting' on node.

        Values follow the column order checked in
        test_createJobTable_field_list: jobid, jobname, groupid, command,
        launcher, queue, status, time, node.
        """
        lValues = [
            iJob.jobid,
            iJob.jobname,
            iJob.groupid,
            # Double quotes delimit the SQL values, so swap any inside the command.
            iJob.command.replace("\"", "\'"),
            iJob.launcher,
            iJob.queue,
            "waiting",
            time.strftime( "%Y-%m-%d %H:%M:%S" ),
            node,
        ]
        sqlValues = ",".join([" \"%s\"" % value for value in lValues])
        return "INSERT INTO %s VALUES (%s );" % ( iJob.tablename, sqlValues )

    def _insertJob(self, iJob):
        """Insert iJob with raw SQL, keeping its own node, after removing any previous record of it."""
        self._iRepetJob.removeJob( iJob )
        self._iRepetJob.execute( self._buildInsertCmd(iJob, iJob.node) )

    def test_createJobTable_is_table_created(self):
        """createTable() must make the job table exist."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        isTableCreated = self._db.doesTableExist(self._jobTableName)

        self.assertTrue(isTableCreated)

    def test_createJobTable_field_list(self):
        """createTable() must create the nine expected columns, in order."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        expLField = ["jobid", "jobname", "groupid", "command", "launcher", "queue", "status", "time", "node"]
        obsLField = self._db.getFieldList(self._jobTableName)

        self.assertEqual(expLField, obsLField)

    def test_recordJob(self):
        """A recorded job is expected to be stored as 'waiting' with node '?'."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
        # Trailing comma: query parameters must be a sequence, not a bare scalar.
        params = (iJob.jobid,)
        self._db.execute(qryParams, params)

        tExp = (iJob.jobid, iJob.groupid, iJob.command, iJob.launcher, iJob.queue, "waiting", "?")
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_removeJob(self):
        """Removing the only recorded job must leave the table empty."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        self._iRepetJob.removeJob(iJob)

        isTableEmpty = self._db.isEmpty(self._jobTableName)

        self.assertTrue(isTableEmpty)

    def test_getJobStatus(self):
        """A freshly recorded job must report the status 'waiting'."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        expStatus = "waiting"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_unknown(self):
        """A job that was never recorded must report the status 'unknown'."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()

        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_name(self):
        """An unrecorded job with an id but an empty name must report 'unknown'."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" )

        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_non_unique_job(self):
        """Duplicate rows for one job must make getJobStatus() report an error on stderr and exit."""
        # Warning: this case will not happen in practice, because recordJob() begins with removeJob().
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        sqlCmd = self._buildInsertCmd(iJob, "?")
        # Insert the very same row twice to create the duplicate.
        self._db.execute( sqlCmd )
        self._db.execute( sqlCmd )

        expError = "expError.txt"
        obsError = "obsError.txt"
        isSysExitRaised = False
        stderrRef = sys.stderr
        try:
            with open(expError, "w") as expErrorHandler:
                expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")

            with open(obsError, "w") as obsErrorHandler:
                sys.stderr = obsErrorHandler
                try:
                    self._iRepetJob.getJobStatus(iJob)
                except SystemExit:
                    isSysExitRaised = True
                finally:
                    # Restore stderr before asserting, so that a failure
                    # report is not written to the captured (closed) file.
                    sys.stderr = stderrRef

            self.assertTrue(isSysExitRaised)
            self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
        finally:
            # Remove the temporary files even when an assertion fails.
            for fileName in (expError, obsError):
                if os.path.exists(fileName):
                    os.remove(fileName)

    def test_updateInfoTable(self):
        """updateInfoTable() must register the (name, file) pair in info_tables."""
        self._iRepetJob.updateInfoTable(self._jobTableName, "dummyInfo")

        qryParams = "SELECT name, file FROM info_tables WHERE name=%s AND file=%s"
        params = (self._jobTableName, "dummyInfo")
        self._db.execute(qryParams, params)

        tExp = (self._jobTableName, "dummyInfo")
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_changeJobStatus(self):
        """changeJobStatus() must store the new status of the job."""
        expStatus = "finished"

        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, expStatus, "method")

        qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
        params = (iJob.jobid, iJob.groupid, iJob.queue)
        self._db.execute(qryParams, params)

        obsStatus = self._db.fetchall()[0][0]

        self.assertEqual(expStatus, obsStatus)

    def test_getCountStatus(self):
        """Two waiting jobs of one group must be counted as two."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        self._iRepetJob.recordJob(iJob1)
        self._iRepetJob.recordJob(iJob2)

        expCount = 2
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, iJob1.groupid, "waiting")

        self.assertEqual(expCount, obsCount)

    def test_getCountStatus_without_res(self):
        """An empty job table must give a count of zero."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        expCount = 0
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, "groupid", "waiting")

        self.assertEqual(expCount, obsCount)

    def test_cleanJobGroup(self):
        """cleanJobGroup() must delete the jobs of one group and keep the others."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # Nine positional arguments, as in _createJobInstance(): with only
        # eight, the group id would end up in the job-name slot.
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)

        self._iRepetJob.cleanJobGroup(self._jobTableName, iJob1.groupid)

        self._db.execute("SELECT count(*) FROM " + self._jobTableName)

        # Only iJob3, which belongs to another group, should remain.
        expCount = 1
        obsCount = self._db.fetchall()[0][0]

        self.assertEqual(expCount, obsCount)

    def test_hasUnfinishedJob(self):
        """A group with one finished and one waiting job still has unfinished work."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)

        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")

        expHasUnfinished = True
        obsHasUnfinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasUnfinished, obsHasUnfinished)

    def test_hasUnfinishedJob_JobTableNotExists(self):
        """Without a job table, hasUnfinishedJob() must return False."""
        iJob1 = self._createJobInstance()

        expHasUnfinished = False
        obsHasUnfinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasUnfinished, obsHasUnfinished)

    def test_hasUnfinishedJob_AllJobsFinished(self):
        """A group whose jobs are all finished has no unfinished work."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # Nine positional arguments, so that iJob2 really belongs to the
        # group of iJob1 (see _createJobInstance() for the order).
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        for iJob in (iJob1, iJob2, iJob3):
            self._iRepetJob.recordJob(iJob)

        self._iRepetJob.changeJobStatus(iJob1, "finished", "method")
        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")

        expHasUnfinished = False
        obsHasUnfinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasUnfinished, obsHasUnfinished)

    def test_waitJobGroup_with_finished_job(self):
        """waitJobGroup() must return None when the only job of the group is finished."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "finished", "method")

        exp = None
        obs = self._iRepetJob.waitJobGroup(self._jobTableName, iJob.groupid, 0)

        self.assertEqual(exp, obs)

    def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
        """waitJobGroup() must exit when a job is in error and the last argument (0) allows no relaunch."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "error", "method")

        self.assertRaises(SystemExit, self._iRepetJob.waitJobGroup, self._jobTableName, iJob.groupid, 0, 0)

    def test_setJobIdFromSge(self):
        """setJobIdFromSge() must store the given id as the jobid of the job."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.setJobIdFromSge(iJob, 1000)

        qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
        params = (iJob.jobname, iJob.queue, iJob.groupid)
        self._db.execute(qryParams, params)

        tExp = (1000,)
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_submitJob_8_fields_for_job_table(self):
        """submitJob() on an 8-column job table (no 'jobname') must leave it with 9 columns."""
        iJob = self._createJobInstance()
        self._db.dropTable(self._jobTableName)
        # Eight-column layout: the same columns as createTable(), minus "jobname".
        lColumns = [
            "jobid INT UNSIGNED",
            "groupid VARCHAR(255)",
            "command TEXT",
            "launcher VARCHAR(1024)",
            "queue VARCHAR(255)",
            "status VARCHAR(255)",
            "time DATETIME",
            "node VARCHAR(255)",
        ]
        sqlCmd = "CREATE TABLE %s ( %s )" % ( self._jobTableName, ", ".join(lColumns) )
        self._db.execute(sqlCmd)

        self._iRepetJob.submitJob(iJob)

        expFieldsNb = 9
        obsFieldsNb = len(self._iRepetJob.getFieldList(self._jobTableName))

        self.assertEqual(expFieldsNb, obsFieldsNb)

    def test_getNodesListByGroupId(self):
        """Only the nodes of the jobs in the requested group must be listed."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" )

        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)

        expNodeList = ["node1", "node2"]
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid")

        self.assertEqual(expNodeList, obsNodeList)

    def test_getNodesListByGroupId_empty_list(self):
        """A group id that matches no job must give an empty node list."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" )

        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)

        expNodeList = []
        # "groupid3" is a prefix of "groupid32" but equals no stored group id.
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid3")

        self.assertEqual(expNodeList, obsNodeList)
|
|
393
|
|
if __name__ == "__main__":
    # Run the test cases of this module when the file is executed as a script.
    unittest.main()
|