view smart_toolShed/commons/core/launcher/Launcher.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox that manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line source

from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
from commons.core.stat.Stat import Stat
from commons.core.launcher.WriteScript import WriteScript
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
from commons.core.sql.Job import Job
import stat
import os
import re
import sys
import time
import glob

## Prepare, submit and wait for jobs on a cluster through a job table
#
# Commands are handed to WriteScript to produce a launcher script, the
# resulting job is submitted through the job database adaptor, and all the
# jobs sharing the same group ID are then waited for and cleaned up.
#
class Launcher(object):

    #TODO: remove unused parameters : query="", subject="", param="", job_table=""
    ## Constructor
    #
    # @param jobdb job database adaptor; an object whose class is named "RepetJob" is wrapped by TableJobAdaptatorFactory on the "jobs" table
    # @param query string unused
    # @param subject string unused
    # @param param string unused
    # @param cdir string directory where launcher scripts are written (default: current working directory)
    # @param tmpdir string temporary directory passed to WriteScript and to the node cleaner
    # @param job_table string unused
    # @param queue string queue configuration, see getQueueNameAndResources()
    # @param groupid string group ID of the jobs (default: the PID of the current process, set in beginRun)
    # @param acro string prefix of the job names / job output files
    # @param chooseTemplateWithCopy boolean forwarded to WriteScript
    # @param chooseTemplateLight boolean forwarded to WriteScript
    #
    def __init__( self, jobdb, query="", subject="", param="", cdir="",
                  tmpdir="", job_table="", queue="", groupid="", acro="X",
                  chooseTemplateWithCopy = False, chooseTemplateLight = False):
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = tmpdir
        self.groupid = groupid
        self.acronyme = acro
        self._chooseTemplateWithCopy = chooseTemplateWithCopy
        self._chooseTemplateLight = chooseTemplateLight
        self.queue, self.lResources = self.getQueueNameAndResources(queue)
        self._createJobInstance()
        self._nbJobs = 0

    ## Split a queue configuration string into a queue name and a list of resources
    #
    # Single quotes are removed and the string is split on whitespace. If the
    # first token matches the regular expression ".*\.q" it is the queue name
    # and the remaining tokens are the resources; otherwise every token is a
    # resource and the queue name is empty.
    #
    # @param configQueue string e.g. "main.q mem_free=1G"
    # @return tuple (queueName, lResources)
    #
    def getQueueNameAndResources(self, configQueue):
        # split() without argument ignores leading/trailing/repeated blanks,
        # so no empty token can be mistaken for "no configuration" or end up
        # in the list of resources
        tokens = configQueue.replace("'", "").split()
        if not tokens:
            return "", []
        if re.match(r".*\.q", tokens[0]):
            return tokens[0], tokens[1:]
        return "", tokens

    ## Set the group ID of the current job
    #
    # Use the group ID given to the constructor, or the PID of the current
    # process when none was given.
    #
    def createGroupidIfItNotExist(self):
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    ## Prepare the job table before submitting jobs
    #
    # Wait for the job group if it still has unfinished jobs, otherwise
    # clean it.
    #
    def beginRun( self ):
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    ## Launch one job in parallel
    #
    # @param cmdStart string command-line for the job to be launched
    # @param cmdFinish string command to retrieve result files
    # @param cmdSize string forwarded as is to WriteScript
    # @param cmdCopy string forwarded as is to WriteScript
    # @warning the jobname has to be defined outside from this method
    # @note print an error message and exit with status 1 if the submission fails
    #
    def runSingleJob(self, cmdStart, cmdFinish = "", cmdSize = "", cmdCopy = ""):
        if self._nbJobs == 0:
            self._nbJobs = 1
        now = time.localtime()
        #TODO: rename ClusterLauncher_ ...
        # <cdir>/ClusterLauncher_<groupid>_<jobname>_<year>-<month>-<day>_<pid>.py
        pyFileName = "%s/ClusterLauncher_%s_%s_%s-%s-%s_%s.py" % (
            self.cdir, self.job.groupid, self.job.jobname,
            now[0], now[1], now[2], os.getpid())
        self.job.launcher = pyFileName

        #TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir,
                                   self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        # rwxr-xr-x
        os.chmod(pyFileName, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    ## Wait for all the jobs of the group, report execution times and clean the job table
    #
    # @param cleanNodes boolean if True, call cleanNodes() once the jobs are finished
    #
    def endRun(self, cleanNodes = False):
        print("waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S")))
        sys.stdout.flush()
        self.jobdb.waitJobGroup(self.job.groupid)
        if self._nbJobs > 1:
            print("all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S")))
            sys.stdout.flush()

        if cleanNodes:
            print("start cleaning cluster nodes (%s)" % time.strftime("%Y-%m-%d %H:%M:%S"))
            sys.stdout.flush()
            self.cleanNodes()
            print("end cleaning cluster nodes (%s)" % time.strftime("%Y-%m-%d %H:%M:%S"))
            sys.stdout.flush()

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
        print("execution time per job: %s" % statsExecutionTime.string())
        sys.stdout.flush()
        self.jobdb.cleanJobGroup(self.job.groupid)

    ## Collect the execution times found in the job output files
    #
    # Files matching "<acronyme>*.o*" (pattern relative to the current
    # working directory) are scanned; in each file, the value following the
    # first "=" of the first line containing "executionTime" is added to
    # the statistics.
    #
    # @param acronyme string prefix of the output files (default: self.acronyme)
    # @return Stat instance
    #
    def getStatsOfExecutionTime(self, acronyme = ""):
        if acronyme == "":
            acronyme = self.acronyme
        iStat = Stat()
        for jobFileName in glob.glob("%s*.o*" % acronyme):
            fH = open(jobFileName, "r")
            # try/finally: the handle is released even if the value cannot be parsed
            try:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores surrounding whitespace, so the value is
                        # read in full whether or not the line ends with a newline
                        iStat.add(float(line.split("=")[1]))
                        break
            finally:
                fH.close()
        return iStat

    ## Remove launcher scripts and job output files
    #
    # The patterns are relative to the current working directory.
    #
    # @param acronyme string prefix of the files to remove (default: self.acronyme)
    # @param stdout boolean also remove files matching "<acronyme>*.o*"
    # @param stderr boolean also remove files matching "<acronyme>*.e*"
    #
    def clean( self, acronyme = "", stdout = True, stderr = True ):
        if acronyme == "":
            acronyme = self.acronyme
        lPatterns = ["ClusterLauncher*%s*.py" % ( acronyme )]
        if stdout:
            lPatterns.append("%s*.o*" % ( acronyme ))
        if stderr:
            lPatterns.append("%s*.e*" % ( acronyme ))
        # a set, so that a path matched by several patterns is removed only once
        sFilesToRemove = set()
        for pattern in lPatterns:
            sFilesToRemove.update(glob.glob( pattern ))
        for fileName in sorted(sFilesToRemove):
            os.remove(fileName)

    #TODO: handle of nodesMustBeCleaned => class attribute ?
    ## Submit several jobs, wait for them and clean up
    #
    # Jobs are named "<acronymPrefix>_<i>", i being the job counter.
    #
    # @param acronymPrefix string prefix of the job names
    # @param lCmdsTuples list of tuples (cmdStart, cmdFinish) or (cmdStart, cmdFinish, cmdSize, cmdCopy)
    # @param cleanMustBeDone boolean if True, remove the files prefixed by "<acronymPrefix>_" at the end
    # @param nodesMustBeCleaned boolean forwarded to endRun()
    # @note the job database is closed at the end
    #
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone = True, nodesMustBeCleaned = False):
        self.beginRun()
        print("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid,  time.strftime("%Y-%m-%d %H:%M:%S")))
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            if len(cmdsTuple) == 2:
                self.runSingleJob(cmdsTuple[0], cmdsTuple[1])
            else:
                self.runSingleJob(cmdsTuple[0], cmdsTuple[1], cmdsTuple[2], cmdsTuple[3])
                # NOTE(review): a fresh Job is created only after a 4-command
                # tuple; 2-command tuples keep reusing the same Job instance.
                # Confirm this asymmetry is intended.
                self._createJobInstance()
                self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    ## Concatenate lists of commands into the strings expected by runSingleJob()
    #
    # Start and finish commands are each followed by "\n\t", size and copy
    # commands by "\n\t\t". The default lists are never modified.
    #
    # @param lCmds list of commands, appended after lCmdStart
    # @param lCmdStart list of commands put at the beginning of the start string
    # @param lCmdFinish list of commands for the finish string
    # @param lCmdSize list of commands for the size string
    # @param lCmdCopy list of commands for the copy string
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    def prepareCommands(self, lCmds, lCmdStart = [], lCmdFinish = [], lCmdSize = [], lCmdCopy = []):
        cmdStart = self._joinCmds(lCmdStart, "\n\t") + self._joinCmds(lCmds, "\n\t")
        cmdFinish = self._joinCmds(lCmdFinish, "\n\t")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    #TODO: to remove when refactoring is done
    ## Same as prepareCommands(), but start and finish commands are only followed by "\n"
    #
    # @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy)
    #
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart = [], lCmdFinish = [], lCmdSize = [], lCmdCopy = []):
        cmdStart = self._joinCmds(lCmdStart, "\n") + self._joinCmds(lCmds, "\n")
        cmdFinish = self._joinCmds(lCmdFinish, "\n")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    ## Build the source line 'log = os.system("<prg> <arg1> <arg2> ...")'
    #
    # @param prg string program name
    # @param lArgs list of strings, arguments of the program
    # @return string
    # @warning program and arguments are inserted as is, without any quoting or escaping
    #
    def getSystemCommand(self, prg, lArgs):
        return "log = os.system(\"" + " ".join([prg] + list(lArgs)) + "\")"

    ## Clean the temporary directory of the cluster nodes used by the job group
    #
    # Files matching "<groupid>*" are handed to CleanClusterNodesAfterRepet.
    #
    def cleanNodes(self):
        # When no group ID was given to the constructor, the jobs run under the
        # group ID stored on the job (see createGroupidIfItNotExist). Using the
        # empty self.groupid would query the nodes of group "" and turn the
        # cleaning pattern into "*".
        groupid = self.groupid
        if groupid == "":
            groupid = getattr(self.job, "groupid", "")
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(groupid))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % groupid)
        iCleanClusterNodeAfterRepet.run()

    #TODO: to remove when refactoring is done
    ## Add one tabulation in front of every line of a command string but the first
    #
    # @param cmd string commands separated by "\n"
    # @return string each line terminated by "\n"
    #
    def _indentCmd(self, cmd):
        lLines = cmd.split("\n")
        # prepareCommands*() end every command with "\n" or "\n\t", which leaves
        # a blank last piece: drop it. A non-blank last piece is a real command
        # given without trailing newline and must be kept.
        if len(lLines) > 1 and lLines[-1].strip() == "":
            lLines.pop()
        return "%s\n" % lLines[0] + "".join(["\t%s\n" % line for line in lLines[1:]])

    ## Concatenate commands, each one followed by the given terminator
    #
    # @param lCmds list of commands
    # @param terminator string appended after each command
    # @return string
    #
    def _joinCmds(self, lCmds, terminator):
        return "".join(["%s" % cmd + terminator for cmd in lCmds])

    ## Create the Job instance used for the next submission
    #
    # The list of resources is given to Job only when it is not empty.
    #
    def _createJobInstance(self):
        if self.lResources == []:
            #To have mem_free=1G:
            self.job = Job(queue=self.queue)
        else:
            self.job = Job(queue=self.queue, lResources=self.lResources)