Mercurial > repos > yufei-luo > s_mart
comparison commons/core/sql/test/Tst_RepetJob.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:ea3082881bf8 | 6:769e306b7933 |
---|---|
1 import unittest | |
2 import sys | |
3 import os | |
4 import time | |
5 from commons.core.sql.DbMySql import DbMySql | |
6 from commons.core.sql.Job import Job | |
7 from commons.core.sql.RepetJob import RepetJob | |
8 from commons.core.utils.FileUtils import FileUtils | |
9 | |
10 #TODO: to remove... => replace all RepetJob() by TableJobAdaptator()... | |
11 class Test_RepetJob( unittest.TestCase ): | |
12 | |
13 def setUp(self): | |
14 self._jobTableName = "dummyJobTable" | |
15 self._db = DbMySql() | |
16 self._iRepetJob = RepetJob() | |
17 | |
18 def tearDown(self): | |
19 self._iRepetJob = None | |
20 self._db.close() | |
21 | |
22 def _createJobInstance(self): | |
23 return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) | |
24 | |
25 def test_createJobTable_is_table_created(self): | |
26 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
27 | |
28 isTableCreated = self._db.doesTableExist(self._jobTableName) | |
29 self.assertTrue(isTableCreated) | |
30 | |
31 self._db.dropTable(self._jobTableName) | |
32 | |
33 def test_createJobTable_field_list(self): | |
34 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
35 | |
36 obsLFiled = self._db.getFieldList(self._jobTableName) | |
37 expLField = ["jobid", "jobname", "groupid", "command", "launcher", "queue", "status", "time", "node"] | |
38 | |
39 self.assertEquals(expLField, obsLFiled) | |
40 | |
41 self._db.dropTable(self._jobTableName) | |
42 | |
43 def test_recordJob(self): | |
44 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
45 iJob = self._createJobInstance() | |
46 self._iRepetJob.recordJob(iJob) | |
47 | |
48 qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = %s" | |
49 params = (iJob.jobid) | |
50 | |
51 self._db.execute(qryParams, params) | |
52 | |
53 tObs = self._db.fetchall()[0] | |
54 tExp =(iJob.jobid, iJob.groupid, iJob.command, iJob.launcher, iJob.queue, "waiting", "?") | |
55 | |
56 self.assertEquals(tExp,tObs) | |
57 | |
58 self._db.dropTable(self._jobTableName) | |
59 | |
60 def test_removeJob(self): | |
61 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
62 iJob = self._createJobInstance() | |
63 self._iRepetJob.recordJob(iJob) | |
64 | |
65 self._iRepetJob.removeJob(iJob) | |
66 | |
67 isTableEmpty = self._db.isEmpty(self._jobTableName) | |
68 | |
69 self.assertTrue(isTableEmpty) | |
70 | |
71 self._db.dropTable(self._jobTableName) | |
72 | |
73 def test_getJobStatus(self): | |
74 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
75 iJob = self._createJobInstance() | |
76 self._iRepetJob.recordJob(iJob) | |
77 | |
78 expStatus = "waiting" | |
79 obsStatus = self._iRepetJob.getJobStatus(iJob) | |
80 | |
81 self.assertEquals(expStatus, obsStatus) | |
82 self._db.dropTable(self._jobTableName) | |
83 | |
84 def test_getJobStatus_unknown(self): | |
85 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
86 iJob = self._createJobInstance() | |
87 | |
88 expStatus = "unknown" | |
89 obsStatus = self._iRepetJob.getJobStatus(iJob) | |
90 | |
91 self.assertEquals(expStatus, obsStatus) | |
92 self._db.dropTable(self._jobTableName) | |
93 | |
94 def test_getJobStatus_no_name(self): | |
95 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
96 iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) | |
97 | |
98 expStatus = "unknown" | |
99 obsStatus = self._iRepetJob.getJobStatus(iJob) | |
100 | |
101 self.assertEquals(expStatus, obsStatus) | |
102 self._db.dropTable(self._jobTableName) | |
103 | |
104 def test_getJobStatus_non_unique_job(self): | |
105 # Warning : this case will not append, because recordJob() begin by removeJob() | |
106 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
107 iJob = self._createJobInstance() | |
108 sqlCmd = "INSERT INTO %s" % ( iJob.tablename ) | |
109 sqlCmd += " VALUES (" | |
110 sqlCmd += " \"%s\"," % ( iJob.jobid ) | |
111 sqlCmd += " \"%s\"," % ( iJob.jobname ) | |
112 sqlCmd += " \"%s\"," % ( iJob.groupid ) | |
113 sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") ) | |
114 sqlCmd += " \"%s\"," % ( iJob.launcher ) | |
115 sqlCmd += " \"%s\"," % ( iJob.queue ) | |
116 sqlCmd += " \"waiting\"," | |
117 sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) ) | |
118 sqlCmd += " \"?\" );" | |
119 self._db.execute( sqlCmd ) | |
120 self._db.execute( sqlCmd ) | |
121 | |
122 expError = "expError.txt" | |
123 expErrorHandler = open(expError, "w") | |
124 expErrorHandler.write("ERROR while getting job status: non-unique jobs\n") | |
125 expErrorHandler.close() | |
126 | |
127 obsError = "obsError.txt" | |
128 obsErrorHandler = open(obsError, "w") | |
129 stderrRef = sys.stderr | |
130 sys.stderr = obsErrorHandler | |
131 | |
132 isSysExitRaised = False | |
133 try: | |
134 self._iRepetJob.getJobStatus(iJob) | |
135 except SystemExit: | |
136 isSysExitRaised = True | |
137 | |
138 obsErrorHandler.close() | |
139 | |
140 self.assertTrue(isSysExitRaised) | |
141 self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError)) | |
142 | |
143 sys.stderr = stderrRef | |
144 os.remove(obsError) | |
145 os.remove(expError) | |
146 | |
147 self._db.dropTable(self._jobTableName) | |
148 | |
149 def test_updateInfoTable(self): | |
150 self._iRepetJob.updateInfoTable(self._jobTableName, "dummyInfo") | |
151 | |
152 qryParams = "SELECT name, file FROM info_tables WHERE name=%s AND file=%s" | |
153 params = (self._jobTableName, "dummyInfo") | |
154 | |
155 self._db.execute(qryParams, params) | |
156 tObs = self._db.fetchall()[0] | |
157 tExp = (self._jobTableName, "dummyInfo") | |
158 | |
159 self.assertEquals(tExp, tObs) | |
160 | |
161 self._db.dropTable(self._jobTableName) | |
162 | |
163 def test_changeJobStatus(self): | |
164 expStatus = "finished" | |
165 | |
166 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
167 iJob = self._createJobInstance() | |
168 self._iRepetJob.recordJob(iJob) | |
169 self._iRepetJob.changeJobStatus(iJob, expStatus, "method") | |
170 | |
171 qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s" | |
172 params = (iJob.jobid, iJob.groupid, iJob.queue) | |
173 self._db.execute(qryParams, params) | |
174 | |
175 obsStatus = self._db.fetchall()[0][0] | |
176 self.assertEquals(expStatus, obsStatus) | |
177 self._iRepetJob.removeJob(iJob) | |
178 self._db.dropTable(self._jobTableName) | |
179 | |
180 def test_getCountStatus(self): | |
181 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
182 | |
183 iJob1 = self._createJobInstance() | |
184 iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
185 | |
186 self._iRepetJob.recordJob(iJob1) | |
187 self._iRepetJob.recordJob(iJob2) | |
188 | |
189 expCount = 2 | |
190 obsCount = self._iRepetJob.getCountStatus(self._jobTableName, iJob1.groupid, "waiting") | |
191 | |
192 self.assertEquals(expCount, obsCount) | |
193 | |
194 def test_getCountStatus_without_res(self): | |
195 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
196 expCount = 0 | |
197 | |
198 obsCount = self._iRepetJob.getCountStatus(self._jobTableName, "groupid", "waiting") | |
199 | |
200 self.assertEquals(expCount, obsCount) | |
201 | |
202 def test_cleanJobGroup(self): | |
203 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
204 iJob1 = self._createJobInstance() | |
205 iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
206 iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
207 | |
208 self._iRepetJob.recordJob(iJob1) | |
209 self._iRepetJob.recordJob(iJob2) | |
210 self._iRepetJob.recordJob(iJob3) | |
211 | |
212 self._iRepetJob.cleanJobGroup(self._jobTableName, iJob1.groupid) | |
213 | |
214 qryParams = "SELECT count(*) FROM " + self._jobTableName | |
215 | |
216 self._db.execute(qryParams) | |
217 | |
218 expCount = 1 | |
219 obsCount = self._db.fetchall()[0][0] | |
220 | |
221 self.assertEquals(expCount, obsCount) | |
222 | |
223 self._iRepetJob.removeJob(iJob3) | |
224 self._db.dropTable(self._jobTableName) | |
225 | |
226 def test_hasUnfinishedJob(self): | |
227 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
228 iJob1 = self._createJobInstance() | |
229 iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
230 iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
231 | |
232 self._iRepetJob.recordJob(iJob1) | |
233 self._iRepetJob.recordJob(iJob2) | |
234 self._iRepetJob.recordJob(iJob3) | |
235 | |
236 self._iRepetJob.changeJobStatus(iJob2, "finished", "method") | |
237 | |
238 expHasGrpIdFinished = True | |
239 obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid) | |
240 | |
241 self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) | |
242 | |
243 self._iRepetJob.removeJob(iJob1) | |
244 self._iRepetJob.removeJob(iJob2) | |
245 self._iRepetJob.removeJob(iJob3) | |
246 self._db.dropTable(self._jobTableName) | |
247 | |
248 def test_hasUnfinishedJob_JobTableNotExists(self): | |
249 iJob1 = self._createJobInstance() | |
250 | |
251 expHasGrpIdFinished = False | |
252 obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid) | |
253 | |
254 self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) | |
255 | |
256 def test_hasUnfinishedJob_AllJobsFinished(self): | |
257 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
258 iJob1 = self._createJobInstance() | |
259 iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
260 iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") | |
261 | |
262 self._iRepetJob.recordJob(iJob1) | |
263 self._iRepetJob.recordJob(iJob2) | |
264 self._iRepetJob.recordJob(iJob3) | |
265 | |
266 self._iRepetJob.changeJobStatus(iJob1, "finished", "method") | |
267 self._iRepetJob.changeJobStatus(iJob2, "finished", "method") | |
268 | |
269 expHasGrpIdFinished = False | |
270 obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid) | |
271 | |
272 self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) | |
273 | |
274 self._iRepetJob.removeJob(iJob1) | |
275 self._iRepetJob.removeJob(iJob2) | |
276 self._iRepetJob.removeJob(iJob3) | |
277 self._db.dropTable(self._jobTableName) | |
278 | |
279 def test_waitJobGroup_with_finished_job(self): | |
280 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
281 iJob = self._createJobInstance() | |
282 self._iRepetJob.recordJob(iJob) | |
283 self._iRepetJob.changeJobStatus(iJob, "finished", "method") | |
284 | |
285 Obs = self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0) | |
286 Exp = None | |
287 | |
288 self.assertEquals(Exp, Obs) | |
289 self._iRepetJob.removeJob(iJob) | |
290 | |
291 def test_waitJobGroup_with_error_job_maxRelaunch_zero(self): | |
292 Obs = False | |
293 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
294 iJob = self._createJobInstance() | |
295 self._iRepetJob.recordJob(iJob) | |
296 self._iRepetJob.changeJobStatus(iJob, "error", "method") | |
297 | |
298 try: | |
299 self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 0) | |
300 except SystemExit: | |
301 Obs = True | |
302 | |
303 self.assertTrue(Obs) | |
304 self._iRepetJob.removeJob(iJob) | |
305 | |
306 def test_setJobIdFromSge(self): | |
307 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
308 iJob = self._createJobInstance() | |
309 self._iRepetJob.recordJob(iJob) | |
310 self._iRepetJob.setJobIdFromSge(iJob, 1000) | |
311 | |
312 qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s" | |
313 params = (iJob.jobname, iJob.queue, iJob.groupid) | |
314 | |
315 self._db.execute(qryParams, params) | |
316 | |
317 tObs = self._db.fetchall()[0] | |
318 tExp =(1000,) | |
319 | |
320 self.assertEquals(tExp,tObs) | |
321 | |
322 self._db.dropTable(self._jobTableName) | |
323 | |
324 def test_submitJob_8_fields_for_job_table(self): | |
325 iJob = self._createJobInstance() | |
326 self._db.dropTable(self._jobTableName) | |
327 sqlCmd = "CREATE TABLE " + self._jobTableName | |
328 sqlCmd += " ( jobid INT UNSIGNED" | |
329 sqlCmd += ", groupid VARCHAR(255)" | |
330 sqlCmd += ", command TEXT" | |
331 sqlCmd += ", launcher VARCHAR(1024)" | |
332 sqlCmd += ", queue VARCHAR(255)" | |
333 sqlCmd += ", status VARCHAR(255)" | |
334 sqlCmd += ", time DATETIME" | |
335 sqlCmd += ", node VARCHAR(255) )" | |
336 self._db.execute(sqlCmd) | |
337 | |
338 self._iRepetJob.submitJob(iJob) | |
339 | |
340 expFieldsNb = 9 | |
341 obsFieldsNb = len(self._iRepetJob.getFieldList(self._jobTableName)) | |
342 | |
343 self.assertEquals(expFieldsNb, obsFieldsNb) | |
344 | |
345 self._db.dropTable(self._jobTableName) | |
346 | |
347 def test_getNodesListByGroupId(self): | |
348 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
349 iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) | |
350 iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) | |
351 iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" ) | |
352 | |
353 self._insertJob(iJob1) | |
354 self._insertJob(iJob2) | |
355 self._insertJob(iJob3) | |
356 | |
357 expNodeList = ["node1", "node2"] | |
358 obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid") | |
359 self.assertEquals(expNodeList, obsNodeList) | |
360 | |
361 self._db.dropTable(self._jobTableName) | |
362 | |
363 def test_getNodesListByGroupId_empty_list(self): | |
364 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
365 iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) | |
366 iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) | |
367 iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" ) | |
368 | |
369 self._insertJob(iJob1) | |
370 self._insertJob(iJob2) | |
371 self._insertJob(iJob3) | |
372 | |
373 expNodeList = [] | |
374 obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid3") | |
375 self.assertEquals(expNodeList, obsNodeList) | |
376 | |
377 self._db.dropTable(self._jobTableName) | |
378 | |
379 def _insertJob(self, iJob): | |
380 self._iRepetJob.removeJob( iJob ) | |
381 sqlCmd = "INSERT INTO %s" % ( iJob.tablename ) | |
382 sqlCmd += " VALUES (" | |
383 sqlCmd += " \"%s\"," % ( iJob.jobid ) | |
384 sqlCmd += " \"%s\"," % ( iJob.jobname ) | |
385 sqlCmd += " \"%s\"," % ( iJob.groupid ) | |
386 sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") ) | |
387 sqlCmd += " \"%s\"," % ( iJob.launcher ) | |
388 sqlCmd += " \"%s\"," % ( iJob.queue ) | |
389 sqlCmd += " \"waiting\"," | |
390 sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) ) | |
391 sqlCmd += " \"%s\" );" % ( iJob.node ) | |
392 self._iRepetJob.execute( sqlCmd ) | |
393 | |
394 if __name__ == "__main__": | |
395 unittest.main() |