comparison smart_toolShed/commons/core/launcher/Launcher.py @ 0:e0f8dcca02ed

Uploaded S-MART tool, a toolbox for managing RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:e0f8dcca02ed
1 from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
2 from commons.core.stat.Stat import Stat
3 from commons.core.launcher.WriteScript import WriteScript
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
5 from commons.core.sql.Job import Job
6 import stat
7 import os
8 import re
9 import sys
10 import time
11 import glob
12
## Write one launcher script per job, submit the jobs through a job database,
# wait for their completion and clean up the files they leave behind
#
class Launcher(object):

    #TODO: remove unused parameters : query="", subject="", param="", job_table=""
    ## Constructor
    #
    # @param jobdb job database handle; an object whose class is named "RepetJob" is wrapped by TableJobAdaptatorFactory, any other object is used as is
    # @param query string unused
    # @param subject string unused
    # @param param string unused
    # @param cdir string directory in which launcher scripts are written (default: current working directory)
    # @param tmpdir string temporary directory forwarded to WriteScript and to the node cleaner
    # @param job_table string unused
    # @param queue string queue configuration, parsed by getQueueNameAndResources()
    # @param groupid string job group identifier (when empty, the PID of this process is used)
    # @param acro string prefix used to find the job output files
    # @param chooseTemplateWithCopy boolean forwarded to WriteScript
    # @param chooseTemplateLight boolean forwarded to WriteScript
    #
    def __init__(self, jobdb, query="", subject="", param="", cdir="",
                 tmpdir="", job_table="", queue="", groupid="", acro="X",
                 chooseTemplateWithCopy = False, chooseTemplateLight = False):
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = tmpdir
        self.groupid = groupid
        self.acronyme = acro
        self._chooseTemplateWithCopy = chooseTemplateWithCopy
        self._chooseTemplateLight = chooseTemplateLight
        self.queue, self.lResources = self.getQueueNameAndResources(queue)
        self._createJobInstance()
        self._nbJobs = 0

    ## Split a queue configuration string into a queue name and a list of resources
    #
    # @param configQueue string whitespace-separated tokens, single quotes are ignored
    # @return tuple (queueName, lResources); queueName is "" when the first token does not match ".*\.q"
    #
    def getQueueNameAndResources(self, configQueue):
        # split() without argument ignores leading/trailing/repeated blanks, so
        # a configuration such as " test.q  mem_free=1G" is still understood
        # and no empty resource is produced
        tokens = configQueue.replace("'", "").split()
        if not tokens:
            return "", []
        if re.match(r".*\.q", tokens[0]):
            return tokens[0], tokens[1:]
        return "", tokens

    ## Set the group identifier of the current job
    #
    # The PID of this process is used when no group identifier was given to the constructor.
    #
    def createGroupidIfItNotExist(self):
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    ## Prepare the job group before any submission
    #
    # Wait for the group if the job database reports unfinished jobs for it, otherwise clean the group.
    #
    def beginRun( self ):
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    ## Launch one job in parallel
    #
    # @param cmdStart string command-line for the job to be launched
    # @param cmdFinish string command to retrieve result files
    # @param cmdSize string forwarded unchanged to WriteScript.run()
    # @param cmdCopy string forwarded unchanged to WriteScript.run()
    # @warning the jobname has to be defined outside from this method
    # @warning the process exits with status 1 when the submission does not return 0
    #
    def runSingleJob(self, cmdStart, cmdFinish = "", cmdSize = "", cmdCopy = ""):
        if self._nbJobs == 0:
            self._nbJobs = 1
        pid = str(os.getpid())
        now = time.localtime()
        #TODO: rename ClusterLauncher_ ...
        # now[0], now[1], now[2] are year, month and day
        pyFileName = "%s/ClusterLauncher_%s_%s_%s-%s-%s_%s.py" % (
            self.cdir, self.job.groupid, self.job.jobname,
            now[0], now[1], now[2], pid)
        self.job.launcher = pyFileName

        #TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir,
                                   self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        # rwxr-xr-x
        os.chmod(pyFileName, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    ## Wait for all jobs of the group, report execution times and clean the group
    #
    # @param cleanNodes boolean if True, cleanNodes() is called once the jobs are finished
    #
    def endRun(self, cleanNodes = False):
        groupid = self.job.groupid
        self._printAndFlush("waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, groupid, self._timestamp()))
        self.jobdb.waitJobGroup(groupid)
        if self._nbJobs > 1:
            self._printAndFlush("all jobs with groupid '%s' are finished (%s)" % (groupid, self._timestamp()))

        if cleanNodes:
            self._printAndFlush("start cleaning cluster nodes (%s)" % self._timestamp())
            self.cleanNodes()
            self._printAndFlush("end cleaning cluster nodes (%s)" % self._timestamp())

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
        print("execution time per job: %s" % statsExecutionTime.string())
        sys.stdout.flush()
        self.jobdb.cleanJobGroup(groupid)

    ## Collect the execution times written in the job output files
    #
    # Files are searched in the current working directory with the pattern "<acronyme>*.o*".
    # Only the first line containing "executionTime" is used in each file.
    #
    # @param acronyme string file prefix (default: the acronym of this launcher)
    # @return Stat instance to which each execution time found was added
    #
    def getStatsOfExecutionTime(self, acronyme = ""):
        iStat = Stat()
        if acronyme == "":
            acronyme = self.acronyme
        for jobFile in glob.glob("%s*.o*" % acronyme):
            executionTime = self._readExecutionTime(jobFile)
            if executionTime is not None:
                iStat.add(executionTime)
        return iStat

    ## Remove launcher scripts and job output files from the current working directory
    #
    # @param acronyme string file prefix (default: the acronym of this launcher)
    # @param stdout boolean if True, remove the files matching "<acronyme>*.o*"
    # @param stderr boolean if True, remove the files matching "<acronyme>*.e*"
    #
    def clean( self, acronyme = "", stdout = True, stderr = True ):
        if acronyme == "":
            acronyme = self.acronyme
        lPatterns = ["ClusterLauncher*%s*.py" % ( acronyme )]
        if stdout:
            lPatterns.append("%s*.o*" % ( acronyme ))
        if stderr:
            lPatterns.append("%s*.e*" % ( acronyme ))
        # a file can match several patterns (e.g. "X.err.out"): remove it only
        # once, a second os.remove() would raise OSError
        sRemoved = set()
        for pattern in lPatterns:
            for fileName in glob.glob(pattern):
                if fileName not in sRemoved:
                    sRemoved.add(fileName)
                    os.remove(fileName)

    #TODO: handle of nodesMustBeCleaned => class attribute ?
    ## Submit several jobs, wait for them, clean up and close the job database
    #
    # @param acronymPrefix string prefix of the job names; jobs are named "<prefix>_<job number>"
    # @param lCmdsTuples list of tuples whose items are the arguments of runSingleJob(): (cmdStart[, cmdFinish[, cmdSize[, cmdCopy]]])
    # @param cleanMustBeDone boolean if True, call clean() with "<prefix>_" once the jobs are finished
    # @param nodesMustBeCleaned boolean forwarded to endRun()
    #
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone = True, nodesMustBeCleaned = False):
        self.beginRun()
        print("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid, self._timestamp()))
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            # items beyond the fourth are ignored; missing ones take the
            # defaults of runSingleJob() instead of raising IndexError
            self.runSingleJob(*cmdsTuple[:4])
            # a fresh job instance is needed for the next submission
            self._createJobInstance()
            self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    ## Concatenate lists of commands into the strings expected by runSingleJob()
    #
    # Each start/finish command is followed by "\n\t", each size/copy command by "\n\t\t".
    # The list arguments are only read, never modified.
    #
    # @param lCmds list of commands appended after lCmdStart
    # @param lCmdStart list of commands placed first
    # @param lCmdFinish list of commands to run at the end
    # @param lCmdSize list of commands joined into cmdSize
    # @param lCmdCopy list of commands joined into cmdCopy
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    def prepareCommands(self, lCmds, lCmdStart = [], lCmdFinish = [], lCmdSize = [], lCmdCopy = []):
        cmdStart = self._joinCmds(lCmdStart, "\n\t") + self._joinCmds(lCmds, "\n\t")
        cmdFinish = self._joinCmds(lCmdFinish, "\n\t")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    #TODO: to remove when refactoring is done
    ## Same as prepareCommands() but start/finish commands are followed by "\n" only
    #
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart = [], lCmdFinish = [], lCmdSize = [], lCmdCopy = []):
        cmdStart = self._joinCmds(lCmdStart, "\n") + self._joinCmds(lCmds, "\n")
        cmdFinish = self._joinCmds(lCmdFinish, "\n")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    ## Build the Python statement running a program through os.system()
    #
    # @param prg string program name
    # @param lArgs list of strings, arguments of the program
    # @return string of the form: log = os.system("prg arg1 arg2")
    # @warning neither the program name nor the arguments are quoted or escaped
    #
    def getSystemCommand(self, prg, lArgs):
        return "log = os.system(\"%s\")" % " ".join([prg] + list(lArgs))

    ## Clean the temporary directory of the cluster nodes used by the job group
    #
    def cleanNodes(self):
        # when no group id was given to the constructor, jobs are registered
        # under the PID stored in self.job.groupid; using the empty self.groupid
        # would query the nodes of group "" and give the pattern "*"
        groupid = self.groupid
        if groupid == "":
            groupid = self.job.groupid
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(groupid))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % groupid)
        iCleanClusterNodeAfterRepet.run()

    #TODO: to remove when refactoring is done
    ## Add one tabulation in front of every line of a command block but the first
    #
    # @param cmd string commands separated by "\n"
    # @return string the same commands, each terminated by "\n"
    #
    def _indentCmd(self, cmd):
        lLines = cmd.split("\n")
        # the text built by prepareCommands*() ends with "\n" or "\n\t": the
        # last fragment is then blank and is dropped; a non-blank last fragment
        # is a real command and must be kept
        if len(lLines) > 1 and lLines[-1].strip() == "":
            lLines = lLines[:-1]
        lIndented = [lLines[0]] + ["\t%s" % line for line in lLines[1:]]
        return "".join(["%s\n" % line for line in lIndented])

    ## Create the job instance used for the next submission
    #
    def _createJobInstance(self):
        if self.lResources:
            self.job = Job(queue=self.queue, lResources=self.lResources)
        else:
            # original note: "To have mem_free=1G" - presumably the default
            # resources of Job; to be confirmed in the Job class
            self.job = Job(queue=self.queue)

    ## Return the first execution time found in a job output file
    #
    # @param fileName string path of the job output file
    # @return float value following "=" on the first line containing "executionTime", None if there is no such line
    #
    @staticmethod
    def _readExecutionTime(fileName):
        with open(fileName, "r") as fH:
            for line in fH:
                if "executionTime" in line:
                    # float() ignores the surrounding blanks, so the line
                    # does not need to end with a newline
                    return float(line.split("=")[1])
        return None

    ## Append the same suffix to each command and concatenate the results
    #
    @staticmethod
    def _joinCmds(lCmds, suffix):
        return "".join(["%s%s" % (cmd, suffix) for cmd in lCmds])

    ## Return the current local time formatted as "YYYY-MM-DD HH:MM:SS"
    #
    @staticmethod
    def _timestamp():
        return time.strftime("%Y-%m-%d %H:%M:%S")

    ## Print a message on the standard output and flush it
    #
    @staticmethod
    def _printAndFlush(message):
        print(message)
        sys.stdout.flush()