Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/launcher/Launcher.py @ 0:e0f8dcca02ed
Uploaded S-MART tool: a toolbox that manages RNA-Seq and ChIP-Seq data.
author: yufei-luo
date: Thu, 17 Jan 2013 10:52:14 -0500
parents: (none)
children: (none)
line wrap: on
line wrap: on
line diff
from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
from commons.core.stat.Stat import Stat
from commons.core.launcher.WriteScript import WriteScript
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
from commons.core.sql.Job import Job
import stat
import os
import re
import sys
import time
import glob


class Launcher(object):
    """Submit commands as jobs on a cluster through a job table.

    A run is framed by beginRun()/endRun(); individual commands are wrapped
    into generated launcher scripts (via WriteScript) and submitted through
    the job database adaptor.
    """

    #TODO: remove unused parameters: query="", subject="", param="", job_table=""
    def __init__(self, jobdb, query="", subject="", param="", cdir="",
                 tmpdir="", job_table="", queue="", groupid="", acro="X",
                 chooseTemplateWithCopy=False, chooseTemplateLight=False):
        """Bind the launcher to a job database.

        jobdb   -- job database adaptor; a RepetJob instance is wrapped
                   through TableJobAdaptatorFactory on the "jobs" table
        cdir    -- working directory (defaults to the current directory)
        tmpdir  -- temporary directory used on the nodes
        queue   -- queue name optionally followed by resource requests
        groupid -- identifier shared by all jobs of one run ("" => pid)
        acro    -- acronym used to name jobs and their output files
        """
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = tmpdir
        self.groupid = groupid
        self.acronyme = acro
        self._chooseTemplateWithCopy = chooseTemplateWithCopy
        self._chooseTemplateLight = chooseTemplateLight
        self.queue, self.lResources = self.getQueueNameAndResources(queue)
        self._createJobInstance()
        self._nbJobs = 0

    def getQueueNameAndResources(self, configQueue):
        """Split a config 'queue' entry into a queue name and a resource list.

        The first token is taken as the queue name only when it looks like an
        SGE queue name (matches '.*\\.q'); otherwise every token is treated
        as a resource request. Returns (queueName, lResources).
        """
        tokens = configQueue.replace("'", "").split(" ")
        queueName = ""
        lResources = []
        if tokens[0] != "":
            # raw string: ".*\.q" was an invalid escape sequence
            if re.match(r".*\.q", tokens[0]):
                queueName = tokens[0]
                lResources = tokens[1:]
            else:
                lResources = tokens
        return queueName, lResources

    def createGroupidIfItNotExist(self):
        """Give the current job a groupid, defaulting to this process' pid."""
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    def beginRun(self):
        """Start a run: wait for leftover jobs of the group, or clean old entries."""
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    ## Launch one job in parallel
    #
    # @param cmdStart string command-line for the job to be launched
    # @param cmdFinish string command to retrieve result files
    # @warning the jobname has to be defined outside from this method
    #
    def runSingleJob(self, cmdStart, cmdFinish="", cmdSize="", cmdCopy=""):
        if self._nbJobs == 0:
            self._nbJobs = 1
        pid = str(os.getpid())
        now = time.localtime()
        #TODO: rename ClusterLauncher_ ...
        pyFileName = self.cdir + "/ClusterLauncher_" + self.job.groupid + "_" +\
                     self.job.jobname + "_" + str(now[0]) + "-" + str(now[1]) +\
                     "-" + str(now[2]) + "_" + pid + ".py"
        self.job.launcher = pyFileName

        #TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir,
                                   self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        # rwx for owner, rx for group/others: the script is executed on the nodes
        os.chmod(pyFileName, stat.S_IRWXU + stat.S_IRGRP + stat.S_IXGRP + stat.S_IROTH + stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    def endRun(self, cleanNodes=False):
        """Wait for every job of the group, report timings, then clean the group."""
        string = "waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S"))
        print(string)
        sys.stdout.flush()
        self.jobdb.waitJobGroup(self.job.groupid)
        if self._nbJobs > 1:
            string = "all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S"))
            print(string)
            sys.stdout.flush()

        if cleanNodes:
            string = "start cleaning cluster nodes (%s)" % time.strftime("%Y-%m-%d %H:%M:%S")
            print(string)
            sys.stdout.flush()
            self.cleanNodes()
            string = "end cleaning cluster nodes (%s)" % time.strftime("%Y-%m-%d %H:%M:%S")
            print(string)
            sys.stdout.flush()

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
        sys.stdout.flush()
        self.jobdb.cleanJobGroup(self.job.groupid)

    def getStatsOfExecutionTime(self, acronyme=""):
        """Collect 'executionTime=<float>' values from the job stdout files.

        Returns a Stat instance over the first executionTime line of each
        '<acronyme>*.o*' file found in the current directory.
        """
        # renamed from 'stat': the local used to shadow the imported stat module
        iStat = Stat()
        if acronyme == "":
            pattern = "%s*.o*" % self.acronyme
        else:
            pattern = "%s*.o*" % acronyme
        lJobFiles = glob.glob(pattern)
        for f in lJobFiles:
            fH = open(f, "r")
            try:
                for line in fH:
                    if "executionTime" in line:
                        # float() tolerates the trailing newline; the old
                        # line[:-1] dropped a digit when the last line had
                        # no newline
                        iStat.add(float(line.split("=")[1]))
                        break
            finally:
                # close even if a value fails to parse
                fH.close()
        return iStat

    def clean(self, acronyme="", stdout=True, stderr=True):
        """Remove the generated launcher scripts and job stdout/stderr files."""
        lFileToRemove = []
        if acronyme == "":
            acronyme = self.acronyme
        pattern = "ClusterLauncher*%s*.py" % (acronyme)
        lFileToRemove.extend(glob.glob(pattern))
        if stdout:
            pattern = "%s*.o*" % (acronyme)
            lFileToRemove.extend(glob.glob(pattern))
        if stderr:
            pattern = "%s*.e*" % (acronyme)
            lFileToRemove.extend(glob.glob(pattern))
        # renamed from 'file': the old loop variable shadowed the builtin
        for fileName in lFileToRemove:
            os.remove(fileName)

    #TODO: handle of nodesMustBeCleaned => class attribute ?
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone=True, nodesMustBeCleaned=False):
        """Submit one job per command tuple, wait for all of them, then clean up.

        Each tuple is either (cmdStart, cmdFinish) or
        (cmdStart, cmdFinish, cmdSize, cmdCopy).
        """
        self.beginRun()
        print("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S")))
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            if len(cmdsTuple) == 2:
                self.runSingleJob(cmdsTuple[0], cmdsTuple[1])
            else:
                self.runSingleJob(cmdsTuple[0], cmdsTuple[1], cmdsTuple[2], cmdsTuple[3])
            self._createJobInstance()
            self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    def prepareCommands(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        """Join command lists into the four indented strings runSingleJob expects.

        Returns (cmdStart, cmdFinish, cmdSize, cmdCopy).
        """
        # None sentinels instead of mutable [] defaults (shared between calls)
        lCmdStart = [] if lCmdStart is None else lCmdStart
        lCmdFinish = [] if lCmdFinish is None else lCmdFinish
        lCmdSize = [] if lCmdSize is None else lCmdSize
        lCmdCopy = [] if lCmdCopy is None else lCmdCopy
        cmdStart = ""
        for cmd in lCmdStart:
            cmdStart += "%s\n\t" % cmd
        for cmd in lCmds:
            cmdStart += "%s\n\t" % cmd
        cmdFinish = ""
        for cmd in lCmdFinish:
            cmdFinish += "%s\n\t" % cmd
        cmdSize = ""
        for cmd in lCmdSize:
            cmdSize += "%s\n\t\t" % cmd
        cmdCopy = ""
        for cmd in lCmdCopy:
            cmdCopy += "%s\n\t\t" % cmd
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    #TODO: to remove when refactoring is done
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        """Same as prepareCommands but start/finish commands are not indented."""
        # None sentinels instead of mutable [] defaults (shared between calls)
        lCmdStart = [] if lCmdStart is None else lCmdStart
        lCmdFinish = [] if lCmdFinish is None else lCmdFinish
        lCmdSize = [] if lCmdSize is None else lCmdSize
        lCmdCopy = [] if lCmdCopy is None else lCmdCopy
        cmdStart = ""
        for cmd in lCmdStart:
            cmdStart += "%s\n" % cmd
        for cmd in lCmds:
            cmdStart += "%s\n" % cmd
        cmdFinish = ""
        for cmd in lCmdFinish:
            cmdFinish += "%s\n" % cmd
        cmdSize = ""
        for cmd in lCmdSize:
            cmdSize += "%s\n\t\t" % cmd
        cmdCopy = ""
        for cmd in lCmdCopy:
            cmdCopy += "%s\n\t\t" % cmd
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    def getSystemCommand(self, prg, lArgs):
        """Return a python statement that runs 'prg' with 'lArgs' through os.system."""
        systemCmd = "log = os.system(\"" + prg
        for arg in lArgs:
            systemCmd += " " + arg
        systemCmd += "\")"
        return systemCmd

    def cleanNodes(self):
        """Remove this group's temporary files from every node that ran a job."""
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(self.groupid))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % self.groupid)
        iCleanClusterNodeAfterRepet.run()

    #TODO: to remove when refactoring is done
    def _indentCmd(self, cmd):
        """Indent every line but the first with one tab.

        Expects 'cmd' to end with a newline: the empty element produced by the
        final split is deliberately dropped.
        """
        lCmd = cmd.split("\n")
        cmd_Tab = "%s\n" % lCmd[0]
        for line in lCmd[1:-1]:
            cmd_Tab += "\t%s\n" % line
        return cmd_Tab

    def _createJobInstance(self):
        """Create the Job that the next submission will use."""
        if self.lResources == []:
            #To have mem_free=1G:
            self.job = Job(queue=self.queue)
        else:
            self.job = Job(queue=self.queue, lResources=self.lResources)