comparison smart_toolShed/commons/core/launcher/test/Test_WriteScript.py @ 0:e0f8dcca02ed

Uploaded S-MART tool, a toolbox that manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:e0f8dcca02ed
1 from commons.core.utils.FileUtils import FileUtils
2 from commons.core.launcher.WriteScript import WriteScript
3 from commons.core.sql.Job import Job
4 from commons.core.sql.DbFactory import DbFactory
5 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
6 import unittest
7 import os
8 import shutil
9 import time
10 import threading
11
class Test_WriteScript(unittest.TestCase):
    """Tests for WriteScript: script generation, template filling and job dict creation.

    Each test runs from inside a scratch directory ("dummyScratch") created
    under the directory the test run was started from. A jobs table is created
    and one job is recorded in it before every test; both are removed afterwards.

    The fillTemplate tests compare against reference files located under
    "expFiles/" relative to the starting directory, and the createJobScriptDict
    tests read the REPET_* environment variables.
    """

    def setUp(self):
        """Create the jobs table, record one job and enter a fresh scratch directory."""
        self._testDir = os.getcwd()
        self._acronym = "dummyAcronym"
        self._jobTable = "dummyJobsTable"
        self._iDb = DbFactory.createInstance()
        self._iDb.createTable(self._jobTable, "jobs", overwrite = True)
        self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable)
        self._job = Job()
        self._job.groupid = "groupid"
        self._job.jobname = self._acronym
        self._job.launcher = "ClusterLauncher"
        self._jobdb.recordJob(self._job)
        self._dummyScratch = "dummyScratch"
        # A previous aborted run may have left the scratch directory behind;
        # remove it so that os.mkdir does not fail with "File exists".
        if FileUtils.isRessourceExists(self._dummyScratch):
            shutil.rmtree(self._dummyScratch)
        os.mkdir(self._dummyScratch)
        os.chdir(self._dummyScratch)
        self._tmpDir = os.getcwd()
        self._iScriptWriter = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir)

    def tearDown(self):
        """Drop the jobs table, close the database and remove the scratch directory.

        The working directory is always restored to the starting directory and
        the scratch directory is addressed by absolute path, so cleanup also
        works when a test failed before changing directory itself.
        """
        try:
            self._iDb.dropTable(self._jobTable)
            self._iDb.close()
        finally:
            # Leave the scratch directory before deleting it; otherwise the
            # next setUp would record the scratch directory as its start dir.
            os.chdir(self._testDir)
            scratchPath = os.path.join(self._testDir, self._dummyScratch)
            if FileUtils.isRessourceExists(scratchPath):
                shutil.rmtree(scratchPath)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _getScriptPath(self):
        """Return the path of the script to generate, in the current working directory."""
        return "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)

    def _executeScriptAndCollect(self, pyFileName, fileToCreate):
        """Run the generated script, go back to the start directory and collect its output.

        Returns True when 'fileToCreate' was found in the start directory
        (it is removed in that case), False otherwise.
        """
        os.system("python %s" % pyFileName)
        os.chdir(self._testDir)
        if FileUtils.isRessourceExists(fileToCreate):
            os.remove(fileToCreate)
            return True
        return False

    def _buildTemplateDict(self, withDbFields = True, withCopyFields = False):
        """Return a fresh dictionary of values used to fill a job script template.

        withDbFields adds the repet_host/user/pw/db/port entries;
        withCopyFields adds the cmdSize/cmdCopy entries.
        """
        d = {
            "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
            "jobTableName" : "dummyJobsTable",
            "groupId" : "groupid",
            "jobName" : "job1",
            "launcher" : "ClusterLauncher",
            "time" : "20110505-105353",
            "repet_path" : "/home/user/workspace/repet_pipe",
            "cmdStart" : "log = os.system(\"touch dummyFile1\")",
            "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
            "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/"
        }
        if withDbFields:
            d["repet_host"] = "pisano"
            d["repet_user"] = "user"
            d["repet_pw"] = "user"
            d["repet_db"] = "repet_user"
            d["repet_port"] = "3306"
        if withCopyFields:
            d["cmdSize"] = "fileSize = 0.500000"
            d["cmdCopy"] = "os.system(\"touch bank.fa\")"
        return d

    def _assertFilledTemplateIsExpected(self, iWS, d, expFileName, obsFileName):
        """Fill the template of 'iWS' with 'd' and compare the result with the expected file.

        The observed file is removed even when the comparison fails, so a
        failing test does not leave it behind in the start directory.
        """
        try:
            iWS.fillTemplate(obsFileName, d)
            self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName))
        finally:
            if FileUtils.isRessourceExists(obsFileName):
                os.remove(obsFileName)

    def _assertCreateJobScriptDict(self, cmd_size, cmd_copy):
        """Check the dictionary built by createJobScriptDict for the given size/copy commands."""
        os.chdir(self._testDir)
        cmd_start = "log = os.system(\"touch dummyFile1\")"
        cmd_finish = "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")"
        timeFormat = "%Y%m%d-%H%M%S"
        expDict = {
            "tmpDir" : self._tmpDir,
            "jobTableName" : self._jobTable,
            "groupId" : self._job.groupid,
            "jobName" : self._acronym,
            "launcher" : self._job.launcher,
            "time" : time.strftime(timeFormat),
            "repet_path" : os.environ["REPET_PATH"],
            "repet_host" : os.environ["REPET_HOST"],
            "repet_user" : os.environ["REPET_USER"],
            "repet_pw" : os.environ["REPET_PW"],
            "repet_db" : os.environ["REPET_DB"],
            "repet_port" : os.environ["REPET_PORT"],
            "cmdStart" : cmd_start,
            "cmdFinish" : cmd_finish,
            "cDir" : self._testDir,
            "cmdSize" : cmd_size,
            "cmdCopy" : cmd_copy
        }
        obsDict = self._iScriptWriter.createJobScriptDict(cmd_start, cmd_finish, cmd_size, cmd_copy)
        # The time stamp has a one-second resolution: if the clock ticked
        # between building expDict and the call above, accept the newer stamp
        # instead of failing spuriously.
        # NOTE(review): assumes createJobScriptDict stamps "time" at call time - confirm.
        timeAfter = time.strftime(timeFormat)
        if isinstance(obsDict, dict) and obsDict.get("time") == timeAfter:
            expDict["time"] = timeAfter
        self.assertEqual(expDict, obsDict)

    # ------------------------------------------------------------------
    # run()
    # ------------------------------------------------------------------

    def test_run(self):
        """The generated script creates a file, moves it to the start dir and the job ends 'finished'."""
        fileToCreate = 'dummyFile'
        cmdStart = "log = os.system( \"touch %s\" )\n" % fileToCreate
        cmdFinish = "os.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir)
        pyFileName = self._getScriptPath()

        self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName)
        isScriptAsRun = self._executeScriptAndCollect(pyFileName, fileToCreate)
        expJobStatus = "finished"
        obsJobStatus = self._jobdb.getJobStatus(self._job)

        self.assertTrue(isScriptAsRun)
        self.assertEqual(expJobStatus, obsJobStatus)

    def test_run_with_cmdSize_and_cmdCopy(self):
        """Same as test_run, with a WriteScript built with its fifth argument set to True and size/copy commands."""
        fileToCreate = 'dummyFile'
        fileSize = 0.5
        cmdSize = "fileSize = %f\n" % fileSize
        cmdCopy = "os.system(\"touch bank.fa\")\n"
        cmdStart = "log = os.system(\"touch %s\")\n" % fileToCreate
        cmdFinish = "shutil.move(\"%s\", \"%s\")" % (fileToCreate, self._testDir)
        pyFileName = self._getScriptPath()

        iWriteScript = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir, True)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        isScriptAsRun = self._executeScriptAndCollect(pyFileName, fileToCreate)
        expJobStatus = "finished"
        obsJobStatus = self._jobdb.getJobStatus(self._job)

        self.assertTrue(isScriptAsRun)
        self.assertEqual(expJobStatus, obsJobStatus)

    # TODO: find a way to test two jobs trying to create the same groupid
    # directory; the draft below is kept commented out until then.
#    def test_run_2_jobs_trying_to_create_same_groupIdDir(self):
#        fileToCreate1 = 'dummyFile1'
#        fileToCreate2 = 'dummyFile2'
#        flagFileOSError = "osErrorRaised"
#
#        fileSize = 0.5
#        cmd_checkSize = ""
#        cmd_checkSize += "if not os.path.exists( \"%s\" ):\n" % self._job.groupid
#        cmd_checkSize += "\tfileSize = %f\n" % fileSize
#
#        cmd_checkGroupidDir1 = ""
#        cmd_checkGroupidDir1 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\ttry:\n"
#        cmd_checkGroupidDir1 += "\t\ttime.sleep(10)\n"
#        cmd_checkGroupidDir1 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\texcept OSError, e :\n"
#        cmd_checkGroupidDir1 += "\t\tos.system(\"touch %s\")\n" % flagFileOSError
#        cmd_checkGroupidDir1 += "\t\tif e.args[0] != 17:\n"
#        cmd_checkGroupidDir1 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\tos.system(\"touch bank.fa\")\n" #cp
#        cmd_checkGroupidDir1 += "else:\n"
#        cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#
#        cmdStart1 = "log = os.system(\"touch %s\")\n" % fileToCreate1
#        cmdFinish1 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate1, self._testDir)
#        pyFileName1 = "%s/ClusterLauncher1_job1.py" % os.getcwd()
#
#        cmd_checkGroupidDir2 = ""
#        cmd_checkGroupidDir2 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\ttry:\n"
#        cmd_checkGroupidDir2 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\texcept OSError, e :\n"
#        cmd_checkGroupidDir2 += "\t\tif e.args[0] != 17:\n"
#        cmd_checkGroupidDir2 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\tos.system(\"touch bank.fa\")\n" #cp
#        cmd_checkGroupidDir2 += "else:\n"
#        cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#
#        cmdStart2 = "log = os.system(\"touch %s\")\n" % fileToCreate2
#        cmdFinish2 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate2, self._testDir)
#        pyFileName2 = "%s/ClusterLauncher2_job2.py" % os.getcwd()
#
#        job1 = Job(self._jobTable, jobname = "job1", groupid = self._job.groupid)
#        self._jobdb.recordJob(job1)
#        job2 = Job(self._jobTable, jobname = "job2", groupid = self._job.groupid)
#        self._jobdb.recordJob(job2)
#        iScriptWriter1 = WriteScript(job1, self._jobdb, self._testDir, self._tmpDir)
#        iScriptWriter1.run(cmdStart1, cmdFinish1, pyFileName1, cmd_checkSize, cmd_checkGroupidDir1)
#        iScriptWriter2 = WriteScript(job2, self._jobdb, self._testDir, self._tmpDir)
#        iScriptWriter2.run(cmdStart2, cmdFinish2, pyFileName2, cmd_checkSize, cmd_checkGroupidDir2)
#
#        iCFT1 = CreateFileThread(pyFileName1)
#        iCFT2 = CreateFileThread(pyFileName2)
#        iCFT1.start()
#        iCFT2.start()
#        while iCFT1.isAlive() or iCFT2.isAlive():
#            time.sleep(5)
#        self.assertTrue(FileUtils.isRessourceExists(flagFileOSError))
#        os.chdir(self._testDir)
#
#        if FileUtils.isRessourceExists(fileToCreate1):
#            os.remove(fileToCreate1)
#
#        if FileUtils.isRessourceExists(fileToCreate2):
#            os.remove(fileToCreate2)

    def test_run_2_lines_in_cmd_start(self):
        """A start command spanning several lines still produces a script that runs."""
        fileToCreate = 'dummyFile'

        cmdStart = "log = 0\n\t"
        cmdStart += "if True:\n\t"
        cmdStart += "\tos.system( \"touch dummyFile\" )\n"
        cmdFinish = "os.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir)
        pyFileName = self._getScriptPath()

        self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName)
        isScriptAsRun = self._executeScriptAndCollect(pyFileName, fileToCreate)
        self.assertTrue(isScriptAsRun)

    def test_run_2_lines_in_cmd_finish(self):
        """Start and finish commands spanning several lines still produce a script that runs."""
        fileToCreate = 'dummyFile'

        cmdStart = "log = 0\n\t"
        cmdStart += "if True:\n\t"
        cmdStart += "\tos.system( \"touch dummyFile\" )\n"
        cmdFinish = "if True:\n\t"
        cmdFinish += "\tos.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir)
        pyFileName = self._getScriptPath()

        self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName)
        isScriptAsRun = self._executeScriptAndCollect(pyFileName, fileToCreate)
        self.assertTrue(isScriptAsRun)

    # ------------------------------------------------------------------
    # fillTemplate()
    # ------------------------------------------------------------------

    def test_fillTemplate_with_JobScriptTemplate(self):
        """A default WriteScript fills its template as in expJobScriptTemplate.py."""
        # Equivalent to os.chdir(".."): the scratch dir is a direct child of the start dir.
        os.chdir(self._testDir)
        d = self._buildTemplateDict()
        expFileName = "expFiles/expJobScriptTemplate.py"
        obsFileName = "obsFile.py"

        iWS = WriteScript()
        self._assertFilledTemplateIsExpected(iWS, d, expFileName, obsFileName)

    def test_fillTemplate_with_JobScriptTemplate_2_lines_in_cmd_start(self):
        """Same as above with a two-line start command."""
        os.chdir(self._testDir)
        d = self._buildTemplateDict()
        d["cmdStart"] = "print \"Hello Yufei\"\n\tlog = os.system(\"touch dummyFile1\")"
        expFileName = "expFiles/expJobScriptTemplate_cmdWith2Lines.py"
        obsFileName = "obsFile.py"

        iWS = WriteScript()
        self._assertFilledTemplateIsExpected(iWS, d, expFileName, obsFileName)

    def test_fillTemplate_with_JobScriptWithFilesCopyTemplate(self):
        """WriteScript(chooseTemplateWithCopy = True) fills its template with the size/copy commands."""
        os.chdir(self._testDir)
        d = self._buildTemplateDict(withCopyFields = True)
        expFileName = "expFiles/expJobScriptWithFilesCopyTemplate.py"
        obsFileName = "obsFile.py"

        iWS = WriteScript(chooseTemplateWithCopy = True)
        self._assertFilledTemplateIsExpected(iWS, d, expFileName, obsFileName)

    def test_fillTemplate_with_JobScriptTemplateLight(self):
        """WriteScript(chooseTemplateLight = True) fills its template without the repet_host/user/pw/db/port entries."""
        os.chdir(self._testDir)
        d = self._buildTemplateDict(withDbFields = False, withCopyFields = True)
        expFileName = "expFiles/expJobScriptTemplateLight.py"
        obsFileName = "obs.py"

        iWS = WriteScript(chooseTemplateLight = True)
        self._assertFilledTemplateIsExpected(iWS, d, expFileName, obsFileName)

    # ------------------------------------------------------------------
    # createJobScriptDict()
    # ------------------------------------------------------------------

    def test_createJobScriptDict(self):
        """createJobScriptDict with empty size and copy commands."""
        self._assertCreateJobScriptDict("", "")

    def test_createJobScriptDict_with_cmdSize_and_cmdCopy(self):
        """createJobScriptDict with explicit size and copy commands."""
        self._assertCreateJobScriptDict("fileSize = 0.500000", "os.system(\"touch bank.fa\")")
352
class CreateFileThread(threading.Thread):
    """Thread that executes a Python script file through the 'python' command."""

    def __init__(self, pyFileName):
        """Store the path of the script that run() will execute."""
        super(CreateFileThread, self).__init__()
        self._pyFileName = pyFileName

    def run(self):
        """Execute the stored script with os.system."""
        command = "python %s" % self._pyFileName
        os.system(command)
361
# Module-level suite holding every test of Test_WriteScript.
# TestLoader.loadTestsFromTestCase replaces unittest.makeSuite, which is
# deprecated and no longer available in recent Python versions.
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test_WriteScript))

if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(test_suite)