diff smart_toolShed/commons/core/sql/TableJobAdaptator.py @ 0:e0f8dcca02ed

Uploaded S-MART tool, a toolbox that manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/smart_toolShed/commons/core/sql/TableJobAdaptator.py	Thu Jan 17 10:52:14 2013 -0500
@@ -0,0 +1,405 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software.  You can  use, 
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info". 
+#
+# As a counterpart to the access to the source code and  rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty  and the software's author,  the holder of the
+# economic rights,  and the successive licensors  have only  limited
+# liability. 
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading,  using,  modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean  that it is complicated to manipulate,  and  that  also
+# therefore means  that it is reserved for developers  and  experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or 
+# data to be ensured and,  more generally, to use and operate it in the 
+# same conditions as regards security. 
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+
+import os
+import time
+import datetime
+import sys
+from commons.core.sql.Job import Job 
+from commons.core.sql.TableAdaptator import TableAdaptator
+
+## Methods for Job persistence 
+#
+class TableJobAdaptator(TableAdaptator):
+        
+    ## Record a job
+    #
+    # @param job Job instance with the job information
+    #
+    def recordJob(self, job):
+        self.removeJob(job)
+        sqlCmd = "INSERT INTO %s" % self._table
+        sqlCmd += " VALUES ("
+        sqlCmd += " \"%s\"," % job.jobid
+        sqlCmd += " \"%s\"," % job.jobname
+        sqlCmd += " \"%s\"," % job.groupid
+        sqlCmd += " \"%s\"," % job.launcher
+        sqlCmd += " \"%s\"," % job.queue
+        sqlCmd += " \"%s\"," % job.lResources
+        sqlCmd += " \"waiting\","
+        sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
+        sqlCmd += " \"?\" );"
+        self._iDb.execute(sqlCmd)
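+
+    # Hedged usage sketch (not part of the original code): recording a job with an
+    # adaptator instance 'iTJA' already bound to a jobs table. The Job keyword
+    # arguments mirror those used below in waitJobGroup(); the values are illustrative.
+    #   job = Job(jobname="job1", groupid="group1", launcherFile="launcher.sh",
+    #             queue="normal.q", lResources=["mem_free=1G"])
+    #   iTJA.recordJob(job)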
+        
+       
+    ## Remove a job from the job table
+    #
+    #  @param job: job instance to remove
+    #
+    def removeJob(self, job):
+        qry = "DELETE FROM %s" % self._table
+        qry += " WHERE groupid='%s'" % job.groupid
+        qry += " AND jobname='%s'" % job.jobname
+        qry += " AND launcher='%s';" % job.launcher
+        self._iDb.execute(qry)
+            
+            
+    ## Set the jobid of a job to the identifier assigned by SGE
+    #
+    # @param job job instance
+    # @param jobid integer
+    #
+    def updateJobIdInDB(self, job, jobid):
+        #TODO: check if only one job will be updated
+        qry = "UPDATE %s" % self._table
+        qry += " SET jobid='%i'" % int(jobid)
+        qry += " WHERE jobname='%s'" % job.jobname
+        qry += " AND groupid='%s'" % job.groupid
+        qry += " AND launcher='%s';" % job.launcher
+        self._iDb.execute(qry)
+        
+        
+    ## Get a job status
+    #
+    # @param job: a Job instance with the job information
+    #
+    def getJobStatus(self, job):
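+        # if the job has a numeric id but no name, fall back to using the id as the name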
+        if job.jobid != 0 and job.jobname == "":
+            job.jobname = job.jobid
+            job.jobid = 0
+        qry = "SELECT status FROM %s" % self._table
+        qry += " WHERE groupid='%s'" % job.groupid
+        qry += " AND jobname='%s'" % job.jobname
+        qry += " AND launcher='%s';" % job.launcher
+        self._iDb.execute(qry)
+        res = self._iDb.fetchall()
+        if len(res) > 1:
+            sys.stderr.write("ERROR while getting job status: non-unique jobs\n")
+            sys.stderr.flush()
+            sys.exit(1)
+        if res is None or len(res) == 0:
+            return "unknown"
+        return res[0][0]
+    
+    
+    ## Change a job status
+    #
+    # @param job: a Job instance with the job information
+    # @param status: the new status (waiting, finished, error)
+    #
+    def changeJobStatus(self, job, status):
+        sqlCmd = "UPDATE %s" % self._table
+        sqlCmd += " SET status='%s'" % status
+        sqlCmd += ", node='%s'" % job.node
+        sqlCmd += " WHERE groupid='%s'" % job.groupid
+        sqlCmd += " AND jobname='%s'" % job.jobname
+        sqlCmd += " AND launcher='%s';" % job.launcher
+        self._iDb.execute(sqlCmd)
+        
+        
+    ## Get the number of jobs belonging to the desired groupid with the desired status.
+    #
+    # @param groupid string a group identifier to record related job series 
+    # @param status string job status (waiting, running, finished, error)
+    # @return int
+    #
+    def getCountStatus(self, groupid, status):
+        qry = "SELECT count(jobname) FROM %s" % self._table
+        qry += " WHERE groupid='%s'" % groupid
+        qry += " AND status='%s';" % status
+        self._iDb.execute(qry)
+        res = self._iDb.fetchall()
+        return int(res[0][0])
+        
+        
+    ## Clean all jobs from a job group
+    #
+    # @param groupid: a group identifier to record related job series
+    #
+    def cleanJobGroup(self, groupid):
+        qry = "DELETE FROM %s WHERE groupid='%s';" % (self._table, groupid)
+        self._iDb.execute(qry)
+            
+            
+    ## Check if there are unfinished jobs in a job group.
+    #
+    # @param groupid string a group identifier to record related job series 
+    #        
+    def hasUnfinishedJob(self, groupid):
+        qry = "SELECT * FROM %s" % self._table
+        qry += " WHERE groupid='%s'" % groupid
+        qry += " and status!='finished';" 
+        self._iDb.execute(qry)
+        res = self._iDb.fetchall()
+        if len(res) == 0:
+            return False
+        return True
+    
+
+    ## Wait until all jobs of a job group are finished.
+    #  Jobs in error are re-launched (max. 3 times by default).
+    #
+    # @param groupid string a group identifier to record related job series
+    # @param checkInterval integer time lapse in seconds between two checks (default = 5)
+    # @param maxRelaunch integer max nb of times a job in error is relaunched before exiting (default = 3)
+    # @param exitIfTooManyErrors boolean exit if a job is still in error after maxRelaunch attempts (default = True)
+    # @param timeOutPerJob integer max nb of seconds after which one checks whether a job is still handled by SGE (default = 60*60 = 1h)
+    #
+    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
+        dJob2Err = {}
+        
+        # retrieve the total number of jobs belonging to the desired groupid
+        qry = "SELECT count(jobname) FROM %s WHERE groupid='%s';" % (self._table, groupid)
+        self._iDb.execute(qry)
+        totalNbJobs = int(self._iDb.fetchall()[0][0])
+        
+        nbTimeOuts = 0
+        
+        while True:
+            time.sleep(checkInterval)
+            # retrieve the finished jobs and stop if all jobs are finished
+            nbFinishedJobs = self.getCountStatus(groupid, "finished")
+            if nbFinishedJobs == totalNbJobs:
+                break
+
+            # retrieve the jobs in error and relaunch them if they are in error (max. 'maxRelaunch' times)
+            qry = "SELECT * FROM %s" % self._table
+            qry += " WHERE groupid='%s'" % groupid
+            qry += " AND status ='error';"
+            self._iDb.execute(qry)
+            lJobsInError = self._iDb.fetchall()
+            for job in lJobsInError:
+                jobName = job[1]
+                if not dJob2Err.has_key(jobName):
+                    dJob2Err[jobName] = 1
+                if dJob2Err[jobName] < maxRelaunch:
+                    print "job '%s' in error, re-submitting (%i)" % (job[1], dJob2Err[job[1]])
+                    sys.stdout.flush()
+                    lResources = job[5].replace("[", "").replace("]", "").replace("'", "").split(", ")
+                    newJob = Job(jobname=jobName, groupid=job[2], launcherFile=job[3], queue=job[4], lResources=lResources)
+                    self.submitJob(newJob)
+                    dJob2Err[jobName] += 1
+                else:
+                    dJob2Err[jobName] += 1
+                    cmd = "job '%s' in permanent error (>%i)" % (jobName, maxRelaunch)
+                    cmd += "\ngroupid = %s" % groupid
+                    cmd += "\nnb of jobs = %i" % totalNbJobs
+                    cmd += "\nnb of finished jobs = %i" % self.getCountStatus(groupid, "finished")
+                    cmd += "\nnb of waiting jobs = %i" % self.getCountStatus(groupid, "waiting")
+                    cmd += "\nnb of running jobs = %i" % self.getCountStatus(groupid, "running")
+                    cmd += "\nnb of jobs in error = %i" % self.getCountStatus(groupid, "error")
+                    sys.stdout.flush()
+                    if exitIfTooManyErrors:
+                        self.cleanJobGroup(groupid)
+                        sys.exit(1)
+                    else:
+                        checkInterval = 60
+            nbTimeOuts = self._checkIfJobsTableAndJobsManagerInfoAreConsistent(nbTimeOuts, timeOutPerJob, groupid)
+    
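+    # Hedged usage sketch (not part of the original code): submitting a series of jobs
+    # and waiting for them. 'iTJA' is an adaptator instance (e.g. a TableJobAdaptatorSGE)
+    # already bound to a jobs table, and 'lJobs' is an illustrative list of Job instances
+    # built as shown in recordJob()/waitJobGroup() above.
+    #   iTJA.checkJobTable()
+    #   for job in lJobs:
+    #       iTJA.submitJob(job)
+    #   iTJA.waitJobGroup("group1")
+    #   iTJA.cleanJobGroup("group1")
+    #   iTJA.close()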
+    
+    ## Submit a job to a queue and record it in the job table.
+    #
+    # @param job a job instance
+    # @param maxNbWaitingJobs integer max nb of jobs allowed in 'waiting' state before submitting a new one (default = 10000)
+    # @param checkInterval integer time lapse in seconds between two checks (default = 30)
+    # @param verbose integer (default = 0)
+    #               
+    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
+        if self.getJobStatus(job) in ["waiting", "running", "finished"]:
+            sys.stderr.write( "WARNING: job '%s' was already submitted\n" % job.jobname)
+            sys.stderr.flush()
+            self.cleanJobGroup(job.groupid)
+            sys.exit(1)
+            
+        while self.getCountStatus(job.groupid, "waiting") > maxNbWaitingJobs:
+            time.sleep(checkInterval)
+
+        self.recordJob(job)
+        cmd = self._getQsubCommand(job)
+        returnStatus = os.system(cmd)
+
+        if returnStatus == 0:
+            fileName = "jobid.stdout"
+            jobidFileHandler = open(fileName, "r")
+            jobid = self._getJobidFromJobManager(jobidFileHandler)
+            if verbose > 0:
+                print "job '%i %s' submitted" % (jobid, job.jobname)
+                sys.stdout.flush()
+            job.jobid = jobid
+            jobidFileHandler.close()
+            self.updateJobIdInDB(job, jobid)
+            os.remove(fileName)
+        return returnStatus
+
+
+    ## Get the list of nodes where jobs of one group were executed
+    #
+    # @param groupId string a group identifier of job series
+    # @return lNodes list of node names without redundancy
+    #
+    def getNodesListByGroupId(self, groupId):
+        qry = "SELECT DISTINCT node FROM %s" % self._table
+        qry += " WHERE groupid='%s'" % groupId
+        self._iDb.execute(qry)
+        res = self._iDb.fetchall()
+        lNodes = []
+        for resTuple in res:
+            lNodes.append(resTuple[0])
+        return lNodes
+    
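+    ## Create the jobs table if it does not exist, or re-create it if its
+    #  fields do not match the expected schema.
+    #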
+    def checkJobTable(self):
+        if not self._iDb.doesTableExist(self._table):
+            self._iDb.createTable(self._table, "jobs")
+        else:
+            lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
+            lObsFields = sorted(self._iDb.getFieldList(self._table))
+            if lExpFields != lObsFields:
+                self._iDb.createTable(self._table, "jobs", overwrite = True)
+    
+    def close(self):
+        self._iDb.close() 
+    
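+    ## Split a job-manager identifier into a job id and a job count.
+    #  Hedged example (the exact identifier format depends on the job manager;
+    #  the string below is illustrative only):
+    #    "1234.8:1" -> ("1234", "8")
+    #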
+    def _getJobidAndNbJob(self, jobid) :
+        tab = jobid.split(".")
+        jobid = tab[0]
+        tab = tab[1].split(":")
+        nbJob = tab[0]
+        return jobid, nbJob
+    
+class TableJobAdaptatorSGE(TableJobAdaptator):
+        
+    def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
+        # retrieve the date and time at which the oldest, still-running job was submitted
+        sql = "SELECT jobid,jobname,time FROM %s WHERE groupid='%s' AND status='running' ORDER BY time DESC LIMIT 1" % (self._table, groupid)
+        self._iDb.execute( sql )
+        res = self._iDb.fetchall()
+        if len(res) > 0:
+            jobid = res[0][0]
+            jobname = res[0][1]
+            dateTimeOldestJob = res[0][2]
+            dateTimeCurrent = datetime.datetime.now()
+            # delta is the time elapsed between the submission of the oldest job of the given groupid still in running state and the current time
+            delta = dateTimeCurrent - dateTimeOldestJob
+            # check if delta is in an interval:  0 <= delta < 1h | 1h <= delta < 2h | 2h <= delta < 3h (timeOutPerJob = 1h)  
+            if delta.seconds >= nbTimeOuts * timeOutPerJob and delta.seconds < (nbTimeOuts+1) * timeOutPerJob:
+                return nbTimeOuts
+            # delta outside the interval: go to next interval (time out) 
+            if delta.seconds >= (nbTimeOuts+1) * timeOutPerJob:
+                nbTimeOuts += 1
+                # A job with 'running' status should appear in qstat, since the status in the DB is set to 'running' by the launched job itself.
+                if not self.isJobStillHandledBySge(jobid, jobname):
+                    # If it does not, leave some time for the DB status to be updated, in case the job finished between the query execution and now.
+                    time.sleep( 5 )
+                # If the status has still not been updated to 'finished', exit.
+                #TODO: check status in DB
+                if not self.isJobStillHandledBySge(jobid, jobname):
+                    msg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore" % ( jobid )
+                    msg += "\nit was launched the %s (> %.2f hours ago)" % ( dateTimeOldestJob, timeOutPerJob/3600.0 )
+                    msg += "\nthis problem can be due to:"
+                    msg += "\n* memory shortage, in that case, decrease the size of your jobs;"
+                    msg += "\n* timeout, in that case, decrease the size of your jobs;"
+                    msg += "\n* node failure or database error, in that case, launch the program again or ask your system administrator."
+                    sys.stderr.write("%s\n" % msg)
+                    sys.stderr.flush()
+                    self.cleanJobGroup(groupid)
+                    sys.exit(1)
+        return nbTimeOuts
+                        
+    ## Check if a job is still handled by SGE
+    #
+    # @param jobid string job identifier
+    # @param jobname string job name
+    #  
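+    #  Hedged note: the parsing below assumes a typical 'qstat' listing in which the
+    #  first column is the job id and the third column is the (possibly truncated)
+    #  job name, e.g. (illustrative line):
+    #    12345 0.55500 job1  user  r  01/17/2013 10:52:14 all.q@node1  1
+    #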
+    def isJobStillHandledBySge(self, jobid, jobname):
+        isJobInQstat = False
+        qstatFile = "qstat_stdout"
+        cmd = "qstat > %s" % qstatFile
+        returnStatus = os.system(cmd)
+        if returnStatus != 0:
+            msg = "ERROR while launching 'qstat'"
+            sys.stderr.write( "%s\n" % msg )
+            sys.exit(1)
+        qstatFileHandler = open(qstatFile, "r")
+        lLines = qstatFileHandler.readlines()
+        for line in lLines:
+            tokens = line.split()
+            if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
+                isJobInQstat = True
+                break
+        qstatFileHandler.close()
+        os.remove(qstatFile)
+        return isJobInQstat
+    
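+    ## Build the SGE submission command for a job.
+    #  Hedged example of the generated command (illustrative job values):
+    #    echo '<launcher>' | qsub -V -N job1 -q normal.q -cwd -l "mem_free=1G" > jobid.stdout
+    #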
+    def _getQsubCommand(self, job):    
+        cmd = "echo '%s' | " % job.launcher
+        prg = "qsub"
+        cmd += prg
+        cmd += " -V"
+        cmd += " -N %s" % job.jobname
+        if job.queue != "":
+            cmd += " -q %s" % job.queue
+        cmd += " -cwd"
+        if job.lResources != []:
+            cmd += " -l \""
+            cmd += " ".join(job.lResources)
+            cmd += "\""
+        if job.parallelEnvironment != "":
+            cmd += " -pe " + job.parallelEnvironment
+        cmd += " > jobid.stdout"
+        return cmd
+    
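+    ## Extract the job id from the job manager acknowledgement written to jobid.stdout.
+    #  Hedged assumption: SGE's qsub typically answers with a line such as
+    #    Your job 123456 ("job1") has been submitted
+    #  from which the third whitespace-separated token (123456) is taken.
+    #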
+    def _getJobidFromJobManager(self, jobidFileHandler):
+        return int(jobidFileHandler.readline().split(" ")[2])
+    
+
+class TableJobAdaptatorTorque(TableJobAdaptator):  
+                        
+    def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
+        return nbTimeOuts
+        
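+    ## Build the Torque submission command for a job; "mem_free" resource requests
+    #  are rewritten as "mem" for Torque.
+    #  Hedged example of the generated command (illustrative job values):
+    #    echo '<launcher>' | qsub -V -d /path/to/cwd -N job1 -q normal.q -l "mem=1G" > jobid.stdout
+    #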
+    def _getQsubCommand(self, job):    
+        cmd = "echo '%s' | " % job.launcher
+        prg = "qsub"
+        cmd += prg
+        cmd += " -V"
+        cmd += " -d %s" % os.getcwd()
+        cmd += " -N %s" % job.jobname
+        if job.queue != "":
+            cmd += " -q %s" % job.queue
+        if job.lResources != []:
+            cmd += " -l \""
+            cmd += " ".join(job.lResources).replace("mem_free","mem")
+            cmd += "\""
+        cmd += " > jobid.stdout"
+        return cmd
+
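+    ## Extract the job id from the job manager acknowledgement written to jobid.stdout.
+    #  Hedged assumption: Torque's qsub typically answers with the full job identifier,
+    #    12345.server.example.org
+    #  from which the leading numeric part (12345) is taken.
+    #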
+    def _getJobidFromJobManager(self, jobidFileHandler):
+        return int(jobidFileHandler.readline().split(".")[0])