Mercurial > repos > yufei-luo > s_mart
comparison smart_toolShed/commons/core/launcher/Launcher.py @ 0:e0f8dcca02ed
Uploaded the S-MART tool, a toolbox for managing RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e0f8dcca02ed |
---|---|
1 from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet | |
2 from commons.core.stat.Stat import Stat | |
3 from commons.core.launcher.WriteScript import WriteScript | |
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory | |
5 from commons.core.sql.Job import Job | |
6 import stat | |
7 import os | |
8 import re | |
9 import sys | |
10 import time | |
11 import glob | |
12 | |
## Write, submit and monitor cluster jobs through a job table.
#
# A Launcher owns one 'job' object at a time (see _createJobInstance), writes
# a Python launcher script for it via WriteScript, submits it through the
# 'jobdb' adaptator and finally waits for the whole job group to finish.
#
class Launcher(object):

    ## Constructor
    #
    # @param jobdb job-table adaptator; an object whose class is named
    #        'RepetJob' is first wrapped by TableJobAdaptatorFactory
    # @param query unused (kept for backward compatibility)
    # @param subject unused (kept for backward compatibility)
    # @param param unused (kept for backward compatibility)
    # @param cdir string directory receiving the launcher scripts
    #        (default: current working directory)
    # @param tmpdir string temporary directory handed to WriteScript and
    #        to the node cleaner
    # @param job_table unused (kept for backward compatibility)
    # @param queue string queue name and/or resources, see getQueueNameAndResources()
    # @param groupid string job group identifier (default: the PID, set lazily)
    # @param acro string prefix of the job names / job output files
    # @param chooseTemplateWithCopy boolean forwarded to WriteScript
    # @param chooseTemplateLight boolean forwarded to WriteScript
    #
    #TODO: remove unused parameters : query="", subject="", param="", job_table=""
    def __init__(self, jobdb, query="", subject="", param="", cdir="",
                 tmpdir="", job_table="", queue="", groupid="", acro="X",
                 chooseTemplateWithCopy=False, chooseTemplateLight=False):
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = tmpdir
        self.groupid = groupid
        self.acronyme = acro
        self._chooseTemplateWithCopy = chooseTemplateWithCopy
        self._chooseTemplateLight = chooseTemplateLight
        self.queue, self.lResources = self.getQueueNameAndResources(queue)
        self._createJobInstance()
        self._nbJobs = 0

    ## Split a queue configuration string into a queue name and resources
    #
    # Single quotes are removed and the string is split on whitespace. The
    # first token is the queue name only if it contains '.q' (e.g. 'all.q');
    # otherwise every token is a resource.
    #
    # @param configQueue string e.g. "all.q mem_free=1G" or "mem_free=1G"
    # @return tuple (queueName, lResources); ("", []) for an empty string
    #
    def getQueueNameAndResources(self, configQueue):
        # split() without argument ignores leading, trailing and repeated
        # blanks, so no empty-string "resource" can be produced.
        tokens = configQueue.replace("'", "").split()
        if not tokens:
            return "", []
        if re.match(r".*\.q", tokens[0]):
            return tokens[0], tokens[1:]
        return "", tokens

    ## Set the group identifier of the current job
    #
    # Uses the identifier given to the constructor or, when it is empty,
    # the PID of the current process.
    #
    def createGroupidIfItNotExist(self):
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    ## Prepare the job table before submitting jobs
    #
    # Waits for the job group if it still has unfinished jobs, otherwise
    # removes its entries from the job table.
    #
    def beginRun(self):
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    ## Launch one job in parallel
    #
    # @param cmdStart string command-line for the job to be launched
    # @param cmdFinish string command to retrieve result files
    # @param cmdSize string forwarded as-is to WriteScript.run()
    # @param cmdCopy string forwarded as-is to WriteScript.run()
    # @warning the jobname has to be defined outside from this method
    # @warning exits the process with status 1 if the submission fails
    #
    def runSingleJob(self, cmdStart, cmdFinish="", cmdSize="", cmdCopy=""):
        if self._nbJobs == 0:
            self._nbJobs = 1
        pid = str(os.getpid())
        now = time.localtime()
        #TODO: rename ClusterLauncher_ ...
        # script name: <cdir>/ClusterLauncher_<groupid>_<jobname>_<Y>-<M>-<D>_<pid>.py
        pyFileName = self.cdir + "/ClusterLauncher_" + self.job.groupid + "_" \
            + self.job.jobname + "_" + str(now[0]) + "-" + str(now[1]) \
            + "-" + str(now[2]) + "_" + pid + ".py"
        self.job.launcher = pyFileName

        #TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir,
                                   self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        # rwxr-xr-x: the generated script must be executable
        os.chmod(pyFileName, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    ## Wait for all jobs of the group, report timings and clean the job table
    #
    # @param cleanNodes boolean if True, call cleanNodes() once jobs are finished
    #
    def endRun(self, cleanNodes=False):
        timeFormat = "%Y-%m-%d %H:%M:%S"
        print("waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, time.strftime(timeFormat)))
        sys.stdout.flush()
        self.jobdb.waitJobGroup(self.job.groupid)
        if self._nbJobs > 1:
            print("all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, time.strftime(timeFormat)))
            sys.stdout.flush()

        if cleanNodes:
            print("start cleaning cluster nodes (%s)" % time.strftime(timeFormat))
            sys.stdout.flush()
            self.cleanNodes()
            print("end cleaning cluster nodes (%s)" % time.strftime(timeFormat))
            sys.stdout.flush()

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
        print("execution time per job: %s" % statsExecutionTime.string())
        sys.stdout.flush()
        self.jobdb.cleanJobGroup(self.job.groupid)

    ## Collect the execution times written in the job output files
    #
    # Scans the files matching '<acronyme>*.o*' in the current directory and,
    # for each one, records the value following '=' on the first line that
    # contains 'executionTime'.
    #
    # @param acronyme string file prefix (default: self.acronyme)
    # @return Stat instance filled with one value per matching file
    #
    def getStatsOfExecutionTime(self, acronyme=""):
        # not named 'stat', which would hide the imported 'stat' module
        iStat = Stat()
        if acronyme == "":
            acronyme = self.acronyme
        for jobFileName in glob.glob("%s*.o*" % acronyme):
            # 'with' closes the file even if the value cannot be parsed
            with open(jobFileName, "r") as jobFile:
                for line in jobFile:
                    if "executionTime" in line:
                        # strip() instead of [:-1]: the last line of a file may
                        # have no trailing newline to cut off
                        iStat.add(float(line.strip().split("=")[1]))
                        break
        return iStat

    ## Remove launcher scripts and job output files from the current directory
    #
    # @param acronyme string file prefix (default: self.acronyme)
    # @param stdout boolean also remove files matching '<acronyme>*.o*'
    # @param stderr boolean also remove files matching '<acronyme>*.e*'
    #
    def clean(self, acronyme="", stdout=True, stderr=True):
        if acronyme == "":
            acronyme = self.acronyme
        lPatterns = ["ClusterLauncher*%s*.py" % acronyme]
        if stdout:
            lPatterns.append("%s*.o*" % acronyme)
        if stderr:
            lPatterns.append("%s*.e*" % acronyme)
        # a file matching several patterns must be removed only once
        removed = set()
        for pattern in lPatterns:
            for fileName in glob.glob(pattern):
                if fileName not in removed:
                    removed.add(fileName)
                    os.remove(fileName)

    ## Submit several jobs, wait for them and clean up
    #
    # Job i (starting at 1) is named '<acronymPrefix>_<i>'.
    #
    # @param acronymPrefix string prefix of the job names
    # @param lCmdsTuples list of tuples (cmdStart[, cmdFinish[, cmdSize[, cmdCopy]]]),
    #        see runSingleJob(); items beyond the fourth are ignored
    # @param cleanMustBeDone boolean remove scripts and output files at the end
    # @param nodesMustBeCleaned boolean forwarded to endRun()
    # @warning closes the job database at the end
    #
    #TODO: handle of nodesMustBeCleaned => class attribute ?
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone=True, nodesMustBeCleaned=False):
        self.beginRun()
        print("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S")))
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            # missing trailing items fall back on runSingleJob() defaults
            self.runSingleJob(*cmdsTuple[:4])
            # a fresh job object is needed for the next submission
            self._createJobInstance()
            self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    ## Concatenate commands, each followed by the same terminator
    #
    # @param lCmds list of strings, or None for no command
    # @param terminator string appended after each command
    # @return string
    #
    def _joinCmds(self, lCmds, terminator):
        if lCmds is None:
            return ""
        return "".join(["%s%s" % (cmd, terminator) for cmd in lCmds])

    ## Build the four command strings expected by runSingleJob()
    #
    # Start and finish commands are terminated by a newline and one
    # tabulation, size and copy commands by a newline and two tabulations.
    #
    # @param lCmds list of commands, appended after lCmdStart
    # @param lCmdStart list of commands run first (default: none)
    # @param lCmdFinish list of commands (default: none)
    # @param lCmdSize list of commands (default: none)
    # @param lCmdCopy list of commands (default: none)
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    def prepareCommands(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        cmdStart = self._joinCmds(lCmdStart, "\n\t") + self._joinCmds(lCmds, "\n\t")
        cmdFinish = self._joinCmds(lCmdFinish, "\n\t")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    ## Same as prepareCommands() but start and finish commands are only
    #  terminated by a newline (no tabulation)
    #
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    #TODO: to remove when refactoring is done
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        cmdStart = self._joinCmds(lCmdStart, "\n") + self._joinCmds(lCmds, "\n")
        cmdFinish = self._joinCmds(lCmdFinish, "\n")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    ## Build the Python statement running a program through os.system()
    #
    # @param prg string program name
    # @param lArgs list of strings, the arguments
    # @return string such as: log = os.system("prg arg1 arg2")
    # @warning arguments are inserted verbatim: a double quote in an
    #          argument is not escaped
    #
    def getSystemCommand(self, prg, lArgs):
        return "log = os.system(\"%s\")" % " ".join([prg] + list(lArgs))

    ## Remove from the cluster nodes the temporary files of the job group
    #
    def cleanNodes(self):
        # When no group identifier was given to the constructor, the one in
        # use is stored on the job (see createGroupidIfItNotExist); an empty
        # identifier would turn the pattern below into a bare '*'.
        groupid = self.groupid
        if groupid == "":
            groupid = getattr(self.job, "groupid", "")
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(groupid))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % groupid)
        iCleanClusterNodeAfterRepet.run()

    ## Add one tabulation in front of every line but the first
    #
    # @param cmd string commands separated by newlines
    # @return string every line terminated by a newline ("\n" for an empty cmd)
    #
    #TODO: to remove when refactoring is done
    def _indentCmd(self, cmd):
        lLines = cmd.split("\n")
        # Only a blank remainder after the last newline is discarded, so that
        # a final command lacking its trailing newline is not lost.
        if len(lLines) > 1 and lLines[-1].strip() == "":
            lLines = lLines[:-1]
        return "%s\n" % lLines[0] + "".join(["\t%s\n" % line for line in lLines[1:]])

    ## Create the job object used for the next submission
    #
    def _createJobInstance(self):
        if self.lResources == []:
            # no resource given: let Job apply its own default resources
            # (original note: "To have mem_free=1G")
            self.job = Job(queue=self.queue)
        else:
            self.job = Job(queue=self.queue, lResources=self.lResources)