comparison commons/core/launcher/Launcher2.py @ 31:0ab839023fe4

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 14:33:21 -0400
parents 94ab73e8a190
children
comparison
equal deleted inserted replaced
30:5677346472b5 31:0ab839023fe4
1 from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
2 from commons.core.stat.Stat import Stat
3 from commons.core.launcher.WriteScript import WriteScript
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
5 from commons.core.sql.Job import Job
6 import stat
7 import os
8 import re
9 import sys
10 import time
11 import glob
12
class LauncherParameter(object):
    """Container for the settings a Launcher2 is built from.

    Every setting lives on a private attribute written by its setter; calling a
    getter before the matching setter raises AttributeError. Query, subject,
    param and job table only have setters: nothing in this class reads them.
    """

    ## Constructor
    #
    # @param jobDB job database handle, returned unchanged by getJobDB()
    #
    def __init__(self, jobDB):
        self._jobDB = jobDB

    def getJobDB(self):
        return self._jobDB

    # Settings that are stored but have no getter.

    def setQuery(self, query):
        self._query = query

    def setSubject(self, subject):
        self._subject = subject

    def setParam(self, param):
        self._param = param

    def setJobTable(self, jobTable):
        self._jobTable = jobTable

    # Settings with a setter/getter pair.

    def setCurrentDir(self, currentDir):
        self._currentDir = currentDir

    def getCurrentDir(self):
        return self._currentDir

    def setTempDir(self, tempDir):
        self._tempDir = tempDir

    def getTempDir(self):
        return self._tempDir

    def setQueue(self, queue):
        self._queue = queue

    def getQueue(self):
        return self._queue

    def setGroupId(self, groupId):
        self._groupId = groupId

    def getGroupId(self):
        return self._groupId

    def setAcronym(self, acronym):
        self._acronym = acronym

    def getAcronym(self):
        return self._acronym

    ## Build a LauncherParameter filled with default settings
    #
    # Query, current directory and temporary directory are all set to the
    # process working directory; subject, param, job table and queue are "".
    #
    # @param jobdb job database handle
    # @param groupid group id of the submitted jobs (Launcher2 falls back to the
    #        process id when it is "")
    # @param acronym acronym used to name the jobs
    # @return LauncherParameter instance
    #
    @staticmethod
    def createParameter(jobdb, groupid, acronym):
        workDir = os.getcwd()
        oParam = LauncherParameter(jobdb)
        oParam.setQuery(workDir)
        oParam.setSubject("")
        oParam.setParam("")
        oParam.setCurrentDir(workDir)
        oParam.setTempDir(workDir)
        oParam.setJobTable("")
        oParam.setQueue("")
        oParam.setGroupId(groupid)
        oParam.setAcronym(acronym)
        return oParam
76
77
class Launcher2(object):
    """Write, submit and wait for groups of cluster jobs.

    Each job gets an executable "ClusterLauncher_*.py" wrapper script written by
    WriteScript into self.cdir, and is submitted through the job database
    (self.jobdb). The jobs of one batch share a group id, which is used to wait
    for and to clean up the whole batch.
    """

    #TODO: remove unused parameters (query, subject, param, job table): LauncherParameter
    # stores them but Launcher2 never reads them
    ## Constructor
    #
    # @param iLauncherParameter LauncherParameter giving the job database, the
    #        current/temporary directories, group id, acronym and queue string
    #
    def __init__(self, iLauncherParameter):
        jobdb = iLauncherParameter.getJobDB()
        cdir = iLauncherParameter.getCurrentDir()
        # A "RepetJob" connection is wrapped into a job-table adaptator on table "jobs".
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = iLauncherParameter.getTempDir()
        self.groupid = iLauncherParameter.getGroupId()
        self.acronyme = iLauncherParameter.getAcronym()
        self._chooseTemplateWithCopy = False
        self._chooseTemplateLight = False
        self.queue, self.lResources = self.getQueueNameAndResources(iLauncherParameter.getQueue())
        self._createJobInstance()
        self._nbJobs = 0

    ## Split a queue configuration string into a queue name and resources
    #
    # Single quotes are removed and the string is split on any whitespace
    # (repeated or leading blanks no longer yield empty resources). If the first
    # token looks like a queue name (e.g. "all.q") it is the queue name and the
    # remaining tokens are resources; otherwise every token is a resource.
    #
    # @param configQueue string e.g. "all.q mem_free=1G" or "mem_free=1G" (None is treated as "")
    # @return tuple (queueName, lResources); ("", []) for a blank string
    #
    def getQueueNameAndResources(self, configQueue):
        tokens = (configQueue or "").replace("'", "").split()
        if not tokens:
            return "", []
        if re.match(r".*\.q", tokens[0]):
            return tokens[0], tokens[1:]
        return "", tokens

    ## Give the current job the launcher group id, or the process id when none was set
    #
    def createGroupidIfItNotExist(self):
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    ## Prepare a run: wait for unfinished jobs of the group, otherwise clear the group
    #
    def beginRun( self ):
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    ## Current local time formatted as "YYYY-MM-DD HH:MM:SS" (used in progress messages)
    #
    @staticmethod
    def _now():
        return time.strftime("%Y-%m-%d %H:%M:%S")

    ## Launch one job in parallel
    #
    # Writes the wrapper script "<cdir>/ClusterLauncher_<groupid>_<jobname>_<Y>-<M>-<D>_<pid>.py",
    # makes it executable (mode 0755) and submits self.job. Exits the process
    # with status 1 if the submission fails.
    #
    # @param cmdStart string command-line for the job to be launched
    # @param cmdFinish string command to retrieve result files
    # @param cmdSize string passed through to WriteScript.run
    # @param cmdCopy string passed through to WriteScript.run
    # @warning self.job.jobname must be set by the caller before calling this method
    #
    def runSingleJob(self, cmdStart, cmdFinish = "", cmdSize = "", cmdCopy = ""):
        if self._nbJobs == 0:
            self._nbJobs = 1
        pid = str(os.getpid())
        now = time.localtime()
        #TODO: rename ClusterLauncher_ ...
        pyFileName = self.cdir + "/ClusterLauncher_" + self.job.groupid + "_" + \
            self.job.jobname + "_" + str(now[0]) + "-" + str(now[1]) + \
            "-" + str(now[2]) + "_" + pid + ".py"
        self.job.launcher = pyFileName

        #TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir, self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        os.chmod(pyFileName, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    ## Wait for the current job group, report timings and clear the group
    #
    # @param cleanNodes boolean if True, clean the cluster nodes' temporary directory
    #
    def endRun(self, cleanNodes = False):
        print("waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, self._now()))
        sys.stdout.flush()
        self.jobdb.waitJobGroup(self.job.groupid)
        if self._nbJobs > 1:
            print("all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, self._now()))
            sys.stdout.flush()

        if cleanNodes:
            print("start cleaning cluster nodes (%s)" % self._now())
            sys.stdout.flush()
            self.cleanNodes()
            print("end cleaning cluster nodes (%s)" % self._now())
            sys.stdout.flush()

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
            sys.stdout.flush()
        self.jobdb.cleanJobGroup(self.job.groupid)

    ## Collect job execution times from the jobs' stdout files
    #
    # For every file in the current directory matching "<acronyme>*.o*", the
    # first line containing "executionTime" is read and the value after its
    # first "=" is added to the statistics.
    #
    # @param acronyme string file prefix; defaults to self.acronyme when ""
    # @return Stat instance
    #
    def getStatsOfExecutionTime(self, acronyme = ""):
        if acronyme == "":
            acronyme = self.acronyme
        stats = Stat()
        for jobFile in glob.glob("%s*.o*" % acronyme):
            with open(jobFile, "r") as fH:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores the trailing newline; slicing it off would
                        # drop a digit when the line is the last one without "\n".
                        stats.add(float(line.split("=")[1]))
                        break
        return stats

    ## Remove launcher scripts and, optionally, job stdout/stderr files
    #
    # Patterns (relative to the current directory): "ClusterLauncher*<acronyme>*.py",
    # plus "<acronyme>*.o*" if stdout and "<acronyme>*.e*" if stderr. A file
    # matched by several patterns is removed only once.
    #
    # @param acronyme string file prefix; defaults to self.acronyme when ""
    # @param stdout boolean remove stdout files
    # @param stderr boolean remove stderr files
    #
    def clean( self, acronyme = "", stdout = True, stderr = True ):
        if acronyme == "":
            acronyme = self.acronyme
        lPatterns = ["ClusterLauncher*%s*.py" % acronyme]
        if stdout:
            lPatterns.append("%s*.o*" % acronyme)
        if stderr:
            lPatterns.append("%s*.e*" % acronyme)
        lFileToRemove = []
        seen = set()
        for pattern in lPatterns:
            for fileName in glob.glob(pattern):
                if fileName not in seen:
                    seen.add(fileName)
                    lFileToRemove.append(fileName)
        for fileName in lFileToRemove:
            os.remove(fileName)

    #TODO: handle of nodesMustBeCleaned => class attribute ?
    ## Submit one job per command tuple, wait for all of them, then clean up
    #
    # Jobs are named "<acronymPrefix>_<n>" with n counting from self._nbJobs + 1.
    #
    # @param acronymPrefix string prefix of the job names
    # @param lCmdsTuples list of (cmdStart, cmdFinish[, cmdSize[, cmdCopy]]) tuples;
    #        items beyond the fourth are ignored
    # @param cleanMustBeDone boolean remove launcher scripts and job outputs at the end
    # @param nodesMustBeCleaned boolean clean the cluster nodes at the end
    #
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone = True, nodesMustBeCleaned = False):
        self.beginRun()
        print("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid, self._now()))
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            self.runSingleJob(*tuple(cmdsTuple)[:4])
            # Each submission needs a fresh Job carrying the group id.
            self._createJobInstance()
            self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    ## Concatenate commands, each one followed by suffix (None means no command)
    #
    @staticmethod
    def _joinCmds(lCmds, suffix):
        if not lCmds:
            return ""
        return "".join(["%s%s" % (cmd, suffix) for cmd in lCmds])

    ## Build the command strings expected by runSingleJob, indented for the wrapper template
    #
    # @param lCmds list of main commands (appended after lCmdStart)
    # @param lCmdStart list of commands run before lCmds
    # @param lCmdFinish list of commands retrieving results
    # @param lCmdSize list of commands passed as cmdSize
    # @param lCmdCopy list of commands passed as cmdCopy
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    def prepareCommands(self, lCmds, lCmdStart = None, lCmdFinish = None, lCmdSize = None, lCmdCopy = None):
        cmdStart = self._joinCmds(lCmdStart, "\n\t") + self._joinCmds(lCmds, "\n\t")
        cmdFinish = self._joinCmds(lCmdFinish, "\n\t")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    #TODO: to remove when refactoring is done
    ## Same as prepareCommands, but start/finish commands are only newline-separated
    #
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart = None, lCmdFinish = None, lCmdSize = None, lCmdCopy = None):
        cmdStart = self._joinCmds(lCmdStart, "\n") + self._joinCmds(lCmds, "\n")
        cmdFinish = self._joinCmds(lCmdFinish, "\n")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    ## Build the Python source line 'log = os.system("<prg> <args...>")'
    #
    # @warning arguments are inserted verbatim: double quotes or backslashes in
    #          them are not escaped in the generated string literal
    #
    def getSystemCommand(self, prg, lArgs):
        return "log = os.system(\"" + " ".join([prg] + list(lArgs)) + "\")"

    ## Remove this group's files from the temporary directory of the nodes that ran its jobs
    #
    def cleanNodes(self):
        groupId = self.groupid
        if groupId == "":
            # createGroupidIfItNotExist put an auto-generated id (the pid) on the job;
            # using "" here would look up the wrong group and make the pattern "*".
            groupId = getattr(self.job, "groupid", "")
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(groupId))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % groupId)
        iCleanClusterNodeAfterRepet.run()

    #TODO: to remove when refactoring is done
    ## Indent every line but the first with a tab, one line per "\n"
    #
    # A trailing blank remainder (what follows a final "\n", possibly "\t") is
    # dropped; a non-blank last line is kept even without a final newline.
    #
    # @param cmd string
    # @return string ending with "\n"
    #
    def _indentCmd(self, cmd):
        lCmd = cmd.split("\n")
        lRest = lCmd[1:]
        if lRest and lRest[-1].strip() == "":
            lRest = lRest[:-1]
        return "%s\n" % lCmd[0] + "".join(["\t%s\n" % line for line in lRest])

    ## (Re)create self.job for the configured queue and resources
    #
    def _createJobInstance(self):
        if not self.lResources:
            # No explicit resources: rely on Job's defaults
            # (presumably mem_free=1G — TODO confirm in Job).
            self.job = Job(queue=self.queue)
        else:
            self.job = Job(queue=self.queue, lResources=self.lResources)