comparison smart_toolShed/commons/core/sql/TableJobAdaptator.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:e0f8dcca02ed
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31
32 import os
33 import time
34 import datetime
35 import sys
36 from commons.core.sql.Job import Job
37 from commons.core.sql.TableAdaptator import TableAdaptator
38
39 ## Methods for Job persistence
40 #
41 class TableJobAdaptator(TableAdaptator):
42
43 ## Record a job
44 #
45 # @param job Job instance with the job informations
46 #
47 def recordJob(self, job):
48 self.removeJob(job)
49 sqlCmd = "INSERT INTO %s" % self._table
50 sqlCmd += " VALUES ("
51 sqlCmd += " \"%s\"," % job.jobid
52 sqlCmd += " \"%s\"," % job.jobname
53 sqlCmd += " \"%s\"," % job.groupid
54 sqlCmd += " \"%s\"," % job.launcher
55 sqlCmd += " \"%s\"," % job.queue
56 sqlCmd += " \"%s\"," % job.lResources
57 sqlCmd += " \"waiting\","
58 sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
59 sqlCmd += " \"?\" );"
60 self._iDb.execute(sqlCmd)
61
62
63 ## Remove a job from the job table
64 #
65 # @param job: job instance to remove
66 #
67 def removeJob(self, job):
68 qry = "DELETE FROM %s" % self._table
69 qry += " WHERE groupid='%s'" % job.groupid
70 qry += " AND jobname='%s'" % job.jobname
71 qry += " AND launcher='%s';" % job.launcher
72 self._iDb.execute(qry)
73
74
75 ## Set the jobid of a job with the id of SGE
76 #
77 # @param job job instance
78 # @param jobid integer
79 #
80 def updateJobIdInDB(self, job, jobid):
81 #TODO: check if only one job will be updated
82 qry = "UPDATE %s" % self._table
83 qry += " SET jobid='%i'" % int(jobid)
84 qry += " WHERE jobname='%s'" % job.jobname
85 qry += " AND groupid='%s'" % job.groupid
86 qry += " AND launcher='%s';" % job.launcher
87 self._iDb.execute(qry)
88
89
90 ## Get a job status
91 #
92 # @param job: a Job instance with the job informations
93 #
94 def getJobStatus(self, job):
95 if job.jobid != 0 and job.jobname == "":
96 job.jobname = job.jobid
97 job.jobid = 0
98 qry = "SELECT status FROM %s" % self._table
99 qry += " WHERE groupid='%s'" % job.groupid
100 qry += " AND jobname='%s'" % job.jobname
101 qry += " AND launcher='%s';" % job.launcher
102 self._iDb.execute(qry)
103 res = self._iDb.fetchall()
104 if len(res) > 1:
105 sys.stderr.write("ERROR while getting job status: non-unique jobs\n")
106 sys.stderr.flush()
107 sys.exit(1)
108 if res == None or len(res) == 0:
109 return "unknown"
110 return res[0][0]
111
112
113 ## Change a job status
114 #
115 # @param job: a Job instance with the job informations
116 # @param status: the new status (waiting,finished,error)
117 #
118 def changeJobStatus(self, job, status):
119 sqlCmd = "UPDATE %s" % self._table
120 sqlCmd += " SET status='%s'" % status
121 sqlCmd += ", node='%s'" % job.node
122 sqlCmd += " WHERE groupid='%s'" % job.groupid
123 sqlCmd += " AND jobname='%s'" % job.jobname
124 sqlCmd += " AND launcher='%s';" % job.launcher
125 self._iDb.execute(sqlCmd)
126
127
128 ## Get the number of jobs belonging to the desired groupid with the desired status.
129 #
130 # @param groupid string a group identifier to record related job series
131 # @param status string job status (waiting, running, finished, error)
132 # @return int
133 #
134 def getCountStatus(self, groupid, status):
135 qry = "SELECT count(jobname) FROM %s" % self._table
136 qry += " WHERE groupid='%s'" % groupid
137 qry += " AND status='%s';" % status
138 self._iDb.execute(qry)
139 res = self._iDb.fetchall()
140 return int(res[0][0])
141
142
143 ## Clean all job from a job group
144 #
145 # @param groupid: a group identifier to record related job series
146 #
147 def cleanJobGroup(self, groupid):
148 qry = "DELETE FROM %s WHERE groupid='%s';" % (self._table, groupid)
149 self._iDb.execute(qry)
150
151
152 ## Check if there is unfinished job from a job group.
153 #
154 # @param groupid string a group identifier to record related job series
155 #
156 def hasUnfinishedJob(self, groupid):
157 qry = "SELECT * FROM %s" % self._table
158 qry += " WHERE groupid='%s'" % groupid
159 qry += " and status!='finished';"
160 self._iDb.execute(qry)
161 res = self._iDb.fetchall()
162 if len(res) == 0:
163 return False
164 return True
165
166
167 ## Wait job finished status from a job group.
168 # Job are re-launched if error (max. 3 times)
169 #
170 # @param groupid string a group identifier to record related job series
171 # @param checkInterval integer time laps in seconds between two checks (default = 5)
172 # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3)
173 # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
174 # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
175 #
176 def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
177 dJob2Err = {}
178
179 # retrieve the total number of jobs belonging to the desired groupid
180 qry = "SELECT count(jobname) FROM %s WHERE groupid='%s';" % (self._table, groupid)
181 self._iDb.execute(qry)
182 totalNbJobs = int(self._iDb.fetchall()[0][0])
183
184 nbTimeOuts = 0
185
186 while True:
187 time.sleep(checkInterval)
188 # retrieve the finished jobs and stop if all jobs are finished
189 nbFinishedJobs = self.getCountStatus(groupid, "finished")
190 if nbFinishedJobs == totalNbJobs:
191 break
192
193 # retrieve the jobs in error and relaunch them if they are in error (max. 'maxRelaunch' times)
194 qry = "SELECT * FROM %s" % self._table
195 qry += " WHERE groupid='%s'" % groupid
196 qry += " AND status ='error';"
197 self._iDb.execute(qry)
198 lJobsInError = self._iDb.fetchall()
199 for job in lJobsInError:
200 jobName = job[1]
201 if not dJob2Err.has_key(jobName):
202 dJob2Err[jobName] = 1
203 if dJob2Err[jobName] < maxRelaunch:
204 print "job '%s' in error, re-submitting (%i)" % (job[1], dJob2Err[job[1]])
205 sys.stdout.flush()
206 lResources = job[5].replace("[", "").replace("]", "").replace("'", "").split(", ")
207 newJob = Job(jobname=jobName, groupid=job[2], launcherFile=job[3], queue=job[4], lResources=lResources)
208 self.submitJob(newJob)
209 dJob2Err[jobName] += 1
210 else:
211 dJob2Err[jobName] += 1
212 cmd = "job '%s' in permanent error (>%i)" % (jobName, maxRelaunch)
213 cmd += "\ngroupid = %s" % groupid
214 cmd += "\nnb of jobs = %i" % totalNbJobs
215 cmd += "\nnb of finished jobs = %i" % self.getCountStatus(groupid, "finished")
216 cmd += "\nnb of waiting jobs = %i" % self.getCountStatus(groupid, "waiting")
217 cmd += "\nnb of running jobs = %i" % self.getCountStatus(groupid, "running")
218 cmd += "\nnb of jobs in error = %i" % self.getCountStatus(groupid, "error")
219 sys.stdout.flush()
220 if exitIfTooManyErrors:
221 self.cleanJobGroup(groupid)
222 sys.exit(1)
223 else:
224 checkInterval = 60
225 nbTimeOuts = self._checkIfJobsTableAndJobsManagerInfoAreConsistent(nbTimeOuts, timeOutPerJob, groupid)
226
227
228 ## Submit a job to a queue and record it in job table.
229 #
230 # @param job a job instance
231 # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
232 # @param checkInterval integer time laps in seconds between two checks (default = 30)
233 # @param verbose integer (default = 0)
234 #
235 def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
236 if self.getJobStatus(job) in ["waiting", "running", "finished"]:
237 sys.stderr.write( "WARNING: job '%s' was already submitted\n" % job.jobname)
238 sys.stderr.flush()
239 self.cleanJobGroup(job.groupid)
240 sys.exit(1)
241
242 while self.getCountStatus(job.groupid, "waiting") > maxNbWaitingJobs:
243 time.sleep(checkInterval)
244
245 self.recordJob(job)
246 cmd = self._getQsubCommand(job)
247 returnStatus = os.system(cmd)
248
249 if returnStatus == 0:
250 fileName = "jobid.stdout"
251 jobidFileHandler = open(fileName, "r")
252 jobid = self._getJobidFromJobManager(jobidFileHandler)
253 if verbose > 0:
254 print "job '%i %s' submitted" % (jobid, job.jobname)
255 sys.stdout.flush()
256 job.jobid = jobid
257 jobidFileHandler.close()
258 self.updateJobIdInDB(job, jobid)
259 os.remove(fileName)
260 return returnStatus
261
262
263 ## Get the list of nodes where jobs of one group were executed
264 #
265 # @param groupid string a group identifier of job series
266 # @return lNodes list of nodes names without redundancy
267 #
268 def getNodesListByGroupId(self, groupId):
269 qry = "SELECT DISTINCT node FROM %s" % self._table
270 qry += " WHERE groupid='%s'" % groupId
271 self._iDb.execute(qry)
272 res = self._iDb.fetchall()
273 lNodes = []
274 for resTuple in res:
275 lNodes.append(resTuple[0])
276 return lNodes
277
278 def checkJobTable(self):
279 if not self._iDb.doesTableExist(self._table):
280 self._iDb.createTable(self._table, "jobs")
281 else:
282 lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
283 lObsFields = sorted(self._iDb.getFieldList(self._table))
284 if lExpFields != lObsFields:
285 self._iDb.createTable(self._table, "jobs", overwrite = True)
286
287 def close(self):
288 self._iDb.close()
289
290 def _getJobidAndNbJob(self, jobid) :
291 tab = jobid.split(".")
292 jobid = tab[0]
293 tab = tab[1].split(":")
294 nbJob = tab[0]
295 return jobid, nbJob
296
297 class TableJobAdaptatorSGE(TableJobAdaptator):
298
299 def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
300 # retrieve the date and time at which the oldest, still-running job was submitted
301 sql = "SELECT jobid,jobname,time FROM %s WHERE groupid='%s' AND status='running' ORDER BY time DESC LIMIT 1" % (self._table, groupid)
302 self._iDb.execute( sql )
303 res = self._iDb.fetchall()
304 if len(res) > 0:
305 jobid = res[0][0]
306 jobname = res[0][1]
307 dateTimeOldestJob = res[0][2]
308 dateTimeCurrent = datetime.datetime.now()
309 # delta is time between (i) first job launched of the given groupid and still in running state and (ii) current time
310 delta = dateTimeCurrent - dateTimeOldestJob
311 # check if delta is in an interval: 0 <= delta < 1h | 1h <= delta < 2h | 2h <= delta < 3h (timeOutPerJob = 1h)
312 if delta.seconds >= nbTimeOuts * timeOutPerJob and delta.seconds < (nbTimeOuts+1) * timeOutPerJob:
313 return nbTimeOuts
314 # delta outside the interval: go to next interval (time out)
315 if delta.seconds >= (nbTimeOuts+1) * timeOutPerJob:
316 nbTimeOuts += 1
317 # Job with 'running' status should be in qstat. Because status in DB is set at 'running' by the job launched.
318 if not self.isJobStillHandledBySge(jobid, jobname):
319 # But if not, let time for the status update (in DB), if the job finished between the query execution and now.
320 time.sleep( 5 )
321 # If no update at 'finished', exit
322 #TODO: check status in DB
323 if not self.isJobStillHandledBySge(jobid, jobname):
324 msg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore" % ( jobid )
325 msg += "\nit was launched the %s (> %.2f hours ago)" % ( dateTimeOldestJob, timeOutPerJob/3600.0 )
326 msg += "\nthis problem can be due to:"
327 msg += "\n* memory shortage, in that case, decrease the size of your jobs;"
328 msg += "\n* timeout, in that case, decrease the size of your jobs;"
329 msg += "\n* node failure or database error, in that case, launch the program again or ask your system administrator."
330 sys.stderr.write("%s\n" % msg)
331 sys.stderr.flush()
332 self.cleanJobGroup(groupid)
333 sys.exit(1)
334 return nbTimeOuts
335
336 ## Check if a job is still handled by SGE
337 #
338 # @param jobid string job identifier
339 # @param jobname string job name
340 #
341 def isJobStillHandledBySge(self, jobid, jobname):
342 isJobInQstat = False
343 qstatFile = "qstat_stdout"
344 cmd = "qstat > %s" % qstatFile
345 returnStatus = os.system(cmd)
346 if returnStatus != 0:
347 msg = "ERROR while launching 'qstat'"
348 sys.stderr.write( "%s\n" % msg )
349 sys.exit(1)
350 qstatFileHandler = open(qstatFile, "r")
351 lLines = qstatFileHandler.readlines()
352 for line in lLines:
353 tokens = line.split()
354 if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
355 isJobInQstat = True
356 break
357 qstatFileHandler.close()
358 os.remove(qstatFile)
359 return isJobInQstat
360
361 def _getQsubCommand(self, job):
362 cmd = "echo '%s' | " % job.launcher
363 prg = "qsub"
364 cmd += prg
365 cmd += " -V"
366 cmd += " -N %s" % job.jobname
367 if job.queue != "":
368 cmd += " -q %s" % job.queue
369 cmd += " -cwd"
370 if job.lResources != []:
371 cmd += " -l \""
372 cmd += " ".join(job.lResources)
373 cmd += "\""
374 if job.parallelEnvironment != "":
375 cmd += " -pe " + job.parallelEnvironment
376 cmd += " > jobid.stdout"
377 return cmd
378
379 def _getJobidFromJobManager(self, jobidFileHandler):
380 return int(jobidFileHandler.readline().split(" ")[2])
381
382
383 class TableJobAdaptatorTorque(TableJobAdaptator):
384
385 def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
386 return nbTimeOuts
387
388 def _getQsubCommand(self, job):
389 cmd = "echo '%s' | " % job.launcher
390 prg = "qsub"
391 cmd += prg
392 cmd += " -V"
393 cmd += " -d %s" % os.getcwd()
394 cmd += " -N %s" % job.jobname
395 if job.queue != "":
396 cmd += " -q %s" % job.queue
397 if job.lResources != []:
398 cmd += " -l \""
399 cmd += " ".join(job.lResources).replace("mem_free","mem")
400 cmd += "\""
401 cmd += " > jobid.stdout"
402 return cmd
403
404 def _getJobidFromJobManager(self, jobidFileHandler):
405 return int(jobidFileHandler.readline().split(".")[0])