view commons/core/sql/JobAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line source

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use, 
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info". 
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability. 
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or 
# data to be ensured and,  more generally, to use and operate it in the 
# same conditions as regards security. 
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.


import os
import time
import sys
import tempfile
import subprocess
from commons.core.sql.Job import Job

## Methods for Job persistence 
#
class JobAdaptator(object):
    
    def __init__(self, lJob = [], table = "" ):
        self._lJobID = lJob
        self._table = table
        self._acronym = ""
    ## Record a job
    #
    # @param job Job instance with the job informations
    #
    def recordJob(self, job):
        self._lJobID.append(job)
    
    ## Remove a job from the job table
    #
    #  @param job: job instance to remove
    #
    def removeJob(self, job):
        pass         
            
    ## Set the jobid of a job with the id of SGE
    #
    # @param job job instance
    # @param jobid integer
    #
    def updateJobIdInDB(self, job, jobid):
        pass
        
    ## Get a job status
    #
    # @param job: a Job instance with the job informations
    #
    def getJobStatus(self, job):
        pass
    
    
    ## Change a job status
    #
    # @param job: a Job instance with the job informations
    # @param status: the new status (waiting,finished,error)
    #
    def changeJobStatus(self, job, status):
        pass
        
    ## Get the number of jobs belonging to the desired groupid with the desired status.
    #
    # @param groupid string a group identifier to record related job series 
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus(self, groupid, status):
        pass
        
    ## Clean all job from a job group
    #
    # @param groupid: a group identifier to record related job series
    #
    def cleanJobGroup(self, groupid):
        pass            
            
    ## Check if there is unfinished job from a job group.
    #
    # @param groupid string a group identifier to record related job series 
    #        
    def hasUnfinishedJob(self, groupid):
        pass

    def _getJobIDListFromQstat(self):
        lJobIDFromQstat = []
        tmp = tempfile.NamedTemporaryFile(delete=False)
        cmd ="qstat | grep %s" % self._acronym
        process = subprocess.Popen(cmd, shell=True,stdout=tmp)
        process.communicate()
        tmp.close()
        if process.returncode == 0:
            fileName = tmp.name
            jobidFileHandler = open(fileName, "r")        
            for line in jobidFileHandler:
                line2 = line.lstrip(" ")
                lJobIDFromQstat.append(line2.split(" ")[0])
            jobidFileHandler.close()
            os.remove(fileName)
        return lJobIDFromQstat     
     
    def _areJobsStillRunning(self,lJobID,lJobIDFromQstat):
        sorted(lJobID)  
        sorted(lJobIDFromQstat)
        for i in lJobID:
            for j in lJobIDFromQstat:
                if int(i)== int(j):
                    return True
        return False
                
        
    ## Wait job finished status from a job group.
    #  Job are re-launched if error (max. 3 times)
    #
    # @param groupid string a group identifier to record related job series
    # @param checkInterval integer time laps in seconds between two checks (default = 5)
    # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3)
    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
    #
    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        
        while True:
            time.sleep(checkInterval)
            lJobIDFromQstat = self._getJobIDListFromQstat()
            if self._areJobsStillRunning(self._lJobID, lJobIDFromQstat) == False:
                break
    
    ## Submit a job to a queue and record it in job table.
    #
    # @param job a job instance
    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
    # @param checkInterval integer time laps in seconds between two checks (default = 30)
    # @param verbose integer (default = 0)
    #               
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        cmd = self._getQsubCommand(job)
        tmp = tempfile.NamedTemporaryFile(delete=False)
        process = subprocess.Popen(cmd, shell=True,stdout=tmp)
        process.communicate()
        tmp.close()
        if process.returncode == 0:
            fileName = tmp.name
            jobidFileHandler = open(fileName, "r")
            jobid = self._getJobidFromJobManager(jobidFileHandler)
            if verbose > 0:
                print "job '%i %s' submitted" % (jobid, job.jobname)
                sys.stdout.flush()
            job.jobid = jobid
            #newJob= Job(job.jobid, job.jobname, job.groupid, job.queue, job.command, job.launcher, job.node, job.lResources, job.parallelEnvironment)
            self._acronym = job.jobname.split("_")[0][:10]
            self.recordJob(job.jobid)
            jobidFileHandler.close()
            os.remove(fileName)
        return process.returncode


    ## Get the list of nodes where jobs of one group were executed
    #
    # @param groupid string a group identifier of job series 
    # @return lNodes list of nodes names without redundancy
    #
    def getNodesListByGroupId(self, groupId):
        pass
    
    def checkJobTable(self):
        pass
    
    def close(self):
        pass
    
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        tab = tab[1].split(":")
        nbJob = tab[0]
        return jobid, nbJob
    
class JobAdaptatorSGE(JobAdaptator):

   ## Check if a job is still handled by SGE
    #
    # @param jobid string job identifier
    # @param jobname string job name
    #  
    def isJobStillHandledBySge(self, jobid, jobname):
        isJobInQstat = False
        tmp = tempfile.NamedTemporaryFile(delete=False)
        cmd = "qstat"
        process = subprocess.Popen(cmd, shell=True,stdout=tmp)
        process.communicate()
        tmp.close()
        qstatFile = tmp.name
        if process.returncode  != 0:
            msg = "ERROR while launching 'qstat'"
            sys.stderr.write( "%s\n" % msg )
            sys.exit(1)
        qstatFileHandler = open(qstatFile, "r")
        lLines = qstatFileHandler.readlines()
        for line in lLines:
            tokens = line.split()
            if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
                isJobInQstat = True
                break
        qstatFileHandler.close()
        os.remove(qstatFile)
        return isJobInQstat
    
    def _getQsubCommand(self, job):    
        cmd = "echo '%s' | " % job.launcher
        prg = "qsub"
        cmd += prg
        cmd += " -V"
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        cmd += " -cwd"
        if job.lResources != []:
            cmd += " -l \""
            cmd += " ".join(job.lResources)
            cmd += "\""
        if job.parallelEnvironment != "":
            cmd += " -pe " + job.parallelEnvironment
        return cmd
    
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(" ")[2])
    

class JobAdaptatorTorque(JobAdaptator):  
        
    def _getQsubCommand(self, job):    
        cmd = "echo '%s' | " % job.launcher
        prg = "qsub"
        cmd += prg
        cmd += " -V"
        cmd += " -d %s" % os.getcwd()
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        if job.lResources != []:
            cmd += " -l \""
            cmd += " ".join(job.lResources).replace("mem_free","mem")
            cmd += "\""
        return cmd

    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(".")[0])