Mercurial > repos > urgi-team > teiso
diff TEisotools-1.1.a/commons/core/checker/CheckerUtils.py @ 16:836ce3d9d47a draft default tip
Uploaded
author | urgi-team |
---|---|
date | Thu, 21 Jul 2016 07:42:47 -0400 |
parents | 255c852351c5 |
children |
line wrap: on
line diff
--- a/TEisotools-1.1.a/commons/core/checker/CheckerUtils.py Thu Jul 21 07:36:44 2016 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,315 +0,0 @@ -# Copyright INRA (Institut National de la Recherche Agronomique) -# http://www.inra.fr -# http://urgi.versailles.inra.fr -# -# This software is governed by the CeCILL license under French law and -# abiding by the rules of distribution of free software. You can use, -# modify and/ or redistribute the software under the terms of the CeCILL -# license as circulated by CEA, CNRS and INRIA at the following URL -# "http://www.cecill.info". -# -# As a counterpart to the access to the source code and rights to copy, -# modify and redistribute granted by the license, users are provided only -# with a limited warranty and the software's author, the holder of the -# economic rights, and the successive licensors have only limited -# liability. -# -# In this respect, the user's attention is drawn to the risks associated -# with loading, using, modifying and/or developing or reproducing the -# software by the user in light of its specific status of free software, -# that may mean that it is complicated to manipulate, and that also -# therefore means that it is reserved for developers and experienced -# professionals having in-depth computer knowledge. Users are therefore -# encouraged to load and test the software's suitability as regards their -# requirements in conditions enabling the security of their systems and/or -# data to be ensured and, more generally, to use and operate it in the -# same conditions as regards security. -# -# The fact that you are presently reading this means that you have had -# knowledge of the CeCILL license and that you accept its terms. - - -import os -import sys -import re -import glob -from ConfigParser import NoOptionError -from ConfigParser import NoSectionError -from commons.core.checker.CheckerException import CheckerException - - -## A set of static methods used to perform checks. -# -# -class CheckerUtils( object ): - - ## Check if blastName param is in ["blastn", "blastp", "blastx", "tblastn", "tblastx"] - # - # @param blastName name to check - # @return True if name is in list False otherwise - # - def isBlastNameNotInBlastValues( blastName ): - blastValuesSet = set( ["blastn", "blastp", "blastx", "tblastn", "tblastx"] ) - blastNameSet = set( [ blastName ] ) - return not blastNameSet.issubset( blastValuesSet ) - - isBlastNameNotInBlastValues = staticmethod( isBlastNameNotInBlastValues ) - - - ## Check if param is NOT "TRUE" and NOT false "FALSE" - # - # @param param str to check - # @return True if param is not eq to "TRUE" AND not eq to "FALSE", false otherwise - # - def isNotTRUEisNotFALSE( param ): - return param != "TRUE" and param != "FALSE" - - isNotTRUEisNotFALSE = staticmethod( isNotTRUEisNotFALSE ) - - - ## Check if resource (file or dir) do NOT exists - # - # @param resource file or dir to check - # @return True if resource exists False otherwise - # - def isRessourceNotExits( resource ): - return not os.path.exists( resource ) - - isRessourceNotExits = staticmethod( isRessourceNotExits ) - - - ## Check a specific E-value format: de-dd - # - # @param param E-value to check - # @return True if format is de-dd False otherwise - # - def isNotAeValueWithOneDigit2DecimalsAtLeast( param ): - # \d\d stands for 2 digits and more ??? - return not re.match( "\de\-\d\d", param ) - - isNotAeValueWithOneDigit2DecimalsAtLeast = staticmethod( isNotAeValueWithOneDigit2DecimalsAtLeast ) - - - ## Check a number format - # - # @param param value to check - # @return True if param is a number (d+) False otherwise - # - def isNotANumber( param ): - return not re.match( "\d+", param ) - - isNotANumber = staticmethod( isNotANumber ) - - - ## Check if an executable is in the user's PATH - # - # @param exeName name of the executable - # @return True if executable in user's PATH, False otherwise - # - def isExecutableInUserPath( exeName ): - dirPathList = os.environ["PATH"].split(":") - for dirPath in dirPathList: - if os.path.isdir( dirPath ): - try: - binPathList = glob.glob( dirPath + "/*" ) - except OSError, e: - continue - for binPath in binPathList: - bin = os.path.basename( binPath ) - if bin == exeName: - return True - return False - - isExecutableInUserPath = staticmethod( isExecutableInUserPath ) - - - ## Return the full path of a given executable - # - def getFullPathFromExecutable( exeName ): - lDirFromUserPath = os.environ["PATH"].split(":") - for dir in lDirFromUserPath: - if os.path.isdir( dir ): - try: - lExecutables = glob.glob( "%s/*" % ( dir ) ) - except OSError, e: - continue - for exe in lExecutables: - path, exe = os.path.split( exe ) - if exe == exeName: - return path - return "" - - getFullPathFromExecutable = staticmethod( getFullPathFromExecutable ) - - - #TODO: to remove ? - ## Check if a queue Name is valid. Warning: Only with the queue manager SGE - # - # @param fullQueueName name of the queue to test (with or without parameters) - # @return True if queue name is valid, False otherwise - # - def isQueueNameValid( fullQueueName ): - queueName = fullQueueName.split()[0] - if queueName == "none": - return True - queueFile = "queueName.txt" - if not CheckerUtils.isExecutableInUserPath( "qconf" ): - msg = "executable 'qconf' can't be found" - sys.stderr.write( "%s\n" % ( msg ) ) - return False - cmd = "qconf -sql > " + queueFile - os.system( cmd ) - queueFileHandler = open( queueFile, "r" ) - lQueueNames = queueFileHandler.readlines() - queueFileHandler.close() - os.remove( queueFile ) - queueNameValid = False - for qName in lQueueNames: - qName = qName.strip() - if qName == queueName: - queueNameValid = True - break - return queueNameValid - - isQueueNameValid = staticmethod( isQueueNameValid ) - - - ## Check if a string length is lower or equal than 15 - # - # @param strName any string - # @return True if string length is <= 15, False otherwise - # - def isMax15Char( strName ): - return (len(strName) <= 15 ) - - isMax15Char = staticmethod( isMax15Char ) - - - ## Check if a string is made with only alphanumeric or underscore character - # - # @param strName any string - # @return True if string is with alphanumeric or underscore, False otherwise - # - def isCharAlphanumOrUnderscore( strName ): - # authorized ALPHABET [a-z,A-Z,0-9,_] - p = re.compile('\W') - errList=p.findall(strName) - if len( errList ) > 0 : - return False - else: - return True - - isCharAlphanumOrUnderscore = staticmethod( isCharAlphanumOrUnderscore ) - - - ## Check if sectionName is in the configuration file - # - # @param config filehandle of configuration file - # @param sectionName string of section name to check - # @exception NoSectionError: if section not found raise a NoSectionError - # - def checkSectionInConfigFile( config, sectionName ): - if not (config.has_section(sectionName)): - raise NoSectionError(sectionName) - - checkSectionInConfigFile = staticmethod( checkSectionInConfigFile ) - - - ## Check if an option is in a specified section in the configuration file - # - # @param config filehandle of configuration file - # @param sectionName string of section name - # @param optionName string of option name to check - # @exception NoOptionError: if option not found raise a NoOptionError - # - def checkOptionInSectionInConfigFile( config, sectionName, optionName ): - config.get( sectionName, optionName ) - - checkOptionInSectionInConfigFile = staticmethod( checkOptionInSectionInConfigFile ) - - - ## Check version number coherency between configFile and CHANGELOG - # - # @param config ConfigParser Instance of configuration file - # @param changeLogFileHandle CHANGELOG file handle - # @exception NoOptionError: if option not found raise a NoOptionError - # - def checkConfigVersion( changeLogFileHandle, config ): - line = changeLogFileHandle.readline() - while not line.startswith("REPET release "): - line = changeLogFileHandle.readline() - numVersionChangeLog = line.split()[2] - - numVersionConfig = config.get("repet_env", "repet_version") - - if not numVersionChangeLog == numVersionConfig: - message = "*** Error: wrong config file version. Expected version num is " + numVersionChangeLog + " but actual in config file is " + numVersionConfig - raise CheckerException(message) - - checkConfigVersion = staticmethod( checkConfigVersion ) - - - ## Get version number from CHANGELOG - # - # @param changeLogFile CHANGELOG file name - # - def getVersionFromChangelogFile(changeLogFileName): - with open(changeLogFileName) as changeLogFileHandle: - line = changeLogFileHandle.readline() - while not line.startswith("REPET release "): - line = changeLogFileHandle.readline() - numVersionChangeLog = line.split()[2] - return numVersionChangeLog - - - getVersionFromChangelogFile = staticmethod( getVersionFromChangelogFile ) - - - ## Check if headers of an input file contain only alpha numeric characters and "_ : . -" - # - # @param fileHandler file handle - # @exception CheckerException if bad header raise a CheckerException - # - def checkHeaders( fileHandler ): - lHeaders = CheckerUtils._getHeaderFromFastaFile(fileHandler) - p = re.compile('[^a-zA-Z0-9_:\.\-]', re.IGNORECASE) - lWrongHeaders = [] - for header in lHeaders: - errList=p.findall(header) - if len( errList ) > 0 : - lWrongHeaders.append(header) - if lWrongHeaders != []: - exception = CheckerException() - exception.setMessages(lWrongHeaders) - raise exception - - checkHeaders = staticmethod( checkHeaders ) - - - def _getHeaderFromFastaFile( inFile ): - lHeaders = [] - while True: - line = inFile.readline() - if line == "": - break - if line[0] == ">": - lHeaders.append( line[1:-1] ) - return lHeaders - - _getHeaderFromFastaFile = staticmethod( _getHeaderFromFastaFile ) - - - ## Return True if an option is in a specified section in the configuration file, False otherwise - # - # @param config handler of configuration file - # @param sectionName string of section name - # @param optionName string of option name to check - # - def isOptionInSectionInConfig( configHandler, section, option ): - try: - CheckerUtils.checkOptionInSectionInConfigFile( configHandler, section, option ) - except NoOptionError: - return False - return True - - isOptionInSectionInConfig = staticmethod( isOptionInSectionInConfig )