diff commons/core/launcher/Launcher2.py @ 31:0ab839023fe4

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 14:33:21 -0400
parents 94ab73e8a190
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/launcher/Launcher2.py	Tue Apr 30 14:33:21 2013 -0400
@@ -0,0 +1,294 @@
+from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
+from commons.core.stat.Stat import Stat
+from commons.core.launcher.WriteScript import WriteScript
+from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
+from commons.core.sql.Job import Job
+import stat
+import os
+import re
+import sys
+import time
+import glob
+
+class LauncherParameter(object):
+
+    def __init__(self, jobDB):
+        self._jobDB = jobDB
+    
+    def getJobDB(self):
+        return self._jobDB
+
+    def setQuery(self, query):
+        self._query = query
+
+    def setSubject(self, subject):
+        self._subject = subject
+        
+    def setParam(self, param):
+        self._param = param
+    
+    def setCurrentDir(self, currentDir):
+        self._currentDir = currentDir
+    
+    def getCurrentDir(self):
+        return self._currentDir    
+
+    def setTempDir(self, tempDir):
+        self._tempDir = tempDir
+    
+    def getTempDir(self):
+        return self._tempDir
+        
+    def setJobTable(self, jobTable):
+        self._jobTable = jobTable
+        
+    def setQueue(self, queue):
+        self._queue = queue
+    
+    def getQueue(self):
+        return self._queue
+        
+    def setGroupId(self, groupId):
+        self._groupId = groupId
+    
+    def getGroupId(self):
+        return self._groupId
+    
+    def setAcronym(self, acronym):
+        self._acronym = acronym
+    
+    def getAcronym(self):
+        return self._acronym
+   
+    @staticmethod
+    def createParameter(jobdb, groupid, acronym):
+	launcherParameter = LauncherParameter(jobdb)
+        launcherParameter.setQuery(os.getcwd())
+        launcherParameter.setSubject("")
+        launcherParameter.setParam("")
+        launcherParameter.setCurrentDir(os.getcwd())
+        launcherParameter.setTempDir(os.getcwd())
+        launcherParameter.setJobTable("")
+        launcherParameter.setQueue("")
+        launcherParameter.setGroupId(groupid)
+        launcherParameter.setAcronym(acronym)
+	return launcherParameter       
+
+        
+class Launcher2(object):
+
+    #TODO: remove unused parameters : query="", subject="", param="", job_table=""
+    def __init__(self, iLauncherParameter):
+        jobdb = iLauncherParameter.getJobDB()
+        cdir = iLauncherParameter.getCurrentDir()
+        if jobdb.__class__.__name__ == "RepetJob":
+            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
+        else:
+            self.jobdb = jobdb
+        self.jobdb.checkJobTable()
+        if cdir == "":
+            cdir = os.getcwd()
+        self.cdir = cdir
+        self.tmpdir = iLauncherParameter.getTempDir()
+        self.groupid = iLauncherParameter.getGroupId()
+        self.acronyme = iLauncherParameter.getAcronym()
+        self._chooseTemplateWithCopy = False
+        self._chooseTemplateLight = False
+        self.queue, self.lResources = self.getQueueNameAndResources(iLauncherParameter.getQueue())
+        self._createJobInstance()
+        self._nbJobs = 0
+        
+    def getQueueNameAndResources(self, configQueue):
+        tokens = configQueue.replace("'","").split(" ")
+        queueName = ""
+        lResources = []
+        if tokens[0] != "":
+            if re.match(".*\.q", tokens[0]):
+                queueName = tokens[0]
+                lResources = tokens[1:]
+            else:
+                lResources = tokens
+        return queueName, lResources
+
+    def createGroupidIfItNotExist(self):
+        if self.groupid == "":
+            self.job.groupid = str(os.getpid())
+        else:
+            self.job.groupid = self.groupid
+
+    def beginRun( self ):
+        self.createGroupidIfItNotExist()
+        if self.jobdb.hasUnfinishedJob(self.job.groupid):
+            self.jobdb.waitJobGroup(self.job.groupid)
+        else:
+            self.jobdb.cleanJobGroup(self.job.groupid)
+
+    ## Launch one job in parallel
+    #
+    # @param cmdStart string command-line for the job to be launched
+    # @param cmdFinish string command to retrieve result files
+    # @warning the jobname has to be defined outside from this method
+    #
+    def runSingleJob(self, cmdStart, cmdFinish = "", cmdSize = "", cmdCopy = ""):
+        if self._nbJobs == 0:
+            self._nbJobs = 1
+        pid = str(os.getpid())
+        now = time.localtime()
+        #TODO: rename ClusterLauncher_ ...
+        pyFileName = self.cdir + "/ClusterLauncher_" + self.job.groupid + "_" +\
+                     self.job.jobname + "_" + str(now[0]) + "-" + str(now[1]) +\
+                     "-" + str(now[2]) + "_" + pid + ".py"
+        self.job.launcher = pyFileName
+        
+        #TODO: to remove when refactoring is done
+        cmdStart = self._indentCmd(cmdStart)
+        cmdFinish = self._indentCmd(cmdFinish)
+        
+        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir, self._chooseTemplateWithCopy, self._chooseTemplateLight)
+        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
+        os.chmod(pyFileName, stat.S_IRWXU+stat.S_IRGRP+stat.S_IXGRP+stat.S_IROTH+stat.S_IXOTH)
+        sys.stdout.flush()
+        log = self.jobdb.submitJob(self.job)
+        if log != 0:
+            print "ERROR while submitting job to the cluster"
+            sys.exit(1)
+        
+    def endRun(self, cleanNodes = False):
+        string = "waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S"))
+        print string; sys.stdout.flush()
+        self.jobdb.waitJobGroup(self.job.groupid)
+        if self._nbJobs > 1:
+            string = "all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, time.strftime("%Y-%m-%d %H:%M:%S"))
+            print string; sys.stdout.flush()
+
+        if cleanNodes:
+            string = "start cleaning cluster nodes (%s)" % time.strftime("%Y-%m-%d %H:%M:%S")
+            print string; sys.stdout.flush()
+            self.cleanNodes()
+            string = "end cleaning cluster nodes (%s)" % time.strftime("%Y-%m-%d %H:%M:%S")
+            print string; sys.stdout.flush()
+            
+        statsExecutionTime = self.getStatsOfExecutionTime()
+        if self._nbJobs > 1:
+            print "execution time of all jobs (seconds): %f" % statsExecutionTime.getSum()
+        print "execution time per job: %s" % statsExecutionTime.string()
+        sys.stdout.flush()
+        self.jobdb.cleanJobGroup(self.job.groupid)
+        
+    def getStatsOfExecutionTime(self, acronyme = ""):
+        stat = Stat()
+        if acronyme == "":
+            pattern = "%s*.o*" % self.acronyme
+        else:
+            pattern = "%s*.o*" % acronyme
+        lJobFiles = glob.glob(pattern)
+        for f in lJobFiles:
+            fH = open(f, "r")
+            while True:
+                line = fH.readline()
+                if line == "":
+                    break
+                if "executionTime" in line:
+                    stat.add( float(line[:-1].split("=")[1] ) )
+                    break
+            fH.close()
+        return stat     
+
+    def clean( self, acronyme = "", stdout = True, stderr = True ):
+        lFileToRemove = []
+        if acronyme == "":
+            acronyme = self.acronyme  
+        pattern = "ClusterLauncher*%s*.py" % ( acronyme )
+        lFileToRemove.extend(glob.glob( pattern ))
+        if stdout:
+            pattern = "%s*.o*" % ( acronyme )
+            lFileToRemove.extend(glob.glob( pattern ))        
+        if stderr:
+            pattern = "%s*.e*" % ( acronyme )
+            lFileToRemove.extend(glob.glob( pattern ))                   
+        for file in lFileToRemove:
+            os.remove(file)
+    
+    #TODO: handle of nodesMustBeCleaned => class attribute ?
+    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone = True, nodesMustBeCleaned = False):
+        self.beginRun()
+        print "submitting job(s) with groupid '%s' (%s)" % (self.job.groupid,  time.strftime("%Y-%m-%d %H:%M:%S"))
+        for cmdsTuple in lCmdsTuples:
+            self._nbJobs += 1
+            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
+            self.job.jobname = self.acronyme
+            if len(cmdsTuple) == 2:
+                self.runSingleJob(cmdsTuple[0], cmdsTuple[1])
+            else:
+                self.runSingleJob(cmdsTuple[0], cmdsTuple[1], cmdsTuple[2], cmdsTuple[3])
+                self._createJobInstance()
+                self.createGroupidIfItNotExist()
+        self.acronyme = acronymPrefix
+        self.endRun(nodesMustBeCleaned)
+        if cleanMustBeDone:
+            self.clean("%s_" % acronymPrefix)
+        self.jobdb.close()
+
+    def prepareCommands(self, lCmds, lCmdStart = [], lCmdFinish = [], lCmdSize = [], lCmdCopy = []):
+        cmdStart = ""
+        for cmd in lCmdStart:
+            cmdStart += "%s\n\t" % cmd
+        for cmd in lCmds:
+            cmdStart += "%s\n\t" % cmd
+        cmdFinish = ""
+        for cmd in lCmdFinish:
+            cmdFinish += "%s\n\t" % cmd
+        cmdSize = ""
+        for cmd in lCmdSize:
+            cmdSize += "%s\n\t\t" % cmd
+        cmdCopy = ""
+        for cmd in lCmdCopy:
+            cmdCopy += "%s\n\t\t" % cmd
+        return (cmdStart, cmdFinish, cmdSize, cmdCopy)
+
+    #TODO: to remove when refactoring is done
+    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart = [], lCmdFinish = [], lCmdSize = [], lCmdCopy = []):
+        cmdStart = ""
+        for cmd in lCmdStart:
+            cmdStart += "%s\n" % cmd
+        for cmd in lCmds:
+            cmdStart += "%s\n" % cmd
+        cmdFinish = ""
+        for cmd in lCmdFinish:
+            cmdFinish += "%s\n" % cmd
+        cmdSize = ""
+        for cmd in lCmdSize:
+            cmdSize += "%s\n\t\t" % cmd
+        cmdCopy = ""
+        for cmd in lCmdCopy:
+            cmdCopy += "%s\n\t\t" % cmd
+        return (cmdStart, cmdFinish, cmdSize, cmdCopy)
+    
+    def getSystemCommand(self, prg, lArgs):
+        systemCmd = "log = os.system(\"" + prg 
+        for arg in lArgs:
+            systemCmd += " " + arg
+        systemCmd += "\")"
+        return systemCmd
+
+    def cleanNodes(self):
+        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
+        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(self.groupid))
+        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
+        iCleanClusterNodeAfterRepet.setPattern("%s*" % self.groupid)
+        iCleanClusterNodeAfterRepet.run()
+
+    #TODO: to remove when refactoring is done
+    def _indentCmd(self, cmd):
+        lCmd = cmd.split("\n")
+        cmd_Tab = "%s\n" % lCmd[0]
+        for line in lCmd[1:-1]:
+            cmd_Tab += "\t%s\n" % line
+        return cmd_Tab
+    
+    def _createJobInstance(self):
+        if self.lResources == []:
+            #To have mem_free=1G:
+            self.job = Job(queue=self.queue)
+        else:
+            self.job = Job(queue=self.queue, lResources=self.lResources)