Mercurial > repos > urgi-team > teiso
view TEisotools-1.0/commons/core/utils/FileUtils.py @ 6:20ec0d14798e draft
Uploaded
author | urgi-team |
---|---|
date | Wed, 20 Jul 2016 05:00:24 -0400 |
parents | |
children |
line wrap: on
line source
# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import os
import re
import sys
import math
import glob
import shutil
import subprocess
from operator import itemgetter
try:
    import hashlib
except ImportError:
    # hashlib is missing on very old interpreters; getMd5SecureHash()
    # then degrades to returning an empty string.
    pass


## Collection of static helpers to count, compare, concatenate, split,
# clean up and remove files.
#
# All helpers are implemented with the Python standard library only (no
# external 'wc', 'tail', 'diff' or 'tr' process), so file names containing
# blanks or shell meta-characters are handled safely.
#
class FileUtils(object):

    # Size (in bytes) of the chunks used when streaming binary content.
    _CHUNK_SIZE = 65536

    ## Return the number of lines in the given file
    #
    # A line is a run of bytes terminated by '\n'. Two adjustments are made:
    # one single trailing blank line (a last line made only of '\n') is not
    # counted, and a last line lacking its final '\n' is counted, even when
    # it is the only line of the file.
    #
    # @param fileName string name of the file
    #
    # @return int number of lines
    #
    # @note raises IOError if the file cannot be read
    #
    @staticmethod
    def getNbLinesInSingleFile(fileName):
        nbLines = 0
        lastLine = b""
        # Binary mode: lines are split on '\n' only, and no decoding is done.
        with open(fileName, "rb") as fileHandler:
            for line in fileHandler:
                if line.endswith(b"\n"):
                    nbLines += 1
                lastLine = line
        if lastLine == b"\n":
            # a blank last line is not a real line
            nbLines -= 1
        elif lastLine and not lastLine.endswith(b"\n"):
            # unterminated last line: missed by the '\n' count above
            nbLines += 1
        return nbLines

    ## Return the number of lines in the files in the given list
    #
    # @param lFileNames list of file names
    #
    # @return int sum of the line counts of every file
    #
    @staticmethod
    def getNbLinesInFileList(lFileNames):
        return sum(FileUtils.getNbLinesInSingleFile(fileName)
                   for fileName in lFileNames)

    ## Return True if the given file exists, False otherwise
    #
    # @param fileName string path of a file or directory
    #
    @staticmethod
    def isRessourceExists(fileName):
        return os.path.exists(fileName)

    ## Return True if the given file is empty, False otherwise
    #
    # "Empty" means that getNbLinesInSingleFile() returns 0, hence a file
    # holding a single blank line is considered empty too.
    #
    # @param fileName string name of the file
    #
    @staticmethod
    def isEmpty(fileName):
        return 0 == FileUtils.getNbLinesInSingleFile(fileName)

    ## Return True if both files are identical, False otherwise
    #
    # The contents are compared byte by byte. If one of the files cannot be
    # read, a warning is printed and False is returned.
    #
    # @param file1 string name of the first file
    # @param file2 string name of the second file
    #
    @staticmethod
    def are2FilesIdentical(file1, file2):
        try:
            with open(file1, "rb") as handler1:
                with open(file2, "rb") as handler2:
                    while True:
                        chunk1 = handler1.read(FileUtils._CHUNK_SIZE)
                        chunk2 = handler2.read(FileUtils._CHUNK_SIZE)
                        if chunk1 != chunk2:
                            return False
                        if not chunk1:
                            # both files exhausted at the same time
                            return True
        except (IOError, OSError) as error:
            print("WARNING: unable to compare '%s' and '%s' (%s)"
                  % (file1, file2, error))
            return False

    ## Return a string with all the content of the files in the given list
    #
    # @param lFiles list of file names (sorted in place before reading)
    #
    # @return string concatenation of the contents, in sorted name order
    #
    @staticmethod
    def getFileContent(lFiles):
        lFiles.sort()
        lContents = []
        for fileName in lFiles:
            with open(fileName, "r") as currentFile:
                lContents.append(currentFile.read())
        return "".join(lContents)

    ## Save content of the given file after having sorted it
    #
    # @param inFile string name of the input file
    # @param outFile string name of the output file (default: overwrite inFile)
    #
    @staticmethod
    def sortFileContent(inFile, outFile=""):
        with open(inFile, "r") as inFileHandler:
            lines = inFileHandler.readlines()
        lines.sort()
        if outFile == "":
            outFile = inFile
        with open(outFile, "w") as outFileHandler:
            outFileHandler.writelines(lines)

    ## Add end-of-line symbol to the given file content if necessary
    #
    # @param fileContent string content to check
    #
    # @return string non-empty content always ends with '\n'; empty content
    # is returned unchanged
    #
    @staticmethod
    def addNewLineAtTheEndOfFileContent(fileContent):
        if fileContent and not fileContent.endswith('\n'):
            fileContent += '\n'
        return fileContent

    ## Concatenate files in the given list
    #
    # @param lFiles list of input file names
    # @param outFile string name of the output file (opened in append mode)
    # @param sort bool sort lFiles in place before concatenating
    # @param skipHeaders bool skip the first line of every input file
    # @param separator string written between two consecutive files
    #
    @staticmethod
    def catFilesFromList(lFiles, outFile, sort=True, skipHeaders=False, separator=""):
        if sort:
            lFiles.sort()
        with open(outFile, "a") as outFileHandler:
            for index, singleFile in enumerate(lFiles):
                if index > 0:
                    outFileHandler.write(separator)
                with open(singleFile, "r") as singleFileHandler:
                    if skipHeaders:
                        singleFileHandler.readline()
                    # copy whatever remains after the optional header
                    shutil.copyfileobj(singleFileHandler, outFileHandler)

    ## Concatenate files according to the given pattern
    #
    # @param pattern string glob pattern selecting the input files
    # @param outFile string name of the output file
    # @param skipHeaders bool skip the first line of every input file
    # @param separator string written between two consecutive files
    #
    @staticmethod
    def catFilesByPattern(pattern, outFile, skipHeaders=False, separator=""):
        lFiles = glob.glob(pattern)
        FileUtils.catFilesFromList(lFiles, outFile,
                                   skipHeaders=skipHeaders,
                                   separator=separator)

    ## Cat all files of a given directory
    #
    # @param directory string directory name
    # @param outFileName string output file name
    #
    @staticmethod
    def catFilesOfDir(directory, outFileName):
        FileUtils.catFilesByPattern("%s/*" % directory, outFileName)

    ## Remove files listed according to the given pattern
    #
    # @example prefix="/home/tmp/dummy*.txt"
    #
    @staticmethod
    def removeFilesByPattern(prefix):
        for fileName in glob.glob(prefix):
            os.remove(fileName)

    ## Remove files listed according to the suffixes in the given list
    #
    # @param targetPath string directory to clean (one trailing '/' allowed)
    # @param lSuffixes list of suffixes; files matching "*<suffix>" are removed
    #
    @staticmethod
    def removeFilesBySuffixList(targetPath, lSuffixes):
        if targetPath[-1] == "/":
            targetPath = targetPath[:-1]
        for suffix in lSuffixes:
            FileUtils.removeFilesByPattern("%s/*%s" % (targetPath, suffix))

    ## Remove repeated blanks in the given file
    #
    # Every run of several space characters is squeezed into one space.
    #
    # @param inFile string name of the input file
    # @param outFile string name of the output file (default: overwrite inFile)
    #
    @staticmethod
    def removeRepeatedBlanks(inFile, outFile=""):
        if outFile == "":
            outFile = inFile
        # The scratch file lives next to the output file, so this also works
        # when the names contain a directory part.
        tmpFile = "%s.tmp" % outFile
        repeatedBlanks = re.compile(b" {2,}")
        with open(inFile, "rb") as inFileHandler:
            with open(tmpFile, "wb") as tmpFileHandler:
                for line in inFileHandler:
                    tmpFileHandler.write(repeatedBlanks.sub(b" ", line))
        shutil.move(tmpFile, outFile)

    ## Remove files in the given list
    #
    # @param lFiles list of file names; every file must exist
    #
    @staticmethod
    def removeFilesFromList(lFiles):
        for fileName in lFiles:
            os.remove(fileName)

    ## Remove files in the given list if exist
    #
    # @param lFiles list of file names; missing files are silently skipped
    #
    @staticmethod
    def removeFilesFromListIfExist(lFiles):
        for fileName in lFiles:
            if FileUtils.isRessourceExists(fileName):
                os.remove(fileName)

    ## Append the content of a file to another file
    #
    # @param inFile string name of the input file
    # @param outFile string name of the output file
    #
    @staticmethod
    def appendFileContent(inFile, outFile):
        with open(outFile, "a") as outFileHandler:
            with open(inFile, "r") as inFileHandler:
                shutil.copyfileobj(inFileHandler, outFileHandler)

    ## Replace Windows end-of-line by Unix end-of-line
    #
    # The converted content is first written to "<inFile>.tmp", which then
    # replaces the input file, so the original is never lost half-way.
    #
    # @param inFile string name of the file to convert in place
    #
    @staticmethod
    def fromWindowsToUnixEof(inFile):
        tmpFile = "%s.tmp" % (inFile)
        # Binary mode: the interpreter must not translate line endings itself.
        with open(inFile, "rb") as inFileHandler:
            with open(tmpFile, "wb") as tmpFileHandler:
                for line in inFileHandler:
                    tmpFileHandler.write(line.replace(b"\r\n", b"\n"))
        shutil.move(tmpFile, inFile)

    ## Remove duplicated lines in a file
    #
    # @note it preserves the initial order and handles blank lines
    #
    # @param inFile string name of the file to rewrite in place
    #
    @staticmethod
    def removeDuplicatedLines(inFile):
        with open(inFile, "r") as inFileHandler:
            lLines = inFileHandler.read().split("\n")
        # the final '\n' yields a spurious empty last element
        if lLines[-1] == "":
            del lLines[-1]
        sSeenLines = set()
        with open(inFile, "w") as outFileHandler:
            for line in lLines:
                if line not in sSeenLines:
                    sSeenLines.add(line)
                    outFileHandler.write("%s\n" % (line))

    ## Write a list of lines in a given file
    #
    # @param inFile string name of the file to (over)write
    # @param lLines list of strings, written as they are (no '\n' added)
    #
    @staticmethod
    def writeLineListInFile(inFile, lLines):
        with open(inFile, "w") as inFileHandler:
            for line in lLines:
                inFileHandler.write(line)

    ## Give the list of absolute path of each directory in the given directory
    #
    # @param rootPath string absolute path of the given directory
    #
    # @return lDirPath list of absolute directory path
    #
    @staticmethod
    def getAbsoluteDirectoryPathList(rootPath):
        return [ressource for ressource in glob.glob(rootPath + "/*")
                if os.path.isdir(ressource)]

    ## Get a sublist of which each element matches/doesn't match a pattern
    #
    # @param lPath string list of paths
    #
    # @param pattern string pattern
    #
    # @param match bool keep the matching paths if True, the others if False
    #
    # @return lPathMatching list of path matching pattern
    #
    @staticmethod
    def getSubListAccordingToPattern(lPath, pattern, match=True):
        regex = ".*%s.*" % pattern
        keepMatching = bool(match)
        return [path for path in lPath
                if (re.match(regex, path) is not None) == keepMatching]

    ## Give the list of file names found in the given directory
    #
    # @param dirPath string absolute path of the given directory
    # @param patternFileFilter string regular expression the file name has to
    # match from its beginning (default: keep every file)
    #
    # @return lFilesInDir list of file names
    #
    @staticmethod
    def getFileNamesList(dirPath, patternFileFilter=".*"):
        lFilesInDir = []
        for ressource in glob.glob(dirPath + "/*"):
            if not os.path.isfile(ressource):
                continue
            fileName = os.path.basename(ressource)
            if re.match(patternFileFilter, fileName):
                lFilesInDir.append(fileName)
        return lFilesInDir

    ## Return the MD5 sum of a file
    #
    # @param inFile string name of the file
    #
    # @return string hexadecimal digest, or "" when hashlib is unavailable
    #
    @staticmethod
    def getMd5SecureHash(inFile):
        if "hashlib" not in sys.modules:
            return ""
        md5 = hashlib.md5()
        # Binary chunks: the digest is computed on the raw bytes of the file.
        with open(inFile, "rb") as inFileHandler:
            while True:
                chunk = inFileHandler.read(FileUtils._CHUNK_SIZE)
                if not chunk:
                    break
                md5.update(chunk)
        return md5.hexdigest()

    ## Return True if size file > 0 octet
    #
    # @param fileName string file name
    #
    @staticmethod
    def isSizeNotNull(fileName):
        return os.path.getsize(fileName) > 0

    ## Split one file into N Files by lines
    #
    # The output files are written in the current directory and are named
    # "<prefix>-<i><ext>", with i going from 1 to N. N is lowered to the
    # number of lines when it is bigger, and a resulting N of 0 produces a
    # single output file.
    #
    # @param fileName string file name
    # @param N int number of files to create
    #
    @staticmethod
    def splitFileIntoNFiles(fileName, N):
        nbLine = FileUtils.getNbLinesInSingleFile(fileName)
        nbLinesInEachFile = nbLine
        if N > nbLine:
            N = nbLine
        if N != 0:
            nbLinesInEachFile = int(math.ceil(float(nbLine) / N))
        else:
            N = 1
        filePrefix, fileExt = os.path.splitext(os.path.basename(fileName))
        with open(fileName, "rb") as fileHandler:
            for i in range(1, N + 1):
                with open("%s-%s%s" % (filePrefix, i, fileExt), "wb") as outFileHandler:
                    for _ in range(nbLinesInEachFile):
                        # readline() returns an empty string once the input
                        # is exhausted, so the last files may be shorter
                        outFileHandler.write(fileHandler.readline())

    ## Split one file into files of N lines
    #
    # The output files are written in the current directory and are named
    # "<prefix>-<i><ext>", with i starting at 1. When the input is empty or
    # N is not positive, the whole input is copied into "<prefix>-1<ext>".
    #
    # @param fileName string input file name
    # @param N int lines number per files
    #
    @staticmethod
    def splitFileAccordingToLineNumber(fileName, N):
        filePrefix, fileExt = os.path.splitext(os.path.basename(fileName))
        with open(fileName, "rb") as inF:
            line = inF.readline()
            if not line or N <= 0:
                # nothing to split on: plain copy of the whole input
                inF.seek(0)
                with open("%s-%s%s" % (filePrefix, 1, fileExt), "wb") as outF:
                    shutil.copyfileobj(inF, outF)
                return
            fileNb = 1
            while line:
                outFileName = "%s-%s%s" % (filePrefix, fileNb, fileExt)
                with open(outFileName, "wb") as outF:
                    lineNb = 1
                    while lineNb <= N and line:
                        outF.write(line)
                        line = inF.readline()
                        lineNb += 1
                fileNb += 1

    ## Concatenates names from a list, using a given separator and a given extension.
    #
    # @param lNames list of file names (sorted in place)
    # @param sep separator used to join names
    # @param ext extension of the return file name. If None, the most represented extension in lNames is used.
    # If there are several, the first of these extensions in alphabetical order is used
    #
    # @return concatName name concatenated ("" if lNames is empty)
    #
    @staticmethod
    def concatenateFileNamesFromList(lNames, sep="_", ext=None):
        concatName = ""
        if lNames:
            lNames.sort()
            tBaseNames, tExt = zip(*[os.path.splitext(os.path.basename(name))
                                     for name in lNames])
            if ext is None:
                dtExtToNb = {}
                for extension in tExt:
                    dtExtToNb[extension] = dtExtToNb.get(extension, 0) + 1
                # highest count first, alphabetical order breaks the ties
                ext = min(dtExtToNb.items(),
                          key=lambda item: (-item[1], item[0]))[0]
            if ext and ext[0] != '.':
                ext = ".%s" % ext
            concatName = "%s%s" % (sep.join(tBaseNames), ext)
        return concatName

    ## Concatenates names from a string, using a given separator and a given extension. Names are split from the string using splitSep
    #
    # @param filesNames string of file names separated by splitSep
    # @param splitSep separator used to split names from the input string
    # @param joinSep separator used to join names
    # @param ext extension of the return file name. If None, the most represented extension in the names is used.
    # If there are several, the first of these extensions in alphabetical order is used
    #
    # @return concatName,lFilesNames name concatenated and split files list sorted alphabetically. Return original name if splitSep is empty.
    #
    @staticmethod
    def concatenateFileNamesFromString(filesNames, splitSep=",", joinSep="_", ext=None):
        if splitSep:
            lFilesNames = filesNames.split(splitSep)
            # concatenateFileNamesFromList() sorts lFilesNames in place
            concatName = FileUtils.concatenateFileNamesFromList(lFilesNames, joinSep, ext)
            return concatName, lFilesNames
        else:
            print("WARNING: no split separator provided, returning input string")
            return filesNames, [filesNames]