comparison commons/core/launcher/test/Test_Launcher.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children 94ab73e8a190
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 from commons.core.utils.FileUtils import FileUtils
2 from commons.core.launcher.Launcher import Launcher
3 from commons.core.launcher.WriteScript import WriteScript
4 from commons.core.stat.Stat import Stat
5 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
6 from commons.core.sql.DbFactory import DbFactory
7 from commons.core.sql.Job import Job
8 import unittest
9 import os
10 import shutil
11 import time
12 import stat
13
#TODO: Test_F_Launcher.py: execute prepareCommands() and runSingleJob()
#      to test runLauncherForMultipleJobs()
#TODO: check the cleanup of "Test_runSingleJob"
#TODO: refactoring => choose between "self._queue" and "lResources" to set resources
class Test_Launcher(unittest.TestCase):
    """Tests for Launcher, run against a scratch job table.

    setUp() creates the job table "dummyJobTable" and a job adaptator bound
    to it; tearDown() drops the table and removes the files the tests are
    known to leave in the current directory.

    Several tests submit real jobs and sleep while waiting for them.
    NOTE(review): they presumably need a working job scheduler -- confirm
    before running them on a workstation.
    """

    # Host name for which the tests add the "test=TRUE" resource to the jobs
    # they submit (compared with the HOSTNAME environment variable).
    SARUMAN_NAME = "compute-2-46.local"

    def setUp(self):
        """Create the scratch job table and the adaptator used by every test."""
        self._cDir = os.getcwd()
        self._tmpDir = self._cDir
        self._groupid = "test"
        self._jobTable = "dummyJobTable"
        self._iDb = DbFactory.createInstance()
        self._iDb.createTable(self._jobTable, "jobs", overwrite = True)
        self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable)
        self._queue = ""
        self._configFileName = "dummyConfigFile"

    def tearDown(self):
        """Drop the scratch table, close the database and remove leftover files.

        Each step runs even if the previous one raised, so a database error
        does not leave job output files behind for the next test.
        """
        try:
            self._iDb.dropTable(self._jobTable)
        finally:
            try:
                self._iDb.close()
            finally:
                lPatterns = ['*.e*', '*.o*', 'launcherFileTest_BeginRun.py', self._configFileName, 'ClusterLauncher_*']
                for pattern in lPatterns:
                    FileUtils.removeFilesByPattern(pattern)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _createLauncher(self, acronym, queue = None, groupid = None):
        """Build a Launcher on the current directory with the fixture settings.

        @param acronym: acronym given to the Launcher
        @param queue: queue string; the fixture queue when None
        @param groupid: group identifier; the fixture groupid when None
                        (an empty string is passed through unchanged)
        """
        if queue is None:
            queue = self._queue
        if groupid is None:
            groupid = self._groupid
        return Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", queue, groupid, acronym)

    def _writeFile(self, fileName, content = ""):
        """Create 'fileName' in text mode holding 'content' (empty by default)."""
        with open(fileName, "w") as fileHandler:
            fileHandler.write(content)

    def _submitSleepJob(self, acronym, pyFileName):
        """Write the launcher script of a 10-second job, submit it, return the Job.

        @param acronym: job name
        @param pyFileName: path of the launcher script to write
        """
        cmd_start = "log = os.system( \"date;sleep 10;date\" )\n"
        if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"):
            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"])
        else:
            iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName)
        iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir)
        iWriteScript.run(cmd_start, "", pyFileName)
        # the script is made readable, writable and executable by its owner
        os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
        self._jobdb.submitJob(iJob)
        return iJob

    def _runSingleJobAndGetStatus(self, acronym, cmd, tmpDir, cleanJobGroup = False):
        """Run 'cmd' with runSingleJob() inside a new directory; return the job status.

        The directory 'acronym' is created in the current directory and used
        as working directory. The original directory is restored and the new
        one deleted even when the launcher raises, so a failure cannot leak
        into the following tests.

        @param acronym: name of both the job and its working directory
        @param cmd: command handed to runSingleJob()
        @param tmpDir: temporary directory given to the Launcher
        @param cleanJobGroup: if True, clean the job group once the status is read
        """
        os.mkdir(acronym)
        try:
            os.chdir(acronym)
            iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), tmpDir, "", self._queue, self._groupid, acronym)
            iLauncher.job.groupid = self._groupid
            iLauncher.job.jobname = acronym
            iLauncher.job.queue = self._queue
            if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"):
                iLauncher.job.lResources = ["test=TRUE"]
            iLauncher.runSingleJob(cmd)
            # fixed delay before the status is read
            time.sleep(20)
            jobStatus = self._jobdb.getJobStatus(iLauncher.job)
            if cleanJobGroup:
                self._jobdb.cleanJobGroup(self._groupid)
            return jobStatus
        finally:
            os.chdir(self._cDir)
            shutil.rmtree(acronym)

    def _checkQueueNameAndResources(self, configQueue, expQueueName, expResources):
        """Assert that getQueueNameAndResources(configQueue) returns the expected pair."""
        iLauncher = Launcher(self._jobdb)
        obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue)
        self.assertEqual(expQueueName, obsQueueName)
        self.assertEqual(expResources, obsResources)

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test__init__wrong_fields_for_job_table(self):
        """A job table with an outdated set of fields has the expected fields after __init__."""
        self._iDb.dropTable(self._jobTable)
        # table definition with a "command" field and without "resources"
        lColumns = ["jobid INT UNSIGNED",
                    "jobname VARCHAR(255)",
                    "groupid VARCHAR(255)",
                    "command TEXT",
                    "launcher VARCHAR(1024)",
                    "queue VARCHAR(255)",
                    "status VARCHAR(255)",
                    "time DATETIME",
                    "node VARCHAR(255)"]
        sqlCmd = "CREATE TABLE %s ( %s )" % (self._jobTable, ", ".join(lColumns))
        self._iDb.execute(sqlCmd)
        acronym = "Test__init__"
        iLauncher = self._createLauncher(acronym)
        lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
        lObsFields = sorted(self._iDb.getFieldList(self._jobTable))
        self.assertEqual(lExpFields, lObsFields)
        expJob = Job(queue = self._queue)
        obsJob = iLauncher.job
        self.assertEqual(expJob, obsJob)

    def test__init__withResources(self):
        """A queue string holding a resource is split into queue name and resources."""
        queue = "main.q mem_free=3G"
        acronym = "Test__init__"
        expQueue = "main.q"
        explResources = ['mem_free=3G']
        expJob = Job(queue = expQueue, lResources = explResources)
        iLauncher = self._createLauncher(acronym, queue = queue)
        obsJob = iLauncher.job
        self.assertEqual(expJob, obsJob)

    def test_createGroupidIfItNotExist(self):
        """A groupid given to the Launcher is kept as is."""
        acronym = "checkGroupID"
        iLauncher = self._createLauncher(acronym)
        iLauncher.createGroupidIfItNotExist()
        obsGroupid = iLauncher.job.groupid
        self.assertEqual(self._groupid, obsGroupid)

    def test_createGroupidIfItNotExist_without_groupid(self):
        """An empty groupid is replaced by a non-empty one."""
        groupid = ""
        acronym = "checkGroupID"
        iLauncher = self._createLauncher(acronym, groupid = groupid)
        iLauncher.createGroupidIfItNotExist()
        obsGroupid = iLauncher.job.groupid
        self.assertNotEqual(obsGroupid, "")

    def test_beginRun_with_Job_finished_in_Table(self):
        """No job of the group is counted as "finished" after beginRun().

        NOTE(review): the recorded job is created without a groupid, so
        confirm it is counted under self._groupid before beginRun() is called;
        otherwise this assertion holds trivially.
        """
        acronym = "BeginRun"
        iJob = Job(queue = self._queue)
        self._jobdb.recordJob(iJob)
        self._jobdb.changeJobStatus(iJob, "finished")
        iLauncher = self._createLauncher(acronym)
        iLauncher.beginRun()
        self.assertEqual(0, self._jobdb.getCountStatus(self._groupid, "finished"))

    def test_beginRun_with_Job_unfinished_in_Table(self):
        """One job of the group is counted as "finished" once beginRun() returns."""
        acronym = "testU_BeginRun"
        # this script is removed by tearDown()
        pyFileName = "%s/launcherFileTest_BeginRun.py" % os.getcwd()
        self._submitSleepJob(acronym, pyFileName)
        iLauncher = self._createLauncher(acronym)

        iLauncher.beginRun()

        self.assertEqual(1, self._jobdb.getCountStatus(self._groupid, "finished"))

    def test_getStatsOfExecutionTime(self):
        """Execution times written in "<acronym>.o*" files are gathered in a Stat."""
        acronym = "test_statTime"

        expLValues = [1000.00000, 1000.00000]
        expStat = Stat(expLValues)

        # both files match the '*.o*' pattern removed by tearDown()
        self._writeFile(acronym + ".o1", "executionTime=1000.000000")
        self._writeFile(acronym + ".o2", "executionTime=1000.000000")

        iLauncher = self._createLauncher(acronym)
        obsStat = iLauncher.getStatsOfExecutionTime(acronym)

        self.assertEqual(expStat, obsStat)

    def test_endRun(self):
        """After endRun() the group has no "finished", "error" or "waiting" job."""
        acronym = "testU_EndRun"
        pyFileName = "%s/launcherFileTest_EndRun.py" % os.getcwd()
        iJob = self._submitSleepJob(acronym, pyFileName)
        try:
            iLauncher = self._createLauncher(acronym)
            iLauncher.job.groupid = self._groupid
            iLauncher.endRun()

            for status in ("finished", "error", "waiting"):
                self.assertEqual(0, self._jobdb.getCountStatus(self._groupid, status))
        finally:
            # tearDown() does not remove this script, so it is deleted here
            # even when an assertion above fails
            if os.path.exists(iJob.launcher):
                os.remove(iJob.launcher)

    def test_clean(self):
        """clean(acronym) removes the "ClusterLauncher<acronym>.py" file."""
        acronym = "test_clean"
        self._writeFile("ClusterLauncher" + acronym + ".py")
        self._writeFile(acronym + ".o1")
        self._writeFile(acronym + ".e1")
        iLauncher = self._createLauncher(acronym)
        iLauncher.clean(acronym)
        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym + ".py"))

    def test_clean_without_acronym(self):
        """clean("") removes the file named after the Launcher's own acronym."""
        acronym = ""
        acronym2 = "toto"
        self._writeFile("ClusterLauncher" + acronym2 + ".py")
        self._writeFile(acronym2 + ".o1")
        self._writeFile(acronym2 + ".e1")
        iLauncher = self._createLauncher(acronym2)
        iLauncher.clean(acronym)
        self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym2 + ".py"))

    def test_getQueueNameAndResources_queue_no_resource(self):
        """A queue name alone yields an empty resource list."""
        self._checkQueueNameAndResources("all.q", "all.q", [])

    def test_getQueueNameAndResources_queue_one_resource(self):
        """A queue name followed by one quoted resource."""
        self._checkQueueNameAndResources("test.q 'test=TRUE'", "test.q", ["test=TRUE"])

    def test_getQueueNameAndResources_queue_two_resources(self):
        """A queue name followed by two quoted resources."""
        self._checkQueueNameAndResources("big.q 's_data=8G s_cpu=96:00:00'", "big.q", ["s_data=8G", "s_cpu=96:00:00"])

    def test_getQueueNameAndResources_no_queue_no_resource(self):
        """An empty string yields an empty queue name and no resource."""
        self._checkQueueNameAndResources("", "", [])

    def test_getQueueNameAndResources_no_queue_one_resource(self):
        """One resource without queue name."""
        self._checkQueueNameAndResources("s_data=8G", "", ["s_data=8G"])

    def test_getQueueNameAndResources_no_queue_two_resource(self):
        """Two resources without queue name."""
        self._checkQueueNameAndResources("s_data=8G s_cpu=96:00:00", "", ["s_data=8G", "s_cpu=96:00:00"])

#    #TODO: test with at least 2 lines in cmd
    def test_runSingleJob(self):
        """A valid command run in a valid temporary directory ends as "finished"."""
        acronym = "Test_runSingleJob"
        cmd = "log = os.system(\"touch 'YuFei'\")\n"
        jobStatus = self._runSingleJobAndGetStatus(acronym, cmd, self._tmpDir)
        self.assertEqual(jobStatus, "finished")

    def test_runSingleJob_catch_error_wrong_tmpDir(self):
        """A job given the temporary directory "<tmpDir>/toto" ends as "error"."""
        acronym = "Test_runSingleJob_catch_error"
        cmd = "log = os.system(\"touch 'YuFei'\")\n"
        jobStatus = self._runSingleJobAndGetStatus(acronym, cmd, "%s/toto" % self._tmpDir)
        self.assertEqual(jobStatus, "error")

    def test_runSingleJob_catch_error_wrong_cmd(self):
        """A job running the command "truc -i toto" ends as "error"."""
        acronym = "Test_runSingleJob_catch_error"
        cmd = "log = os.system(\"truc -i toto\")\n"
        jobStatus = self._runSingleJobAndGetStatus(acronym, cmd, self._tmpDir, cleanJobGroup = True)
        self.assertEqual(jobStatus, "error")

    def test_prepareCommands(self):
        """Command lists are joined into the four command strings of the job script."""
        expCmdStart = "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")\n\tos.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")\n\tlog = os.system(\"touch file\")\n\t"
        expCmdFinish = "if os.path.exists(\"yufei.align\"):\n\t\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )\n\t"
        expCmdSize = "fileSize = 3.2\n\t\t"
        expCmdCopy = "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")\n\t\tshutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")\n\t\t"

        lCmdStart = ["os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")",
                     "os.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")"]
        lCmds = ["log = os.system(\"touch file\")"]
        lCmdFinish = ["if os.path.exists(\"yufei.align\"):",
                      "\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )"]
        lCmdSize = ["fileSize = 3.2"]
        lCmdCopy = ["shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")",
                    "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")"]

        iLauncher = Launcher(self._jobdb)
        obsCmdStart, obsCmdFinish, obsCmdSize, obsCmdCopy = iLauncher.prepareCommands(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy)

        self.assertEqual(expCmdStart, obsCmdStart)
        self.assertEqual(expCmdFinish, obsCmdFinish)
        self.assertEqual(expCmdSize, obsCmdSize)
        self.assertEqual(expCmdCopy, obsCmdCopy)

    def test_getSystemCommand(self):
        """A program and its arguments are wrapped into an os.system() statement."""
        prg = "touch"
        lArgs = ["file"]
        expCmd = "log = os.system(\"touch file\")"
        iLauncher = Launcher(self._jobdb)
        obsCmd = iLauncher.getSystemCommand(prg, lArgs)
        self.assertEqual(expCmd, obsCmd)
328
329
# Suite gathering every test of Test_Launcher; kept at module level under the
# name "test_suite" so that other modules can import and aggregate it.
# TestLoader.loadTestsFromTestCase() replaces the deprecated unittest.makeSuite().
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test_Launcher))
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(test_suite)