Mercurial > repos > yufei-luo > s_mart
comparison smart_toolShed/commons/core/launcher/test/Test_Launcher.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e0f8dcca02ed |
---|---|
1 from commons.core.utils.FileUtils import FileUtils | |
2 from commons.core.launcher.Launcher import Launcher | |
3 from commons.core.launcher.WriteScript import WriteScript | |
4 from commons.core.stat.Stat import Stat | |
5 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory | |
6 from commons.core.sql.DbFactory import DbFactory | |
7 from commons.core.sql.Job import Job | |
8 import unittest | |
9 import os | |
10 import shutil | |
11 import time | |
12 import stat | |
13 | |
14 #TODO: Test_F_Launcher.py : to execute prepareCommands() and runSingleJob() | |
15 # to test runLauncherForMultipleJobs() | |
16 #TODO: check clean of "Test_runSingleJob" | |
17 #TODO: refactoring => choose between "self._queue" or "lResources" to set resources | |
18 class Test_Launcher(unittest.TestCase): | |
19 | |
20 SARUMAN_NAME = "compute-2-46.local" | |
21 | |
22 def setUp(self): | |
23 self._cDir = os.getcwd() | |
24 self._tmpDir = self._cDir | |
25 self._groupid = "test" | |
26 self._jobTable = "dummyJobTable" | |
27 self._iDb = DbFactory.createInstance() | |
28 self._iDb.createTable(self._jobTable, "jobs", overwrite = True) | |
29 self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable) | |
30 self._queue = "" | |
31 self._configFileName = "dummyConfigFile" | |
32 | |
33 def tearDown(self): | |
34 self._iDb.dropTable(self._jobTable) | |
35 self._iDb.close() | |
36 FileUtils.removeFilesByPattern('*.e*') | |
37 FileUtils.removeFilesByPattern('*.o*') | |
38 FileUtils.removeFilesByPattern('launcherFileTest_BeginRun.py') | |
39 FileUtils.removeFilesByPattern(self._configFileName) | |
40 FileUtils.removeFilesByPattern('ClusterLauncher_*') | |
41 | |
42 def test__init__wrong_fields_for_job_table(self): | |
43 self._iDb.dropTable(self._jobTable) | |
44 sqlCmd = "CREATE TABLE " + self._jobTable | |
45 sqlCmd += " ( jobid INT UNSIGNED" | |
46 sqlCmd += ", jobname VARCHAR(255)" | |
47 sqlCmd += ", groupid VARCHAR(255)" | |
48 sqlCmd += ", command TEXT" | |
49 sqlCmd += ", launcher VARCHAR(1024)" | |
50 sqlCmd += ", queue VARCHAR(255)" | |
51 sqlCmd += ", status VARCHAR(255)" | |
52 sqlCmd += ", time DATETIME" | |
53 sqlCmd += ", node VARCHAR(255) )" | |
54 self._iDb.execute(sqlCmd) | |
55 acronym = "Test__init__" | |
56 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) | |
57 lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"]) | |
58 lObsFields = sorted(self._iDb.getFieldList(self._jobTable)) | |
59 self.assertEquals(lExpFields, lObsFields) | |
60 expJob = Job(queue = self._queue) | |
61 obsJob = iLauncher.job | |
62 self.assertEquals(expJob, obsJob) | |
63 | |
64 def test__init__withResources(self): | |
65 queue = "main.q mem_free=3G" | |
66 acronym = "Test__init__" | |
67 expQueue = "main.q" | |
68 explResources = ['mem_free=3G'] | |
69 expJob = Job(queue = expQueue, lResources = explResources) | |
70 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", queue, self._groupid, acronym) | |
71 obsJob = iLauncher.job | |
72 self.assertEquals(expJob, obsJob) | |
73 | |
74 def test_createGroupidIfItNotExist(self): | |
75 acronym = "checkGroupID" | |
76 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) | |
77 iLauncher.createGroupidIfItNotExist() | |
78 obsGroupid = iLauncher.job.groupid | |
79 self.assertEquals(self._groupid, obsGroupid) | |
80 | |
81 def test_createGroupidIfItNotExist_without_groupid(self): | |
82 groupid = "" | |
83 acronym = "checkGroupID" | |
84 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, groupid, acronym) | |
85 iLauncher.createGroupidIfItNotExist() | |
86 obsGroupid = iLauncher.job.groupid | |
87 self.assertTrue(obsGroupid != "") | |
88 | |
89 def test_beginRun_with_Job_finished_in_Table(self): | |
90 acronym = "BeginRun" | |
91 iJob = Job(queue = self._queue) | |
92 self._jobdb.recordJob(iJob) | |
93 self._jobdb.changeJobStatus(iJob, "finished") | |
94 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) | |
95 iLauncher.beginRun() | |
96 self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0) | |
97 | |
98 def test_beginRun_with_Job_unfinished_in_Table(self): | |
99 acronym = "testU_BeginRun" | |
100 cmd_start = "log = os.system( \"date;sleep 10;date\" )\n" | |
101 pyFileName = "%s/launcherFileTest_BeginRun.py" % os.getcwd() | |
102 if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): | |
103 iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"]) | |
104 else: | |
105 iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName) | |
106 iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir) | |
107 iWriteScript.run(cmd_start, "", pyFileName) | |
108 os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) | |
109 self._jobdb.submitJob(iJob) | |
110 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) | |
111 | |
112 iLauncher.beginRun() | |
113 | |
114 self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 1) | |
115 | |
116 def test_getStatsOfExecutionTime(self): | |
117 acronym = "test_statTime" | |
118 | |
119 expLValues = [1000.00000, 1000.00000] | |
120 expStat = Stat(expLValues) | |
121 | |
122 f = open(acronym +".o1", "w") | |
123 f.write("executionTime=1000.000000") | |
124 f.close() | |
125 f = open(acronym +".o2", "w") | |
126 f.write("executionTime=1000.000000") | |
127 f.close() | |
128 | |
129 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) | |
130 obsStat = iLauncher.getStatsOfExecutionTime(acronym) | |
131 | |
132 self.assertEqual(expStat, obsStat) | |
133 | |
134 def test_endRun(self): | |
135 acronym = "testU_EndRun" | |
136 cmd_start = "log = os.system( \"date;sleep 10;date\" )\n" | |
137 pyFileName = "%s/launcherFileTest_EndRun.py" % os.getcwd() | |
138 if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): | |
139 iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"]) | |
140 else: | |
141 iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName) | |
142 | |
143 iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir) | |
144 iWriteScript.run(cmd_start, "", pyFileName) | |
145 os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) | |
146 self._jobdb.submitJob(iJob) | |
147 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) | |
148 iLauncher.job.groupid = self._groupid | |
149 iLauncher.endRun() | |
150 | |
151 self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0) | |
152 self.assertTrue(self._jobdb.getCountStatus(self._groupid, "error") == 0) | |
153 self.assertTrue(self._jobdb.getCountStatus(self._groupid, "waiting") == 0) | |
154 | |
155 os.remove(iJob.launcher) | |
156 | |
157 def test_clean(self): | |
158 acronym = "test_clean" | |
159 f = open("ClusterLauncher" + acronym + ".py", "w") | |
160 f.close() | |
161 f = open(acronym + ".o1", "w") | |
162 f.close() | |
163 f = open(acronym + ".e1", "w") | |
164 f.close() | |
165 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) | |
166 iLauncher.clean(acronym) | |
167 self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym + ".py")) | |
168 | |
169 def test_clean_without_acronym(self): | |
170 acronym = "" | |
171 acronym2 = "toto" | |
172 f = open("ClusterLauncher" + acronym2 + ".py", "w") | |
173 f.close() | |
174 f = open(acronym2 + ".o1", "w") | |
175 f.close() | |
176 f = open(acronym2 + ".e1", "w") | |
177 f.close() | |
178 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym2) | |
179 iLauncher.clean(acronym) | |
180 self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym2 + ".py")) | |
181 | |
182 def test_getQueueNameAndResources_queue_no_resource(self): | |
183 configQueue = "all.q" | |
184 expQueueName = "all.q" | |
185 expResources = [] | |
186 iLauncher = Launcher(self._jobdb) | |
187 obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) | |
188 self.assertEquals(expQueueName, obsQueueName) | |
189 self.assertEquals(expResources, obsResources) | |
190 | |
191 def test_getQueueNameAndResources_queue_one_resource(self): | |
192 configQueue = "test.q 'test=TRUE'" | |
193 expQueueName = "test.q" | |
194 expResources = ["test=TRUE"] | |
195 iLauncher = Launcher(self._jobdb) | |
196 obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) | |
197 self.assertEquals(expQueueName, obsQueueName) | |
198 self.assertEquals(expResources, obsResources) | |
199 | |
200 def test_getQueueNameAndResources_queue_two_resources(self): | |
201 configQueue = "big.q 's_data=8G s_cpu=96:00:00'" | |
202 expQueueName = "big.q" | |
203 expResources = ["s_data=8G", "s_cpu=96:00:00"] | |
204 iLauncher = Launcher(self._jobdb) | |
205 obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) | |
206 self.assertEquals(expQueueName, obsQueueName) | |
207 self.assertEquals(expResources, obsResources) | |
208 | |
209 def test_getQueueNameAndResources_no_queue_no_resource(self): | |
210 configQueue = "" | |
211 expQueueName = "" | |
212 expResources = [] | |
213 iLauncher = Launcher(self._jobdb) | |
214 obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) | |
215 self.assertEquals(expQueueName, obsQueueName) | |
216 self.assertEquals(expResources, obsResources) | |
217 | |
218 def test_getQueueNameAndResources_no_queue_one_resource(self): | |
219 configQueue = "s_data=8G" | |
220 expQueueName = "" | |
221 expResources = ["s_data=8G"] | |
222 iLauncher = Launcher(self._jobdb) | |
223 obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) | |
224 self.assertEquals(expQueueName, obsQueueName) | |
225 self.assertEquals(expResources, obsResources) | |
226 | |
227 def test_getQueueNameAndResources_no_queue_two_resource(self): | |
228 configQueue = "s_data=8G s_cpu=96:00:00" | |
229 expQueueName = "" | |
230 expResources = ["s_data=8G", "s_cpu=96:00:00"] | |
231 iLauncher = Launcher(self._jobdb) | |
232 obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) | |
233 self.assertEquals(expQueueName, obsQueueName) | |
234 self.assertEquals(expResources, obsResources) | |
235 | |
236 # #TODO: test with at least 2 lines in cmd | |
237 def test_runSingleJob(self): | |
238 acronym = "Test_runSingleJob" | |
239 os.mkdir(acronym) | |
240 os.chdir(acronym) | |
241 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym) | |
242 iLauncher.job.groupid = self._groupid | |
243 iLauncher.job.jobname = acronym | |
244 iLauncher.job.queue = self._queue | |
245 if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): | |
246 iLauncher.job.lResources = ["test=TRUE"] | |
247 cmd = "log = os.system(\"touch 'YuFei'\")\n" | |
248 iLauncher.runSingleJob(cmd) | |
249 time.sleep(20) | |
250 jobStatus = self._jobdb.getJobStatus(iLauncher.job) | |
251 os.chdir(self._cDir) | |
252 shutil.rmtree(acronym) | |
253 self.assertEqual(jobStatus, "finished") | |
254 | |
255 def test_runSingleJob_catch_error_wrong_tmpDir(self): | |
256 acronym = "Test_runSingleJob_catch_error" | |
257 os.mkdir(acronym) | |
258 os.chdir(acronym) | |
259 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), "%s/toto" % self._tmpDir, "", self._queue, self._groupid, acronym) | |
260 iLauncher.job.groupid = self._groupid | |
261 iLauncher.job.jobname = acronym | |
262 iLauncher.job.queue = self._queue | |
263 if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): | |
264 iLauncher.job.lResources = ["test=TRUE"] | |
265 cmd = "log = os.system(\"touch 'YuFei'\")\n" | |
266 iLauncher.runSingleJob(cmd) | |
267 time.sleep(20) | |
268 jobStatus = self._jobdb.getJobStatus(iLauncher.job) | |
269 os.chdir(self._cDir) | |
270 shutil.rmtree(acronym) | |
271 self.assertEqual(jobStatus, "error") | |
272 | |
273 def test_runSingleJob_catch_error_wrong_cmd(self): | |
274 acronym = "Test_runSingleJob_catch_error" | |
275 os.mkdir(acronym) | |
276 os.chdir(acronym) | |
277 iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym) | |
278 iLauncher.job.groupid = self._groupid | |
279 iLauncher.job.jobname = acronym | |
280 iLauncher.job.queue = self._queue | |
281 if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): | |
282 iLauncher.job.lResources = ["test=TRUE"] | |
283 cmd = "log = os.system(\"truc -i toto\")\n" | |
284 iLauncher.runSingleJob(cmd) | |
285 time.sleep(20) | |
286 jobStatus = self._jobdb.getJobStatus(iLauncher.job) | |
287 self._jobdb.cleanJobGroup(self._groupid) | |
288 os.chdir(self._cDir) | |
289 shutil.rmtree(acronym) | |
290 self.assertEqual(jobStatus, "error") | |
291 | |
292 def test_prepareCommands(self): | |
293 expCmdStart = "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")\n\tos.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")\n\tlog = os.system(\"touch file\")\n\t" | |
294 expCmdFinish = "if os.path.exists(\"yufei.align\"):\n\t\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )\n\t" | |
295 expCmdSize = "fileSize = 3.2\n\t\t" | |
296 expCmdCopy = "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")\n\t\tshutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")\n\t\t" | |
297 | |
298 lCmdStart = [] | |
299 lCmdStart.append("os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")") | |
300 lCmdStart.append("os.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")") | |
301 lCmds = [] | |
302 lCmds.append("log = os.system(\"touch file\")") | |
303 lCmdFinish = [] | |
304 lCmdFinish.append("if os.path.exists(\"yufei.align\"):") | |
305 lCmdFinish.append("\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )") | |
306 lCmdSize = [] | |
307 lCmdSize.append("fileSize = 3.2") | |
308 lCmdCopy = [] | |
309 lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")") | |
310 lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")") | |
311 | |
312 iLauncher = Launcher(self._jobdb) | |
313 obsCmdStart, obsCmdFinish, obsCmdSize, obsCmdCopy = iLauncher.prepareCommands(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy) | |
314 | |
315 self.assertEquals(expCmdStart, obsCmdStart) | |
316 self.assertEquals(expCmdFinish, obsCmdFinish) | |
317 self.assertEquals(expCmdSize, obsCmdSize) | |
318 self.assertEquals(expCmdCopy, obsCmdCopy) | |
319 | |
320 def test_getSystemCommand(self): | |
321 prg = "touch" | |
322 lArgs = [] | |
323 lArgs.append("file") | |
324 expCmd = "log = os.system(\"touch file\")" | |
325 iLauncher = Launcher(self._jobdb) | |
326 obsCmd = iLauncher.getSystemCommand(prg, lArgs) | |
327 self.assertEquals(expCmd, obsCmd) | |
328 | |
329 | |
330 test_suite = unittest.TestSuite() | |
331 test_suite.addTest( unittest.makeSuite( Test_Launcher ) ) | |
332 if __name__ == "__main__": | |
333 unittest.TextTestRunner(verbosity=2).run( test_suite ) |