comparison smart_toolShed/commons/core/sql/test/Test_F_TableJobAdaptator.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:e0f8dcca02ed
1 from commons.core.launcher.WriteScript import WriteScript
2 from commons.core.sql.Job import Job
3 from commons.core.sql.DbFactory import DbFactory
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
5 import sys
6 import stat
7 import os
8 import time
9 import unittest
10 import glob
11
12 class Test_F_TableJobAdaptator(unittest.TestCase):
13
14 def setUp(self):
15 self._jobTableName = "dummyJobTable"
16 self._db = DbFactory.createInstance()
17 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
18
19 def tearDown(self):
20 self._db.dropTable(self._jobTableName)
21 self._db.close()
22
23 def test_submitJob_with_multiple_jobs(self):
24 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
25 job1 = _createJobInstance("job1")
26 _createLauncherFile(job1, self._iTJA)
27 job2 = _createJobInstance("job2")
28 _createLauncherFile(job2, self._iTJA)
29 job3 = _createJobInstance("job3")
30 _createLauncherFile(job3, self._iTJA)
31
32 self._iTJA.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
33 self._iTJA.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
34 self._iTJA.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
35
36 time.sleep(120)
37
38 expJobStatus = "finished"
39 obsJobStatus1 = self._iTJA.getJobStatus(job1)
40 obsJobStatus2 = self._iTJA.getJobStatus(job2)
41 obsJobStatus3 = self._iTJA.getJobStatus(job3)
42
43 self.assertEquals(expJobStatus, obsJobStatus1)
44 self.assertEquals(expJobStatus, obsJobStatus2)
45 self.assertEquals(expJobStatus, obsJobStatus3)
46
47 expErrorFilePrefix1 = job1.jobname + ".e"
48 expOutputFilePrefix1 = job1.jobname + ".o"
49 expErrorFilePrefix2 = job2.jobname + ".e"
50 expOutputFilePrefix2 = job2.jobname + ".o"
51 expErrorFilePrefix3 = job3.jobname + ".e"
52 expOutputFilePrefix3 = job3.jobname + ".o"
53
54 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
55 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
56 lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*")
57 lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*")
58 lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*")
59 lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*")
60
61 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0)
62 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
63 isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0)
64 isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0)
65 isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0)
66 isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0)
67
68 os.system("rm launcherFileTest*.py *.e* *.o*")
69 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
70 self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2)
71 self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3)
72
73 def test_submitJob_job_already_submitted(self):
74 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
75 iJob = _createJobInstance("job")
76 self._iTJA.recordJob(iJob)
77
78 isSysExitRaised = False
79 try:
80 self._iTJA.submitJob(iJob)
81 except SystemExit:
82 isSysExitRaised = True
83 self.assertTrue(isSysExitRaised)
84
85 def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
86 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
87 iJob = _createJobInstance("job")
88 _createLauncherFile(iJob, self._iTJA)
89
90 self._iTJA.recordJob(iJob)
91 self._iTJA.changeJobStatus(iJob, "error")
92
93 self._iTJA.waitJobGroup(iJob.groupid, 0, 2)
94
95 time.sleep(120)
96
97 expJobStatus = "finished"
98 obsJobStatus1 = self._iTJA.getJobStatus(iJob)
99
100 self.assertEquals(expJobStatus, obsJobStatus1)
101
102 expErrorFilePrefix1 = iJob.jobname + ".e"
103 expOutputFilePrefix1 = iJob.jobname + ".o"
104
105 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
106 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
107
108 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0)
109 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
110
111 self._iTJA.removeJob(iJob)
112 os.system("rm launcherFileTest*.py *.e* *.o*")
113 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
114
115 class Test_F_TableJobAdaptator_SGE(unittest.TestCase):
116
117 def setUp(self):
118 if os.environ["REPET_JOB_MANAGER"].lower() != "sge":
119 print "ERROR: jobs manager is not SGE: REPET_JOB_MANAGER = %s." % os.environ["REPET_JOB_MANAGER"]
120 sys.exit(0)
121 self._jobTableName = "dummyJobTable"
122 self._db = DbFactory.createInstance()
123 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
124 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
125 self._iJob = _createJobInstance("job")
126 _createLauncherFile(self._iJob, self._iTJA)
127
128 def tearDown(self):
129 self._db.dropTable(self._jobTableName)
130 self._db.close()
131
132 def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
133 self._iTJA.recordJob(self._iJob)
134 self._iTJA.changeJobStatus(self._iJob, "running")
135
136 expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % self._iJob.jobid
137
138 obsError = "obsError.txt"
139 obsErrorHandler = open(obsError, "w")
140 stderrRef = sys.stderr
141 sys.stderr = obsErrorHandler
142
143 isSysExitRaised = False
144 try:
145 self._iTJA.waitJobGroup(self._iJob.groupid, timeOutPerJob = 3)
146 except SystemExit:
147 isSysExitRaised = True
148
149 obsErrorHandler.close()
150
151 obsErrorHandler = open(obsError, "r")
152 obsMsg = obsErrorHandler.readline()
153 obsErrorHandler.close()
154
155 sys.stderr = stderrRef
156 os.remove(obsError)
157 os.system("rm launcherFileTest*.py")
158 self.assertTrue(isSysExitRaised)
159 self.assertEquals(expMsg, obsMsg)
160
161 def test_isJobStillHandledBySge_True(self):
162 self._iTJA.submitJob(self._iJob)
163 isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
164 os.system("rm launcherFileTest*.py")
165 self.assertTrue(isJobHandledBySge)
166
167 def test_isJobStillHandledBySge_False(self):
168 self._iTJA.recordJob(self._iJob)
169 isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
170 os.system("rm launcherFileTest*.py")
171 self.assertFalse(isJobHandledBySge)
172
173 def _createJobInstance(name):
174 lResources = []
175 if os.environ.get("HOSTNAME") == "compute-2-46.local":
176 lResources.append("test=TRUE")
177 return Job(0, name, "test", "", "log = os.system(\"date;sleep 5;date\")", "%s/launcherFileTest_%s.py" % (os.getcwd(), name), lResources=lResources)
178
179 def _createLauncherFile(iJob, iTJA):
180 iWriteScript = WriteScript(iJob, iTJA, os.getcwd(), os.getcwd())
181 iWriteScript.run(iJob.command, "", iJob.launcher)
182 os.chmod(iJob.launcher, stat.S_IRWXU+stat.S_IRWXG+stat.S_IRWXO)
183
184 if __name__ == "__main__":
185 unittest.main()