Mercurial > repos > vandelj > giant_factor_generator
diff src/General_functions.py @ 0:4764dc6a1019 draft
"planemo upload for repository https://github.com/juliechevalier/GIANT/tree/master commit cb276a594444c8f32e9819fefde3a21f121d35df"
author | vandelj |
---|---|
date | Fri, 26 Jun 2020 09:51:15 -0400 |
parents | |
children | 7a520f7169e1 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/General_functions.py Fri Jun 26 09:51:15 2020 -0400 @@ -0,0 +1,206 @@ +import re +import numpy as np + +def get_column_names( file_path, toNotConsider=-1, each=1): + options=[] + inputfile = open(file_path) + firstLine = next(inputfile).strip().split("\t") + cpt=0 + for i, field_component in enumerate( firstLine ): + if i!=toNotConsider:#to squeeze the first column + if cpt==0: + options.append( ( field_component, field_component, False ) ) + cpt+=1 + if cpt==each: + cpt=0 + inputfile.close() + return options + +def get_column_names_filteredList( file_path, toNotConsider=[], each=1): + options=[] + inputfile = open(file_path) + firstLine = next(inputfile).strip().split("\t") + cpt=0 + for i, field_component in enumerate( firstLine ): + if i not in toNotConsider:#to squeeze the first columns + if cpt==0: + options.append( ( field_component, field_component, False ) ) + cpt+=1 + if cpt==each: + cpt=0 + inputfile.close() + return options + +def get_column_names_mergeNumber(file_path, numberToMerge=1, toNotConsider=[]): + options=[] + inputfile = open(file_path) + if int(numberToMerge)>0: + iHeader=0 + for iCurrentLine in inputfile: + iHeader=iHeader+1 + if iHeader>int(numberToMerge): + break + currentLine=iCurrentLine.strip().split("\t") + iOption=-1 + for i, field_component in enumerate( currentLine ): + if i not in toNotConsider:#to squeeze specified columns + iOption=iOption+1 + if iHeader==1: + options.append( ( str(field_component), str(field_component), False ) ) + else: + options[iOption]=(options[iOption][0]+"_"+str(field_component),options[iOption][1]+"_"+str(field_component),False) + else: + currentLine = next(inputfile).strip().split("\t") + for i, field_component in enumerate( currentLine ): + if i not in toNotConsider:#to squeeze specified columns + options.append( ( "Column_"+str(i), "Column_"+str(i), False ) ) + inputfile.close() + return options + +def get_row_names( file_path, factorName ): + inputfile = open(file_path) + firstLine = next(inputfile).strip().split("\t") + iColumn=-1 + for i, field_component in enumerate( firstLine ): + if field_component==factorName:#to test + iColumn=i + options=[] + if iColumn!=-1: + for nextLine in inputfile: + nextLine=nextLine.strip().split("\t") + if len(nextLine)>1: + if (nextLine[iColumn], nextLine[iColumn], False) not in options: + options.append( (nextLine[iColumn], nextLine[iColumn], False) ) + inputfile.close() + return options + +def get_condition_file_names( file_list, toNotConsider=-1, each=1): + options=[] + if not isinstance(file_list,list):#if input file is a tabular file, act as get_column_names + inputfile = open(file_list.file_name) + firstLine = next(inputfile).strip().split("\t") + cpt=0 + for i, field_component in enumerate( firstLine ): + if i!=toNotConsider:#to squeeze the first column + if cpt==0: + options.append( ( field_component, field_component, False ) ) + cpt+=1 + if cpt==each: + cpt=0 + inputfile.close() + else:#if input file is a .cel file list or a collection + if not hasattr(file_list[0],'collection'):#if it is not a collection, get name easily + for i, field_component in enumerate( file_list ): + options.append( ( field_component.name, field_component.name, False ) ) + else:#if the file is a collection, have to get deeper in the corresponding HistoryDatasetCollectionAssociation object + for i, field_component in enumerate( file_list[0].collection.elements ): + options.append( ( field_component.element_identifier, field_component.element_identifier, False ) ) + return options + +def generateFactorFile( file_list, factor_list, outputFileName, logFile): + forbidenCharacters={"*",":",",","|"} + outputfile = open(outputFileName, 'w') + outputLog = open(logFile, 'w') + sampleList=[] + if not isinstance(file_list,list): + conditionNames=get_condition_file_names(file_list,0) #unique expression file, remove the first column (index=0) + else : + conditionNames=get_condition_file_names(file_list) #.CEL files + for iSample, sample_component in enumerate (conditionNames): + sampleList.append(str(sample_component[1])) + outputLog.write("[INFO] "+str(len(sampleList))+" sample are detected as input\n") + globalDict=dict() + factorNameList=[] + firstLine="Conditions" + if len(factor_list)==0:#check if there is at least one factor available + outputLog.write("[ERROR] no factor was defined !\n") + return 1 + else: + for iFactor, factor_component in enumerate( factor_list ): + currentSampleList=list(sampleList) + currentFactor=str(factor_component['factorName']) + #check if factor name contains forbidden characters + for specialCharacter in forbidenCharacters: + if currentFactor.find(specialCharacter)!=-1: + outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in factor name : '"+currentFactor+"'\n") + return 4 + #check if factor allready named like that + if not globalDict.get(currentFactor) is None: + outputLog.write("[ERROR] '"+currentFactor+"' is used several times as factor name\n") + return 3 + globalDict[currentFactor]=dict() + firstLine=firstLine+"\t"+currentFactor + factorNameList.append(currentFactor) + if len(factor_component['valueList'])<=1:#check if there is at least two value available + outputLog.write("[ERROR] at least two different values are necessary for '"+currentFactor+"' factor\n") + return 1 + else: + for iValue, value_component in enumerate( factor_component['valueList'] ): + currentValue=str(value_component['valueName']) + #check if factor name contains forbidden characters + for specialCharacter in forbidenCharacters: + if currentValue.find(specialCharacter)!=-1: + outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in value name : '"+currentValue+"'\n") + return 4 + currentSample=str(value_component['valueConditions']).split(",") + for iSample, sample_component in enumerate (currentSample): + if not sample_component in currentSampleList: + outputLog.write("[ERROR] sample "+sample_component+" was assigned several times for factor '"+currentFactor+"'\n") + return 2 + currentSampleList.remove(sample_component) + globalDict[currentFactor][sample_component]=currentValue + if(len(currentSampleList)>0): + outputLog.write("[ERROR] for factor '"+currentFactor+"'' sample "+str(currentSampleList)+" are not assigned to any value\n") + return 2 + outputLog.write("[INFO] "+str(len(globalDict))+" factors are detected\n") + #start writing the factor file + outputfile.write(firstLine+"\n") + for iSample, sample_component in enumerate(sampleList): + newLine=sample_component + for iFactor, factor_component in enumerate(factorNameList): + newLine=newLine+"\t"+globalDict[factor_component][sample_component] + outputfile.write(newLine+"\n") + outputfile.close() + outputLog.close() + return 0 + +def selectSubSetTable(file_path,headerLine_number,columnsToAdd,columnNamesToKeep,outputFileName,logFile): + outputLog = open(logFile, 'w') + outputLog.write("[INFO] header line number : "+ headerLine_number+" lines\n") + availableColumnsTuple=get_column_names_mergeNumber(file_path, headerLine_number) + #convert tuple list as a simple array + availableColumns=[] + for iTuple, tuple_content in enumerate (availableColumnsTuple): + availableColumns.append(str(tuple_content[0])) + if len(availableColumns)==0: + outputLog.write("[ERROR] No detected columns in input file\n") + return 1 + selectedColumns=list(columnsToAdd) + for iVolcano, volcano_content in enumerate(columnNamesToKeep): + selectedColumns.append(availableColumns.index(volcano_content['pvalColumn'])) + if volcano_content['fdrColumn'] in availableColumns: + selectedColumns.append(availableColumns.index(volcano_content['fdrColumn'])) + else: + selectedColumns.append(0) + selectedColumns.append(availableColumns.index(volcano_content['fcColumn'])) + if len(selectedColumns)!=(3*len(columnNamesToKeep)+len(columnsToAdd)): + outputLog.write("[ERROR] matching between input file colnames and requested column names failed\n") + return 1 + outputLog.write("[INFO] columns kept : "+str(selectedColumns)+"\n") + #start writting formatted file + inputfile = open(file_path) + outputfile = open(outputFileName, 'w') + iLineCpt=-1 + for iCurrentLine in inputfile: + iLineCpt=iLineCpt+1 + if iLineCpt>=int(headerLine_number): + currentLineFields=np.array(iCurrentLine.strip().split("\t")) + newLine="\t".join(currentLineFields[selectedColumns]) + outputfile.write(newLine+"\n") + if iLineCpt<int(headerLine_number): + outputLog.write("[ERROR] not enough lines in input files ("+(iLineCpt+1)+" lines)\n") + return 1 + inputfile.close() + outputfile.close() + outputLog.close() + return 0 \ No newline at end of file