18
|
1 #!/usr/bin/env python
|
|
2
|
|
3 # Copyright INRA (Institut National de la Recherche Agronomique)
|
|
4 # http://www.inra.fr
|
|
5 # http://urgi.versailles.inra.fr
|
|
6 #
|
|
7 # This software is governed by the CeCILL license under French law and
|
|
8 # abiding by the rules of distribution of free software. You can use,
|
|
9 # modify and/ or redistribute the software under the terms of the CeCILL
|
|
10 # license as circulated by CEA, CNRS and INRIA at the following URL
|
|
11 # "http://www.cecill.info".
|
|
12 #
|
|
13 # As a counterpart to the access to the source code and rights to copy,
|
|
14 # modify and redistribute granted by the license, users are provided only
|
|
15 # with a limited warranty and the software's author, the holder of the
|
|
16 # economic rights, and the successive licensors have only limited
|
|
17 # liability.
|
|
18 #
|
|
19 # In this respect, the user's attention is drawn to the risks associated
|
|
20 # with loading, using, modifying and/or developing or reproducing the
|
|
21 # software by the user in light of its specific status of free software,
|
|
22 # that may mean that it is complicated to manipulate, and that also
|
|
23 # therefore means that it is reserved for developers and experienced
|
|
24 # professionals having in-depth computer knowledge. Users are therefore
|
|
25 # encouraged to load and test the software's suitability as regards their
|
|
26 # requirements in conditions enabling the security of their systems and/or
|
|
27 # data to be ensured and, more generally, to use and operate it in the
|
|
28 # same conditions as regards security.
|
|
29 #
|
|
30 # The fact that you are presently reading this means that you have had
|
|
31 # knowledge of the CeCILL license and that you accept its terms.
|
|
32
|
|
33 import getopt
|
|
34 import time
|
|
35 import glob
|
|
36 import sys
|
|
37 import os
|
|
38
|
|
39 from commons.core.checker.CheckerException import CheckerException
|
|
40 from commons.core.sql.RepetJob import RepetJob
|
|
41 from commons.core.sql.Job import Job
|
|
42 from commons.core.stat.Stat import Stat
|
|
43 from pyRepet.launcher.AbstractProgramLauncher import AbstractProgramLauncher
|
|
44
|
|
# Placeholder standing for the input file name in generic command lines and
# file names; it is replaced by the real file name (or by "*") when the
# per-job commands are built.
GENERIC_IN_FILE = "zDUMMYz"


## Abstract class to launch a program in parallel on a cluster.
#
# One job is submitted per input file found in the input directory. Each job
# is a generated Python script that runs the wrapper command line(s) of the
# program launcher(s) registered in 'lPrgLaunchers'.
#
# The following methods are called here but are not defined in this class;
# they are presumably provided by the concrete launchers (to be confirmed):
# checkProgramAvailability(), initializeJob(), removeAllJobFiles(),
# removeAllJobStdouts(), removeAllJobStderrs() and catOutputFiles().
#
class AbstractClusterLauncher( object ): #( IClusterLauncher )

    def __init__( self ):
        """
        Constructor.
        """
        self._inputDir = ""                # path to the directory with input files
        self._queueName = ""               # name of the queue on the cluster
        self._groupId = ""                 # identifier of the group of jobs (groupid)
        self._inputFileSuffix = "fa"       # suffix of the input files (default='fa')
        self._prgAcronym = ""              # acronym of the program to launch
        self._configFile = ""              # name of the configuration file (connect to MySQL)
        self._currentDir = os.getcwd()     # path to the current directory
        self._tmpDir = ""                  # path to the temporary directory
        self._jobTable = "jobs"            # name of the table recording the jobs
        self._catOutFiles = False          # concatenate output files of all jobs
        self._clean = False                # clean job file, job stdout, job table...
        self._verbose = 1                  # verbosity level
        self.jobdb = None                  # RepetJob instance
        self.job = Job()                   # Job instance

        self._nbJobs = 0
        self._cmdLineGenericOptions = "hi:Q:g:S:a:C:d:j:Zcv:"
        self._cmdLineSpecificOptions = ""

        self._exeWrapper = "AbstractProgramLauncher.py"
        self._prgLauncher = None
        # list of instances derived from AbstractProgramLauncher()
        # If several program are launched successively in the same job,
        # 'lPrgLaunchers' has to be filled before run().
        self.lPrgLaunchers = []

    def setProgramLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Forward a command-line option to the program launcher instance.
        @param o: option flag (e.g. '-i')
        @param a: option argument (empty string for flags)
        """
        self.getProgramLauncherInstance().setASpecificAttributeFromCmdLine(o, a)

    def setClusterLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Set the generic attribute matching a command-line option.
        '-h' prints the help and exits with status 0; unknown options are
        silently ignored here.
        @param o: option flag (e.g. '-i')
        @param a: option argument (empty string for flags)
        """
        if o == "-h":
            print(self.getHelpAsString())
            sys.exit(0)
        elif o == "-i":
            self.setInputDirectory(a)
        elif o == "-Q":
            self.setQueueName(a)
        elif o == "-g":
            self.setGroupIdentifier(a)
        elif o == "-S":
            self.setInputFileSuffix(a)
        elif o == "-a":
            self.setAcronym(a)
        elif o == "-C":
            self.setConfigFile(a)
        elif o == "-d":
            self.setTemporaryDirectory(a)
        elif o == "-j":
            self.setJobTableName(a)
        elif o == "-Z":
            self.setCatOutputFiles()
        elif o == "-c":
            self.setClean()
        elif o == "-v":
            self.setVerbosityLevel(a)

    def setAttributesFromCmdLine(self):
        """
        Parse sys.argv and dispatch every option to both the cluster
        launcher and the program launcher.
        On a parsing error, print the error and the help, then exit with
        status 1.
        """
        try:
            opts, _ = getopt.getopt(sys.argv[1:], self.getCmdLineOptions())
        except getopt.GetoptError as err:
            print(str(err))
            print(self.getHelpAsString())
            sys.exit(1)
        for o, a in opts:
            self.setClusterLauncherAttributeFromCmdLine(o, a)
            self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setAGenericAttributeFromCmdLine( self, o, a="" ):
        """
        Alias of setClusterLauncherAttributeFromCmdLine().
        """
        self.setClusterLauncherAttributeFromCmdLine(o, a)

    def setASpecificAttributeFromCmdLine( self, o, a="" ):
        """
        Alias of setProgramLauncherAttributeFromCmdLine().
        """
        self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setInputDirectory( self, arg ):
        self._inputDir = arg

    def setQueueName( self, arg ):
        self._queueName = arg

    def setGroupIdentifier( self, arg ):
        self._groupId = arg

    def setInputFileSuffix( self, arg ):
        self._inputFileSuffix = arg

    def setAcronym( self, arg ):
        self._prgAcronym = arg

    def setConfigFile( self, arg ):
        """
        Record the configuration file; a bare file name is taken to be in
        the current working directory.
        An empty name is kept empty so that the 'missing config file' check
        of checkClusterLauncherAttributes() can detect it.
        """
        if arg == "":
            self._configFile = ""
        elif os.path.dirname( arg ) == "":
            self._configFile = "%s/%s" % ( os.getcwd(), arg )
        else:
            self._configFile = arg

    def setCurrentDirectory( self, arg=None ):
        """
        Record the current directory.
        @param arg: path to use; when omitted, the working directory at the
                    time of the call (not at import time) is used
        """
        if arg is None:
            arg = os.getcwd()
        self._currentDir = arg

    def setTemporaryDirectory( self, arg ):
        self._tmpDir = arg

    def setJobTableName( self, arg ):
        self._jobTable = arg

    def setCatOutputFiles( self ):
        self._catOutFiles = True

    def setClean( self ):
        self._clean = True

    def setVerbosityLevel( self, arg ):
        self._verbose = int(arg)

    def setExecutableWrapper( self, arg = "AbstractProgramLauncher.py" ):
        self._exeWrapper = arg

    def setSingleProgramLauncher( self ):
        """
        Set the wrapper and program command-lines of the program launcher.
        Append the program launcher to 'self.lPrgLaunchers'.
        """
        launcher = self.getProgramLauncherInstance()
        launcher.setWrapperCommandLine()
        launcher.setProgramCommandLine()
        self.lPrgLaunchers.append( launcher )

    def getGenericHelpAsString( self ):
        """
        Return the usage line and the description of the generic options.
        """
        string = ""
        string += "usage: %s.py [options]" % (type(self).__name__ )
        string += "\ngeneric options:"
        string += "\n -h: this help"
        string += "\n -i: directory with input files (absolute path)"
        string += "\n -Q: name of the queue on the cluster"
        string += "\n -g: identifier of the group of jobs (groupid)"
        string += "\n -S: suffix of the input files (default='fa')"
        string += "\n -a: acronym of the program to be launched (default='%s')" % ( self.getAcronym() )
        string += "\n -C: configuration file to connect to MySQL (absolute path or in current dir)"
        string += "\n -d: temporary directory (absolute path, default=None)"
        string += "\n -j: table recording the jobs (default='jobs')"
        # '-Z' is accepted by the option parser, hence documented here
        string += "\n -Z: concatenate the output files of all jobs"
        string += "\n -c: clean the temporary data"
        string += "\n -v: verbosity level (default=0/1/2)"
        return string

    def getSpecificHelpAsString( self ):
        """
        Return the description of the program-specific options.
        Empty here; meant to be overridden.
        """
        return ""

    def getHelpAsString(self):
        """
        Return the complete help (generic part followed by specific part).
        """
        specific = self.getSpecificHelpAsString()
        if specific is None:
            # tolerate overrides that return nothing
            specific = ""
        return self.getGenericHelpAsString() + specific

    def getInputDirectory( self ):
        return self._inputDir

    def getQueueName( self ):
        return self._queueName

    def getGroupIdentifier( self ):
        return self._groupId

    def getInputFileSuffix( self ):
        return self._inputFileSuffix

    def getAcronym( self ):
        return self._prgAcronym

    def getConfigFile( self ):
        return self._configFile

    def getCurrentDirectory( self ):
        return self._currentDir

    def getTemporaryDirectory( self ):
        return self._tmpDir

    def getJobTableName( self ):
        return self._jobTable

    def getCatOutputFiles( self ):
        return self._catOutFiles

    def getClean( self ):
        return self._clean

    def getVerbosityLevel( self ):
        return self._verbose

    def getWrapperName( self ):
        return self.getProgramLauncherInstance().getWrapperName()

    def getProgramName( self ):
        return self.getProgramLauncherInstance().getProgramName()

    def getPatternToConcatenate( self ):
        """
        Return the output file name of the program launcher in which the
        generic input file placeholder is replaced by '*'.
        """
        return self.getProgramLauncherInstance().getOutputFile().replace( GENERIC_IN_FILE, "*" )

    def getProgramLauncherInstance( self ):
        """
        Return the program launcher, creating a default
        AbstractProgramLauncher on first use.
        """
        if self._prgLauncher is None:
            self._prgLauncher = AbstractProgramLauncher()
        return self._prgLauncher

    def getInputFilesList(self):
        """
        Return the paths of the files of the input directory having the
        expected suffix (order as returned by glob).
        """
        return glob.glob("%s/*.%s" % (self._inputDir, self._inputFileSuffix))

    def getCmdLineOptions(self):
        """
        Return the getopt option string: generic options followed by the
        program-specific ones.
        """
        return self._cmdLineGenericOptions + self._cmdLineSpecificOptions


    def getProgramCommandLineAsString( self ):
        """
        Return the command-line to launch in each job.
        Specified in each wrapper.
        """
        pass


    def getListFilesToKeep( self ):
        """
        Return the list of files to keep at the end of each job.
        Specified in each wrapper.
        """
        pass


    def getListFilesToRemove( self ):
        """
        Return the list of files to remove at the end of each job.
        Specified in each wrapper.
        """
        pass


    def getJobFileNameAsString( self, count ):
        """
        Return the name of the job file as a string.
        With '*', a glob pattern restricted to the current year and month
        is returned instead of a fully time-stamped name.
        @param count: job number (e.g. '1') or '*'
        @type count: integer or string
        """
        jobFile = "ClusterLauncher"
        jobFile += "_groupid%s" % ( self.getGroupIdentifier() )
        if count != "*":
            jobFile += "_job%i" % ( count )
            jobFile += "_time%s" % ( time.strftime("%Y-%m-%d-%H-%M-%S") )
        else:
            jobFile += "_job*"
            jobFile += "_time%s-*" % ( time.strftime("%Y-%m") )
        jobFile += ".py"
        return jobFile


    def getCmdUpdateJobStatusAsString( self, newStatus ):
        """
        Return the command to update the job status in the table.
        The result is one line of Python source (an os.system() call on
        srptChangeJobStatus.py) to be embedded in the job script.
        Raises KeyError if the REPET_PATH environment variable is not set.
        @param newStatus: status to record (e.g. 'running', 'error')
        """
        prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py"
        cmd = prg
        cmd += " -t %s" % ( self.job.tablename )
        # a numeric job id identifies nothing for the script: use the name
        if str(self.job.jobid).isdigit():
            cmd += " -j %s" % ( self.job.jobname )
        else:
            cmd += " -j %s" % ( self.job.jobid )
        cmd += " -g %s" % ( self.job.groupid )
        if self.job.queue != "":
            cmd += " -q %s" % ( self.job.queue )
        cmd += " -s %s" % ( newStatus )
        cmd += " -c %s" % ( self.getConfigFile() )
        cmd += " -v %i" % ( self._verbose )
        return "os.system( \"%s\" )\n" % ( cmd )


    def getCmdToLaunchWrapper( self, fileName, genericCmd, exeWrapper ):
        """
        Return the launching command as a string.
        Launch the wrapper, retrieve its exit status, update status if error.
        @param fileName: input file name substituted for the placeholder
        @param genericCmd: wrapper command line containing the placeholder
        @param exeWrapper: wrapper name, used in the error message only
        """
        specificCmd = genericCmd.replace( GENERIC_IN_FILE, fileName )
        cmd = ""
        cmd += "print \"LAUNCH: %s\"\n" % ( specificCmd )
        cmd += "sys.stdout.flush()\n"
        cmd += "exitStatus = os.system ( \"%s\" )\n" % ( specificCmd )
        cmd += "if exitStatus != 0:\n"
        cmd += "\tprint \"ERROR: wrapper '%s'" % ( exeWrapper )
        # the '%i' below is formatted by the job script, not here
        cmd += " returned exit status '%i'\" % ( exitStatus )\n"
        cmd += "\tos.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        cmd += "\tshutil.move( newDir, '%s' )\n" % ( self.getCurrentDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        return cmd


    def getCmdToKeepFiles( self, fileName, lFilesToKeep ):
        """
        Return the commands to keep the output files.
        For each file: fail the job if it is missing, otherwise copy it to
        the current directory unless it is already there.
        """
        cmd = ""
        for f in lFilesToKeep:
            f = f.replace( GENERIC_IN_FILE, fileName )
            cmd += "if not os.path.exists( \"%s\" ):\n" % ( f )
            cmd += "\tprint \"ERROR: output file '%s' doesn't exist\"\n" % ( f )
            cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
            cmd += "\tsys.exit(1)\n"
            cmd += "if not os.path.exists( \"%s/%s\" ):\n" \
                   % ( self._currentDir, f )
            cmd += "\tshutil.copy( \"%s\", \"%s/%s\" )\n" % ( f, self.getCurrentDirectory(), f )
        return cmd


    def getCmdToRemoveFiles( self, fileName, lFilesToRemove ):
        """
        Return the commands to remove the temporary files.
        """
        cmd = ""
        for f in lFilesToRemove:
            f = f.replace( GENERIC_IN_FILE, fileName )
            cmd += "if os.path.exists( \"%s\" ):\n" % ( f )
            cmd += "\tos.remove( \"%s\" )\n" % ( f )
        return cmd


    def getJobCommandsAsString( self, fileName, jobName, minFreeGigaInTmpDir=1 ):
        """
        Return all the job commands as a string.
        The generated script: records the 'running' status, checks the
        temporary directory and its free space, works in a fresh
        sub-directory on a symlink to the input file, runs every program
        launcher, keeps/removes files, cleans up and records 'finished'.
        @param fileName: name of the input file (without directory)
        @param jobName: job name, used to name the working sub-directory
        @param minFreeGigaInTmpDir: minimum free space required (in Gb)
        """
        cmd = "#!/usr/bin/env python\n"
        cmd += "\n"
        cmd += "import os\n"
        cmd += "import sys\n"
        cmd += "import shutil\n"
        cmd += "import time\n"
        cmd += "\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "beginTime = time.time()\n"
        cmd += "print 'beginTime=%f' % ( beginTime )\n"
        cmd += "\n"
        cmd += self.getCmdUpdateJobStatusAsString( "running" )
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % ( self.getTemporaryDirectory() )
        cmd += "\tprint \"ERROR: working dir '%s' doesn't exist\"\n" % ( \
            self.getTemporaryDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        cmd += "freeSpace = os.statvfs( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        cmd += "if ( freeSpace.f_bavail * freeSpace.f_frsize ) / 1073741824.0 < %i:\n" % ( minFreeGigaInTmpDir ) # nb blocs * bloc size in bytes > 1 GigaByte ?
        cmd += "\tprint \"ERROR: less than %iGb in '%s'\"\n" % ( minFreeGigaInTmpDir, self.getTemporaryDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        cmd += "print \"working dir: %s\"\n" % ( self.getTemporaryDirectory() )
        cmd += "sys.stdout.flush()\n"
        cmd += "os.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        cmd += "\n"
        cmd += "newDir = \"%s_%s_%s\"\n" % ( self.getGroupIdentifier(), jobName, time.strftime("%Y%m%d-%H%M%S") )
        cmd += "if os.path.exists( newDir ):\n"
        cmd += "\tshutil.rmtree( newDir )\n"
        cmd += "os.mkdir( newDir )\n"
        cmd += "os.chdir( newDir )\n"
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % ( fileName )
        cmd += "\tos.symlink( \"%s/%s\", \"%s\" )\n" % \
               ( self._inputDir, fileName, fileName )
        cmd += "\n"

        for pL in self.lPrgLaunchers:
            cmd += self.getCmdToLaunchWrapper( \
                fileName, \
                pL.getWrapperCommandLine(), \
                "%s.py" % ( type(pL).__name__ ) )
            cmd += "\n"
            cmd += self.getCmdToKeepFiles( fileName, pL.getListFilesToKeep() )
            cmd += "\n"
            cmd += self.getCmdToRemoveFiles( fileName, \
                                             pL.getListFilesToRemove() )

        cmd += "if os.path.exists( \"%s\" ):\n" % ( fileName )
        cmd += "\tos.remove( \"%s\" )\n" % ( fileName )
        cmd += "os.chdir( \"..\" )\n"
        cmd += "shutil.rmtree( newDir )\n"
        cmd += self.getCmdUpdateJobStatusAsString( "finished" )
        cmd += "\n"
        cmd += "endTime = time.time()\n"
        cmd += "print 'endTime=%f' % ( endTime)\n"
        cmd += "print 'executionTime=%f' % ( endTime - beginTime )\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "sys.exit(0)\n"
        return cmd

    def getStatsOfExecutionTime( self ):
        """
        Return a Stat object filled with the execution time (duration, in
        seconds) of each job, read from the first 'executionTime=' line of
        every job stdout file '<currentDir>/<acronym>*.o*'.
        """
        stat = Stat()
        pattern = "%s/%s*.o*" % ( self.getCurrentDirectory(), \
                                  self.getAcronym() )
        for f in glob.glob( pattern ):
            fH = open( f, "r" )
            try:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores the trailing newline, if any
                        stat.add( float( line.split("=")[1] ) )
                        break
            finally:
                # close the handle even if the value cannot be parsed
                fH.close()
        return stat


    def formatGroupidAndTime(self):
        """
        Return the job group identifier followed by the current local time.
        """
        return self.job.groupid + " " + time.strftime("%Y-%m-%d %H:%M:%S") + ""

    def submitJob(self, lInFiles):
        """
        Initialize and submit one job per input file (numbered from 1).
        Exit with status 1 as soon as a submission fails.
        @param lInFiles: list of paths to the input files
        """
        count = 0
        for inFile in lInFiles:
            count += 1
            fileName = os.path.basename(inFile)
            if self._verbose > 1:
                print("processing '%s' # %i..." % (fileName, count))
                sys.stdout.flush()

            self.initializeJob(fileName, count)
            time.sleep(0.5)
            exitStatus = self.jobdb.submitJob(self.job)
            if exitStatus != 0:
                print("ERROR while submitting job '%i' to the cluster" % (count))
                sys.exit(1)

    def checkClusterLauncherAttributes( self ):
        """
        Check the generic attributes.
        Raise CheckerException if the input directory, the group
        identifier, the acronym or the config file is missing or invalid.
        An unset temporary directory defaults to the current directory.
        """
        if self.getInputDirectory() == "":
            message = "ERROR: missing input directory"
            raise CheckerException(message)
        if not os.path.exists( self.getInputDirectory() ):
            message = "ERROR: input directory '%s' doesn't exist" % ( self.getInputDirectory() )
            raise CheckerException(message)
        if self.getGroupIdentifier() == "":
            message = "ERROR: missing group identifier"
            raise CheckerException(message)
        if self.getAcronym() == "":
            message = "ERROR: missing program acronym"
            raise CheckerException(message)
        if self.getConfigFile() == "":
            message = "ERROR: missing config file to access MySQL"
            raise CheckerException(message)
        if not os.path.exists( self.getConfigFile() ):
            message = "ERROR: config file '%s' doesn't exist" % ( self.getConfigFile() )
            raise CheckerException(message)
        if self.getTemporaryDirectory() == "":
            self.setTemporaryDirectory(self._currentDir)

    def checkGenericAttributes( self ):
        """
        Alias of checkClusterLauncherAttributes().
        """
        self.checkClusterLauncherAttributes()

    def checkProgramLauncherAttributes( self ):
        """
        Let the program launcher check its own specific attributes.
        """
        self.getProgramLauncherInstance().checkSpecificAttributes()

    def checkSpecificAttributes( self ):
        """
        Alias of checkProgramLauncherAttributes().
        """
        self.checkProgramLauncherAttributes()

    def start( self ):
        """
        Prepare the launch: register a single program launcher if none was
        given, validate the command lines and attributes, fill the Job
        instance and connect to the job table.
        If the group still has unfinished jobs, wait for them and return
        without cleaning the group.
        NOTE(review): run() goes on submitting jobs after this early
        return -- confirm this is intended.
        """
        if not self.lPrgLaunchers:
            self.setSingleProgramLauncher()
        for pL in self.lPrgLaunchers:
            if pL.getWrapperCommandLine() == "":
                string = "ERROR: wrapper command is empty !"
                print(string)
                sys.exit(1)
            if pL.getProgramCommandLine() == "":
                string = "ERROR: program command is empty !"
                print(string)
                sys.exit(1)
        self.checkProgramAvailability()

        try:
            self.checkProgramLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        if self.getVerbosityLevel() > 0:
            string = "START %s" % ( type(self).__name__ )
            print(string)
        self.job.tablename = self.getJobTableName()
        self.job.groupid = self.getGroupIdentifier()
        # the queue name may be followed by space-separated resources
        tokens = self.getQueueName().replace("'","").split(" ")
        self.job.setQueue( tokens[0] )
        if len(tokens) > 1:
            lResources = tokens[1:]
            self.job.lResources = lResources
        if self.getVerbosityLevel() > 0:
            print("groupid: %s" % ( self.getGroupIdentifier() ))
        self.jobdb = RepetJob( cfgFileName=self.getConfigFile() )
        if self.jobdb.hasUnfinishedJob( self.job.tablename, \
                                        self.job.groupid ):
            self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid )
            return
        self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid )
        sys.stdout.flush()

    def end( self ):
        """
        Finish the launch: optionally remove the job files and
        concatenate the output files, then close the job database.
        """
        if self.getClean():
            self.removeAllJobFiles()
            self.removeAllJobStdouts()
            self.removeAllJobStderrs()

        if self.getCatOutputFiles():
            self.catOutputFiles()

        self.jobdb.close()

        if self.getVerbosityLevel() > 0:
            string = "END %s" % ( type(self).__name__ )
            print(string)
        sys.stdout.flush()

    def run( self ):
        """
        Check the attributes, submit one job per input file, wait for the
        whole group, report the execution times and clean up.
        Exit with status 1 if the generic attributes are invalid.
        """
        try:
            self.checkClusterLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        self.start()

        lInFiles = self.getInputFilesList()
        self._nbJobs = len(lInFiles)

        if self._verbose > 0:
            string = "submitting " + str(self._nbJobs) + " jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.submitJob(lInFiles)

        if self._verbose > 0:
            string = "waiting for jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid )

        if self._verbose > 0:
            string = "all jobs completed ! " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()
            statsExecutionTime = self.getStatsOfExecutionTime()
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
            sys.stdout.flush()

        self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid )

        self.end()
|
|
605
|
|
606
|