diff commons/core/sql/RepetJob.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/sql/RepetJob.py	Fri Jan 18 04:54:14 2013 -0500
@@ -0,0 +1,252 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software.  You can  use, 
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info". 
+#
+# As a counterpart to the access to the source code and  rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty  and the software's author,  the holder of the
+# economic rights,  and the successive licensors  have only  limited
+# liability. 
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading,  using,  modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean  that it is complicated to manipulate,  and  that  also
+# therefore means  that it is reserved for developers  and  experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or 
+# data to be ensured and,  more generally, to use and operate it in the 
+# same conditions as regards security. 
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+
+import os
+import time
+import sys
+from commons.core.sql.DbMySql import DbMySql
+from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
+
+#TODO: to remove... => replace all RepetJob() by TableJobAdaptator()...
+## Methods for Job persistence 
+#
+class RepetJob( DbMySql ):
+
+
+    ## Record a job
+    #
+    # Any row already stored for the same (groupid, jobname, queue) is removed
+    # first, then a new row is inserted with the status "waiting", the current
+    # local time and "?" as last value.
+    #
+    # @param job Job instance with the job information (reads tablename, jobid,
+    #            jobname, groupid, command, launcher and queue)
+    #
+    def recordJob( self, job ):
+        self.removeJob( job )
+        # The INSERT gives no column list, so the values below must follow the
+        # column order of the job table.
+        # NOTE(review): values are interpolated directly into the SQL text; only
+        # the double quotes of the command are neutralised. Switch to bound
+        # parameters if DbMySql.execute() supports them.
+        sqlCmd = "INSERT INTO %s" % ( job.tablename )
+        sqlCmd += " VALUES ("
+        sqlCmd += " \"%s\"," % ( job.jobid )
+        sqlCmd += " \"%s\"," % ( job.jobname )
+        sqlCmd += " \"%s\"," % ( job.groupid )
+        # double quotes would close the SQL string literal: turn them into single quotes
+        sqlCmd += " \"%s\"," % ( job.command.replace("\"","\'") )
+        sqlCmd += " \"%s\"," % ( job.launcher )
+        sqlCmd += " \"%s\"," % ( job.queue )
+        sqlCmd += " \"waiting\","
+        sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
+        # presumably the node column, not known yet at recording time - TODO confirm
+        sqlCmd += " \"?\" );"
+        self.execute( sqlCmd )
+
+
+    ## Remove a job from the job table
+    #
+    # The rows are selected on groupid, jobname and queue (not on jobid).
+    #
+    # @param job job instance to remove
+    #
+    def removeJob( self, job ):
+        qry = "DELETE FROM %s" % ( job.tablename )
+        qry += " WHERE groupid='%s'" % ( job.groupid )
+        qry += " AND jobname='%s'" % ( job.jobname )
+        qry += " AND queue='%s';" % ( job.queue )
+        self.execute( qry )
+
+
+    ## Set the jobid of a job with the id of SGE
+    #
+    # @param job job instance (selected on jobname, groupid and queue)
+    # @param jobid integer, or any value accepted by int()
+    #
+    def setJobIdFromSge( self, job, jobid ):
+        qry = "UPDATE %s" % ( job.tablename )
+        qry += " SET jobid='%i'" % ( int(jobid) )
+        qry += " WHERE jobname='%s'" % ( job.jobname )
+        qry += " AND groupid='%s'" % ( job.groupid )
+        qry += " AND queue='%s';" % ( job.queue )
+        self.execute( qry )
+
+
+    ## Get a job status
+    #
+    # Side effect: when the job has a non-zero jobid but an empty jobname, the
+    # jobid is moved into jobname and jobid is reset to 0 before the lookup.
+    # The program exits with status 1 if several rows match the job.
+    #
+    # @param job a Job instance with the job information
+    # @return string the stored status, or "unknown" if no row matches
+    #
+    def getJobStatus( self, job ):
+        if job.jobid != 0 and job.jobname == "":
+            job.jobname = job.jobid
+            job.jobid = 0
+        qry = "SELECT status FROM %s" % ( job.tablename )
+        qry += " WHERE groupid='%s'" % ( job.groupid )
+        qry += " AND jobname='%s'" % ( job.jobname )
+        qry += " AND queue='%s';" % ( job.queue )
+        self.execute( qry )
+        res = self.fetchall()
+        # Test for "no result" before calling len(): a None result would
+        # otherwise raise a TypeError instead of yielding "unknown".
+        if not res:
+            return "unknown"
+        if len(res) > 1:
+            msg = "ERROR while getting job status: non-unique jobs"
+            sys.stderr.write( "%s\n" % msg )
+            sys.stderr.flush()
+            sys.exit(1)
+        return res[0][0]
+
+
+    ## Change a job status
+    #
+    # The node column is updated as well, from job.node.
+    #
+    # @param job a Job instance with the job information
+    # @param status the new status (waiting,finished,error)
+    # @param method db or file (accepted but not used by this implementation)
+    #
+    def changeJobStatus( self, job, status, method=""):
+        sqlCmd = "UPDATE %s" % ( job.tablename )
+        sqlCmd += " SET status='%s'" % ( status )
+        sqlCmd += ",node='%s'" % ( job.node )
+        sqlCmd += " WHERE groupid='%s'" % ( job.groupid )
+        sqlCmd += " AND jobname='%s'" % ( job.jobname )
+        sqlCmd += " AND queue='%s';" % ( job.queue )
+        self.execute( sqlCmd )
+
+
+    ## Get the number of jobs belonging to the desired groupid with the desired status.
+    #
+    # @param tablename string table name to record the jobs
+    # @param groupid string a group identifier to record related job series
+    # @param status string job status (waiting, running, finished, error)
+    # @return int
+    #
+    def getCountStatus( self, tablename, groupid, status ):
+        qry = "SELECT count(jobname) FROM %s" % ( tablename )
+        qry += " WHERE groupid='%s'" % ( groupid )
+        qry += " AND status='%s';" % ( status )
+        self.execute( qry )
+        res = self.fetchall()
+        return int( res[0][0] )
+
+
+    ## Clean all jobs from a job group
+    #
+    # Nothing is done if the table does not exist.
+    #
+    # @param tablename table name to record the jobs
+    # @param groupid a group identifier to record related job series
+    #
+    def cleanJobGroup( self, tablename, groupid ):
+        if self.doesTableExist( tablename ):
+            qry = "DELETE FROM %s WHERE groupid='%s';" % ( tablename, groupid )
+            self.execute( qry )
+
+
+    ## Check if there is an unfinished job in a job group.
+    #
+    # @param tablename string table name to record the jobs
+    # @param groupid string a group identifier to record related job series
+    # @return boolean True if at least one job of the group has a status other
+    #         than 'finished', False otherwise or if the table does not exist
+    #
+    def hasUnfinishedJob( self, tablename, groupid ):
+        if not self.doesTableExist( tablename ):
+            return False
+        qry = "SELECT * FROM %s" % ( tablename )
+        qry += " WHERE groupid='%s'" % ( groupid )
+        qry += " and status!='finished';"
+        self.execute( qry )
+        res = self.fetchall()
+        return len(res) != 0
+
+
+    ## Check if a job is still handled by SGE
+    #
+    # Runs 'qstat' through the shell, redirecting its output into the scratch
+    # file "qstat_stdout" of the current directory, and scans that file.
+    # The program exits with status 1 if 'qstat' returns a non-zero status.
+    #
+    # @param jobid string job identifier
+    # @param jobname string job name
+    # @return boolean True if a line of the qstat output matches the job
+    #
+    def isJobStillHandledBySge( self, jobid, jobname ):
+        isJobInQstat = False
+        qstatFile = "qstat_stdout"
+        cmd = "qstat > %s" % ( qstatFile )
+        try:
+            returnStatus = os.system( cmd )
+            if returnStatus != 0:
+                msg = "ERROR while launching 'qstat'"
+                sys.stderr.write( "%s\n" % msg )
+                sys.exit(1)
+            qstatFileHandler = open( qstatFile, "r" )
+            try:
+                for line in qstatFileHandler:
+                    tokens = line.split()
+                    # 1st column: job id; 3rd column: job name, compared as a
+                    # prefix of 'jobname' (presumably because qstat truncates
+                    # long names - TODO confirm)
+                    if len(tokens) > 3 and tokens[0] == str(jobid) and jobname.startswith( tokens[2] ):
+                        isJobInQstat = True
+                        break
+            finally:
+                qstatFileHandler.close()
+        finally:
+            # always drop the scratch file, including when qstat failed or
+            # when reading its output raised
+            if os.path.exists( qstatFile ):
+                os.remove( qstatFile )
+        return isJobInQstat
+
+
+    ## Wait job finished status from a job group.
+    #  Jobs are re-launched if error (max. 3 times)
+    #
+    # The work is delegated to the adaptator built by TableJobAdaptatorFactory.
+    #
+    # @param tableName string table name to record the jobs
+    # @param groupid string a group identifier to record related job series
+    # @param checkInterval integer time lapse in seconds between two checks (default = 5)
+    # @param maxRelaunch integer max nb of times a job in error is relaunched before exiting (default = 3)
+    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
+    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
+    #
+    def waitJobGroup(self, tableName, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
+        iTJA = TableJobAdaptatorFactory.createInstance(self, tableName)
+        iTJA.waitJobGroup(groupid, checkInterval, maxRelaunch, exitIfTooManyErrors, timeOutPerJob)
+
+
+    ## Submit a job to a queue and record it in job table.
+    #
+    # The work is delegated to the adaptator built by TableJobAdaptatorFactory.
+    #
+    # @param job a job instance
+    # @param verbose integer (default = 0)
+    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
+    # @param checkInterval integer time lapse in seconds between two checks (default = 30)
+    # @return the value returned by the adaptator's submitJob()
+    #
+    def submitJob( self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30 ):
+        iTJA = TableJobAdaptatorFactory.createInstance(self, job.tablename)
+        return iTJA.submitJob(job, verbose, maxNbWaitingJobs, checkInterval)
+
+
+    ## Get the list of nodes where jobs of one group were executed
+    #
+    # One entry is returned per matching row, so a node name may appear
+    # several times.
+    #
+    # @param tableName string table name where jobs are recorded
+    # @param groupId string a group identifier of job series
+    # @return lNodes list of nodes names
+    #
+    def getNodesListByGroupId( self, tableName, groupId ):
+        qry = "SELECT node FROM %s" % tableName
+        qry += " WHERE groupid='%s'" % groupId
+        self.execute( qry )
+        res = self.fetchall()
+        return [ resTuple[0] for resTuple in res ]
+
+
+    ## Get the name of the database backend
+    #
+    # @return string always "DbMySql"
+    #
+    def getDbName(self):
+        return "DbMySql"
+
+
+    ## Split a composite job identifier
+    #
+    # The text before the first "." is the job id; the second "."-separated
+    # field is cut at its first ":" to give the number of jobs,
+    # e.g. "123.1-10:1" gives ("123", "1-10").
+    # An IndexError is raised if the identifier contains no ".".
+    #
+    # @param jobid string composite job identifier
+    # @return tuple (jobid, nbJob), both as strings
+    #
+    def _getJobidAndNbJob(self, jobid) :
+        tab = jobid.split(".")
+        jobid = tab[0]
+        nbJob = tab[1].split(":")[0]
+        return jobid, nbJob