6
|
1 from commons.core.utils.FileUtils import FileUtils
|
|
2 from commons.core.launcher.Launcher import Launcher
|
|
3 from commons.core.launcher.WriteScript import WriteScript
|
|
4 from commons.core.stat.Stat import Stat
|
|
5 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
|
|
6 from commons.core.sql.DbFactory import DbFactory
|
|
7 from commons.core.sql.Job import Job
|
|
8 import unittest
|
|
9 import os
|
|
10 import shutil
|
|
11 import time
|
|
12 import stat
|
|
13
|
|
#TODO: Test_F_Launcher.py: execute prepareCommands() and runSingleJob()
#                          to test runLauncherForMultipleJobs()
#TODO: check the cleanup done by "Test_runSingleJob"
#TODO: refactoring => choose between "self._queue" and "lResources" to set resources
|
|
class Test_Launcher(unittest.TestCase):
    """Unit tests for ``Launcher``.

    ``setUp`` creates a scratch job table through ``DbFactory`` and
    ``tearDown`` drops it again and removes the files the tests leave in the
    current working directory.

    Several tests submit a job and then sleep before reading its status, so
    the whole suite takes more than a minute to run.
    """

    # Host name compared with $HOSTNAME: when they match, the jobs built by
    # these tests additionally request the "test=TRUE" resource.
    SARUMAN_NAME = "compute-2-46.local"

    # Seconds the runSingleJob tests wait before reading the job status.
    JOB_WAIT_SECONDS = 20

    def setUp(self):
        """Create the scratch job table and the default launcher settings."""
        self._cDir = os.getcwd()
        self._tmpDir = self._cDir
        self._groupid = "test"
        self._jobTable = "dummyJobTable"
        self._iDb = DbFactory.createInstance()
        self._iDb.createTable(self._jobTable, "jobs", overwrite = True)
        self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable)
        self._queue = ""
        self._configFileName = "dummyConfigFile"

    def tearDown(self):
        """Drop the scratch table, close the connection and delete test files."""
        try:
            self._iDb.dropTable(self._jobTable)
        finally:
            # Always release the connection and the files, even when the
            # table could not be dropped.
            self._iDb.close()
            for pattern in ('*.e*',
                            '*.o*',
                            'launcherFileTest_BeginRun.py',
                            self._configFileName,
                            'ClusterLauncher_*'):
                FileUtils.removeFilesByPattern(pattern)

    # ------------------------------------------------------------------
    # private helpers (not collected by unittest: names do not start with "test")
    # ------------------------------------------------------------------

    def _isOnSaruman(self):
        """Return True when $HOSTNAME equals ``SARUMAN_NAME``."""
        return self.SARUMAN_NAME == os.getenv("HOSTNAME")

    def _newLauncher(self, acronym, queue=None, groupid=None, cDir=None, tmpDir=None):
        """Build a ``Launcher`` with the fixture defaults.

        Any argument left to ``None`` falls back to the value prepared in
        ``setUp``; an empty string is passed through unchanged.
        """
        if queue is None:
            queue = self._queue
        if groupid is None:
            groupid = self._groupid
        if cDir is None:
            cDir = self._cDir
        if tmpDir is None:
            tmpDir = self._tmpDir
        return Launcher(self._jobdb, os.getcwd(), "", "", cDir, tmpDir, "", queue, groupid, acronym)

    @staticmethod
    def _writeFile(fileName, content=""):
        """Create (or truncate) ``fileName`` and write ``content`` into it."""
        with open(fileName, "w") as handle:
            handle.write(content)

    def _submitSleepingJob(self, acronym, pyFileName):
        """Write an executable job script, submit the job and return it.

        The job command prints the date, sleeps 10 seconds and prints the
        date again.
        """
        cmd_start = "log = os.system( \"date;sleep 10;date\" )\n"
        if self._isOnSaruman():
            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"])
        else:
            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName)
        iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir)
        iWriteScript.run(cmd_start, "", pyFileName)
        os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
        self._jobdb.submitJob(iJob)
        return iJob

    def _runSingleJobInOwnDir(self, acronym, cmd, tmpDir=None, cleanGroup=False):
        """Run ``cmd`` with ``runSingleJob`` from a scratch directory.

        A directory named ``acronym`` is created and entered, the job is
        launched, and after ``JOB_WAIT_SECONDS`` its status is read and
        returned. The working directory is restored and the scratch directory
        removed even when the launch raises, so a failure here cannot break
        the following tests.
        """
        os.mkdir(acronym)
        try:
            os.chdir(acronym)
            iLauncher = self._newLauncher(acronym, cDir=os.getcwd(), tmpDir=tmpDir)
            iLauncher.job.groupid = self._groupid
            iLauncher.job.jobname = acronym
            iLauncher.job.queue = self._queue
            if self._isOnSaruman():
                iLauncher.job.lResources = ["test=TRUE"]
            iLauncher.runSingleJob(cmd)
            time.sleep(self.JOB_WAIT_SECONDS)
            jobStatus = self._jobdb.getJobStatus(iLauncher.job)
            if cleanGroup:
                self._jobdb.cleanJobGroup(self._groupid)
        finally:
            os.chdir(self._cDir)
            shutil.rmtree(acronym)
        return jobStatus

    def _checkQueueNameAndResources(self, configQueue, expQueueName, expResources):
        """Assert what ``getQueueNameAndResources`` returns for ``configQueue``."""
        iLauncher = Launcher(self._jobdb)
        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
        self.assertEqual(expQueueName, obsQueueName)
        self.assertEqual(expResources, obsResources)

    # ------------------------------------------------------------------
    # constructor
    # ------------------------------------------------------------------

    def test__init__wrong_fields_for_job_table(self):
        """A job table with a legacy layout has the expected fields after init.

        The table is recreated with a ``command`` column and no ``resources``
        column; once a Launcher has been built, the field list must contain
        ``resources`` and no ``command``.
        """
        self._iDb.dropTable(self._jobTable)
        sqlCmd = (
            "CREATE TABLE " + self._jobTable
            + " ( jobid INT UNSIGNED"
            + ", jobname VARCHAR(255)"
            + ", groupid VARCHAR(255)"
            + ", command TEXT"
            + ", launcher VARCHAR(1024)"
            + ", queue VARCHAR(255)"
            + ", status VARCHAR(255)"
            + ", time DATETIME"
            + ", node VARCHAR(255) )"
        )
        self._iDb.execute(sqlCmd)
        acronym = "Test__init__"
        iLauncher = self._newLauncher(acronym)
        lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
        lObsFields = sorted(self._iDb.getFieldList(self._jobTable))
        self.assertEqual(lExpFields, lObsFields)
        expJob = Job(queue = self._queue)
        obsJob = iLauncher.job
        self.assertEqual(expJob, obsJob)

    def test__init__withResources(self):
        """A queue string "name resource" is split into queue and resources."""
        queue = "main.q mem_free=3G"
        acronym = "Test__init__"
        expQueue = "main.q"
        explResources = ['mem_free=3G']
        expJob = Job(queue = expQueue, lResources = explResources)
        iLauncher = self._newLauncher(acronym, queue=queue)
        obsJob = iLauncher.job
        self.assertEqual(expJob, obsJob)

    # ------------------------------------------------------------------
    # createGroupidIfItNotExist
    # ------------------------------------------------------------------

    def test_createGroupidIfItNotExist(self):
        """A group id given to the constructor is kept."""
        acronym = "checkGroupID"
        iLauncher = self._newLauncher(acronym)
        iLauncher.createGroupidIfItNotExist()
        obsGroupid = iLauncher.job.groupid
        self.assertEqual(self._groupid, obsGroupid)

    def test_createGroupidIfItNotExist_without_groupid(self):
        """An empty group id is replaced by a non-empty one."""
        groupid = ""
        acronym = "checkGroupID"
        iLauncher = self._newLauncher(acronym, groupid=groupid)
        iLauncher.createGroupidIfItNotExist()
        obsGroupid = iLauncher.job.groupid
        self.assertNotEqual("", obsGroupid)

    # ------------------------------------------------------------------
    # beginRun / endRun
    # ------------------------------------------------------------------

    def test_beginRun_with_Job_finished_in_Table(self):
        """After beginRun no job of the group is counted as "finished"."""
        acronym = "BeginRun"
        iJob = Job(queue = self._queue)
        self._jobdb.recordJob(iJob)
        self._jobdb.changeJobStatus(iJob, "finished")
        iLauncher = self._newLauncher(acronym)
        iLauncher.beginRun()
        self.assertEqual(0, self._jobdb.getCountStatus(self._groupid, "finished"))

    def test_beginRun_with_Job_unfinished_in_Table(self):
        """After beginRun the submitted job is counted as "finished"."""
        acronym = "testU_BeginRun"
        pyFileName = "%s/launcherFileTest_BeginRun.py" % os.getcwd()
        self._submitSleepingJob(acronym, pyFileName)
        iLauncher = self._newLauncher(acronym)

        iLauncher.beginRun()

        self.assertEqual(1, self._jobdb.getCountStatus(self._groupid, "finished"))

    def test_getStatsOfExecutionTime(self):
        """Execution times are read from the "<acronym>.o*" files."""
        acronym = "test_statTime"

        expLValues = [1000.00000, 1000.00000]
        expStat = Stat(expLValues)

        self._writeFile(acronym + ".o1", "executionTime=1000.000000")
        self._writeFile(acronym + ".o2", "executionTime=1000.000000")

        iLauncher = self._newLauncher(acronym)
        obsStat = iLauncher.getStatsOfExecutionTime(acronym)

        self.assertEqual(expStat, obsStat)

    def test_endRun(self):
        """After endRun the group has no finished, error or waiting job."""
        acronym = "testU_EndRun"
        pyFileName = "%s/launcherFileTest_EndRun.py" % os.getcwd()
        iJob = self._submitSleepingJob(acronym, pyFileName)
        try:
            iLauncher = self._newLauncher(acronym)
            iLauncher.job.groupid = self._groupid
            iLauncher.endRun()

            self.assertEqual(0, self._jobdb.getCountStatus(self._groupid, "finished"))
            self.assertEqual(0, self._jobdb.getCountStatus(self._groupid, "error"))
            self.assertEqual(0, self._jobdb.getCountStatus(self._groupid, "waiting"))
        finally:
            # tearDown does not know this script, so remove it here even when
            # an assertion above fails.
            os.remove(iJob.launcher)

    # ------------------------------------------------------------------
    # clean
    # ------------------------------------------------------------------

    def test_clean(self):
        """clean(acronym) removes the launcher script of that acronym."""
        acronym = "test_clean"
        self._writeFile("ClusterLauncher" + acronym + ".py")
        self._writeFile(acronym + ".o1")
        self._writeFile(acronym + ".e1")
        iLauncher = self._newLauncher(acronym)
        iLauncher.clean(acronym)
        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym + ".py"))

    def test_clean_without_acronym(self):
        """clean("") removes the script named after the launcher's acronym."""
        acronym = ""
        acronym2 = "toto"
        self._writeFile("ClusterLauncher" + acronym2 + ".py")
        self._writeFile(acronym2 + ".o1")
        self._writeFile(acronym2 + ".e1")
        iLauncher = self._newLauncher(acronym2)
        iLauncher.clean(acronym)
        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym2 + ".py"))

    # ------------------------------------------------------------------
    # getQueueNameAndResources
    # ------------------------------------------------------------------

    def test_getQueueNameAndResources_queue_no_resource(self):
        self._checkQueueNameAndResources("all.q", "all.q", [])

    def test_getQueueNameAndResources_queue_one_resource(self):
        self._checkQueueNameAndResources("test.q 'test=TRUE'", "test.q", ["test=TRUE"])

    def test_getQueueNameAndResources_queue_two_resources(self):
        self._checkQueueNameAndResources("big.q 's_data=8G s_cpu=96:00:00'", "big.q", ["s_data=8G", "s_cpu=96:00:00"])

    def test_getQueueNameAndResources_no_queue_no_resource(self):
        self._checkQueueNameAndResources("", "", [])

    def test_getQueueNameAndResources_no_queue_one_resource(self):
        self._checkQueueNameAndResources("s_data=8G", "", ["s_data=8G"])

    def test_getQueueNameAndResources_no_queue_two_resource(self):
        self._checkQueueNameAndResources("s_data=8G s_cpu=96:00:00", "", ["s_data=8G", "s_cpu=96:00:00"])

    # ------------------------------------------------------------------
    # runSingleJob
    # ------------------------------------------------------------------

    #TODO: test with at least 2 lines in cmd
    def test_runSingleJob(self):
        """A valid command ends with the "finished" status."""
        acronym = "Test_runSingleJob"
        cmd = "log = os.system(\"touch 'YuFei'\")\n"
        jobStatus = self._runSingleJobInOwnDir(acronym, cmd)
        self.assertEqual(jobStatus, "finished")

    def test_runSingleJob_catch_error_wrong_tmpDir(self):
        """A "<tmpDir>/toto" temporary directory ends with the "error" status."""
        acronym = "Test_runSingleJob_catch_error"
        cmd = "log = os.system(\"touch 'YuFei'\")\n"
        jobStatus = self._runSingleJobInOwnDir(acronym, cmd, tmpDir="%s/toto" % self._tmpDir)
        self.assertEqual(jobStatus, "error")

    def test_runSingleJob_catch_error_wrong_cmd(self):
        """The command "truc -i toto" ends with the "error" status."""
        acronym = "Test_runSingleJob_catch_error"
        cmd = "log = os.system(\"truc -i toto\")\n"
        jobStatus = self._runSingleJobInOwnDir(acronym, cmd, cleanGroup=True)
        self.assertEqual(jobStatus, "error")

    # ------------------------------------------------------------------
    # command builders
    # ------------------------------------------------------------------

    def test_prepareCommands(self):
        """The command lists are joined into the four expected script chunks."""
        expCmdStart = "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")\n\tos.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")\n\tlog = os.system(\"touch file\")\n\t"
        expCmdFinish = "if os.path.exists(\"yufei.align\"):\n\t\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )\n\t"
        expCmdSize = "fileSize = 3.2\n\t\t"
        expCmdCopy = "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")\n\t\tshutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")\n\t\t"

        lCmdStart = [
            "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")",
            "os.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")",
        ]
        lCmds = ["log = os.system(\"touch file\")"]
        lCmdFinish = [
            "if os.path.exists(\"yufei.align\"):",
            "\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )",
        ]
        lCmdSize = ["fileSize = 3.2"]
        lCmdCopy = [
            "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")",
            "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")",
        ]

        iLauncher = Launcher(self._jobdb)
        obsCmdStart, obsCmdFinish, obsCmdSize, obsCmdCopy = iLauncher.prepareCommands(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy)

        self.assertEqual(expCmdStart, obsCmdStart)
        self.assertEqual(expCmdFinish, obsCmdFinish)
        self.assertEqual(expCmdSize, obsCmdSize)
        self.assertEqual(expCmdCopy, obsCmdCopy)

    def test_getSystemCommand(self):
        """Program and arguments are wrapped in a ``log = os.system(...)`` line."""
        prg = "touch"
        lArgs = ["file"]
        expCmd = "log = os.system(\"touch file\")"
        iLauncher = Launcher(self._jobdb)
        obsCmd = iLauncher.getSystemCommand(prg, lArgs)
        self.assertEqual(expCmd, obsCmd)
|
|
328
|
|
329
|
|
# Module-level suite holding every test of Test_Launcher; it is kept under
# the name ``test_suite`` so that other modules can keep importing it.
# TestLoader.loadTestsFromTestCase replaces unittest.makeSuite, which is
# deprecated and no longer available in recent Python versions.
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test_Launcher))

if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(test_suite)
|