Mercurial > repos > yufei-luo > s_mart
view commons/core/launcher/Launcher.py @ 14:c79b9ae3f65f
Deleted selected files
author | m-zytnicki |
---|---|
date | Fri, 19 Apr 2013 10:13:11 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
from commons.core.stat.Stat import Stat
from commons.core.launcher.WriteScript import WriteScript
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
from commons.core.sql.Job import Job
import stat
import os
import re
import sys
import time
import glob


## Submit a group of jobs through a job database and wait for them.
#
# For each job, a launcher script named
# "<cdir>/ClusterLauncher_<groupid>_<jobname>_<Y>-<M>-<D>_<pid>.py" is written
# by WriteScript, made executable and submitted with jobdb.submitJob().
# endRun() waits for the whole job group, optionally cleans the cluster nodes,
# and prints execution-time statistics read from files matching
# "<acronym>*.o*" in the current working directory.
#
class Launcher(object):

    ## Constructor
    #
    # @param jobdb job database adaptor; an object whose class is named "RepetJob"
    #        is wrapped with TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
    # @param query unused (kept for backward compatibility)
    # @param subject unused (kept for backward compatibility)
    # @param param unused (kept for backward compatibility)
    # @param cdir directory where launcher scripts are written (default: current working directory)
    # @param tmpdir temporary directory forwarded to WriteScript and to the node cleaner
    # @param job_table unused (kept for backward compatibility)
    # @param queue queue configuration string, see getQueueNameAndResources()
    # @param groupid job group identifier; when empty, the process id is used
    # @param acro acronym used to name jobs and to find their output files
    # @param chooseTemplateWithCopy forwarded to WriteScript
    # @param chooseTemplateLight forwarded to WriteScript
    #
    #TODO: remove unused parameters : query="", subject="", param="", job_table=""
    def __init__(self, jobdb, query="", subject="", param="", cdir="", tmpdir="", job_table="", queue="", groupid="", acro="X", chooseTemplateWithCopy=False, chooseTemplateLight=False):
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = tmpdir
        self.groupid = groupid
        self.acronyme = acro
        self._chooseTemplateWithCopy = chooseTemplateWithCopy
        self._chooseTemplateLight = chooseTemplateLight
        self.queue, self.lResources = self.getQueueNameAndResources(queue)
        self._createJobInstance()
        # Number of jobs submitted in the current run (used for messages only).
        self._nbJobs = 0

    ## Split a queue configuration into a queue name and a list of resources
    #
    # Single quotes are removed and the string is split on any whitespace.
    # If the first token matches the regex '.*\.q' (i.e. it contains ".q"), it
    # is taken as the queue name and the remaining tokens are the resources.
    # Otherwise there is no queue name and all tokens are resources.
    #
    # @param configQueue string, e.g. "main.q mem_free=1G" or "mem_free=1G"
    # @return tuple (queueName, lResources); ("", []) for an empty configuration
    #
    def getQueueNameAndResources(self, configQueue):
        # split() without argument ignores repeated, leading and trailing
        # whitespace, so no empty token can become a queue name or a resource.
        tokens = configQueue.replace("'", "").split()
        if not tokens:
            return "", []
        if re.match(r".*\.q", tokens[0]):
            return tokens[0], tokens[1:]
        return "", tokens

    ## Set the group id of the current job: self.groupid, or the process id if it is empty
    #
    def createGroupidIfItNotExist(self):
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    ## Prepare the job group before submitting jobs
    #
    # If the group still has unfinished jobs, wait for them. Otherwise clean
    # the group's previous entries from the job database.
    #
    def beginRun(self):
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    ## Launch one job in parallel
    #
    # @param cmdStart string command-line for the job to be launched
    # @param cmdFinish string command to retrieve result files
    # @param cmdSize string forwarded to WriteScript.run()
    # @param cmdCopy string forwarded to WriteScript.run()
    # @warning the jobname has to be defined outside from this method
    # @note exits the process with status 1 if jobdb.submitJob() returns non-zero
    #
    def runSingleJob(self, cmdStart, cmdFinish="", cmdSize="", cmdCopy=""):
        if self._nbJobs == 0:
            # No job counted yet (e.g. direct call, not via runLauncherForMultipleJobs).
            self._nbJobs = 1
        pid = str(os.getpid())
        now = time.localtime()
        #TODO: rename ClusterLauncher_ ...
        # Date parts are not zero-padded (e.g. "2013-4-19").
        pyFileName = self.cdir + "/ClusterLauncher_" + self.job.groupid + "_" + \
                     self.job.jobname + "_" + str(now[0]) + "-" + str(now[1]) + \
                     "-" + str(now[2]) + "_" + pid + ".py"
        self.job.launcher = pyFileName

        #TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir, self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        # rwxr-xr-x so the scheduler can execute the generated script.
        os.chmod(pyFileName, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    ## Wait for every job of the group, report execution times, then clean the group
    #
    # @param cleanNodes boolean, if True call self.cleanNodes() once the jobs are finished
    #
    def endRun(self, cleanNodes=False):
        self._log("waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, self._timestamp()))
        self.jobdb.waitJobGroup(self.job.groupid)
        if self._nbJobs > 1:
            self._log("all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, self._timestamp()))

        if cleanNodes:
            self._log("start cleaning cluster nodes (%s)" % self._timestamp())
            self.cleanNodes()
            self._log("end cleaning cluster nodes (%s)" % self._timestamp())

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
        print("execution time per job: %s" % statsExecutionTime.string())
        sys.stdout.flush()
        self.jobdb.cleanJobGroup(self.job.groupid)

    ## Collect the execution times reported in job output files
    #
    # Each file matching "<acronyme>*.o*" (searched in the current working
    # directory) contributes the value of its first line containing
    # "executionTime", parsed as the float after the first "=".
    #
    # @param acronyme string file prefix; when empty, self.acronyme is used
    # @return Stat instance holding one value per file that reported a time
    #
    def getStatsOfExecutionTime(self, acronyme=""):
        if not acronyme:
            acronyme = self.acronyme
        iStat = Stat()
        for jobFile in glob.glob("%s*.o*" % acronyme):
            with open(jobFile, "r") as fH:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores surrounding whitespace, so the
                        # newline (present or not) never truncates the value.
                        iStat.add(float(line.split("=")[1]))
                        break
        return iStat

    ## Remove the launcher scripts and, optionally, the job stdout/stderr files
    #
    # Files are searched in the current working directory.
    #
    # @param acronyme string file prefix; when empty, self.acronyme is used
    # @param stdout boolean, also remove "<acronyme>*.o*" files
    # @param stderr boolean, also remove "<acronyme>*.e*" files
    #
    def clean(self, acronyme="", stdout=True, stderr=True):
        if not acronyme:
            acronyme = self.acronyme
        lPatterns = ["ClusterLauncher*%s*.py" % acronyme]
        if stdout:
            lPatterns.append("%s*.o*" % acronyme)
        if stderr:
            lPatterns.append("%s*.e*" % acronyme)

        # A file may match several patterns; remove it only once.
        lFileToRemove = []
        seen = set()
        for pattern in lPatterns:
            for fileName in glob.glob(pattern):
                if fileName not in seen:
                    seen.add(fileName)
                    lFileToRemove.append(fileName)
        for fileName in lFileToRemove:
            os.remove(fileName)

    ## Submit one job per command tuple, wait for all of them, then clean up
    #
    # Jobs are named "<acronymPrefix>_<n>" with n starting at 1.
    #
    # @param acronymPrefix string prefix of job names and output files
    # @param lCmdsTuples list of tuples (cmdStart[, cmdFinish[, cmdSize[, cmdCopy]]]);
    #        elements beyond the fourth are ignored
    # @param cleanMustBeDone boolean, remove scripts and output files at the end
    # @param nodesMustBeCleaned boolean, forwarded to endRun()
    #
    #TODO: handle of nodesMustBeCleaned => class attribute ?
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone=True, nodesMustBeCleaned=False):
        self.beginRun()
        self._log("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid, self._timestamp()))
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            # runSingleJob() takes up to four commands, missing ones default to "".
            self.runSingleJob(*cmdsTuple[:4])
            # Fresh Job object for the next command, same group id.
            self._createJobInstance()
            self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    ## Build the command strings expected by runSingleJob()
    #
    # Every command is followed by "\n\t" (start/finish) or "\n\t\t" (size/copy).
    #
    # @param lCmds list of main commands, appended after lCmdStart
    # @param lCmdStart list of commands run before lCmds (default: none)
    # @param lCmdFinish list of finishing commands (default: none)
    # @param lCmdSize list of size commands (default: none)
    # @param lCmdCopy list of copy commands (default: none)
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    def prepareCommands(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        cmdStart = self._concatCmds(lCmdStart or [], "\n\t") + self._concatCmds(lCmds, "\n\t")
        cmdFinish = self._concatCmds(lCmdFinish or [], "\n\t")
        cmdSize = self._concatCmds(lCmdSize or [], "\n\t\t")
        cmdCopy = self._concatCmds(lCmdCopy or [], "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    ## Same as prepareCommands() but start/finish commands are only followed by "\n"
    #
    # runSingleJob() indents these lines itself (see _indentCmd()).
    #
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    #TODO: to remove when refactoring is done
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        cmdStart = self._concatCmds(lCmdStart or [], "\n") + self._concatCmds(lCmds, "\n")
        cmdFinish = self._concatCmds(lCmdFinish or [], "\n")
        cmdSize = self._concatCmds(lCmdSize or [], "\n\t\t")
        cmdCopy = self._concatCmds(lCmdCopy or [], "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    ## Return the Python statement 'log = os.system("<prg> <arg1> <arg2> ...")'
    #
    # @warning arguments are not escaped: a double quote in prg or lArgs
    #          breaks the generated statement
    #
    def getSystemCommand(self, prg, lArgs):
        return "log = os.system(\"%s\")" % " ".join([prg] + list(lArgs))

    ## Clean the temporary directory of the nodes that ran the current job group
    #
    # Uses self.groupid when set, otherwise the group id actually given to the
    # jobs (self.job.groupid, the process id by default). This avoids querying
    # group "" and cleaning with the catch-all pattern "*".
    #
    def cleanNodes(self):
        groupid = self.groupid or self.job.groupid
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(groupid))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % groupid)
        iCleanClusterNodeAfterRepet.run()

    ## Indent every line but the first with one tab; each line ends with "\n"
    #
    # A blank trailing fragment (what follows a final "\n", e.g. "" or "\t")
    # is dropped. A real last command line without trailing newline is kept.
    #
    #TODO: to remove when refactoring is done
    def _indentCmd(self, cmd):
        lCmd = cmd.split("\n")
        if len(lCmd) > 1 and lCmd[-1].strip() == "":
            lCmd = lCmd[:-1]
        cmd_Tab = "%s\n" % lCmd[0]
        for line in lCmd[1:]:
            cmd_Tab += "\t%s\n" % line
        return cmd_Tab

    ## Create a new Job for self.queue, with self.lResources when there are any
    #
    def _createJobInstance(self):
        if self.lResources:
            self.job = Job(queue=self.queue, lResources=self.lResources)
        else:
            # No explicit resources: Job's own default is used (original note:
            # "To have mem_free=1G" -- confirm in Job).
            self.job = Job(queue=self.queue)

    ## Concatenate the commands of lCmds, each followed by suffix
    #
    @staticmethod
    def _concatCmds(lCmds, suffix):
        return "".join("%s%s" % (cmd, suffix) for cmd in lCmds)

    ## Current local time formatted as "YYYY-MM-DD HH:MM:SS"
    #
    @staticmethod
    def _timestamp():
        return time.strftime("%Y-%m-%d %H:%M:%S")

    ## Print a message and flush stdout immediately
    #
    @staticmethod
    def _log(message):
        print(message)
        sys.stdout.flush()