comparison src/General_functions.py @ 0:4764dc6a1019 draft

"planemo upload for repository https://github.com/juliechevalier/GIANT/tree/master commit cb276a594444c8f32e9819fefde3a21f121d35df"
author vandelj
date Fri, 26 Jun 2020 09:51:15 -0400
parents
children 7a520f7169e1
comparison
equal deleted inserted replaced
-1:000000000000 0:4764dc6a1019
1 import re
2 import numpy as np
3
def get_column_names( file_path, toNotConsider=-1, each=1):
    """Build option tuples from the first (header) line of a tabular file.

    The header line is stripped and split on tabs. The column whose index
    equals ``toNotConsider`` is skipped; among the remaining columns, one
    out of every ``each`` is kept, starting with the first one.

    Args:
        file_path: path of the tab-separated file to read.
        toNotConsider: index of the single column to ignore. The default
            (-1) never matches a column index, so nothing is ignored.
        each: sampling step applied to the retained columns.

    Returns:
        A list of ``(name, name, False)`` tuples, one per selected column.
        The list is empty when the file contains no line at all.
    """
    options = []
    # The context manager guarantees the handle is released even when
    # reading fails; only the first line is needed.
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
    if headerLine is None:
        # Empty file: no header, hence no option to offer.
        return options
    # NOTE(review): strip() also removes leading tabs, so an empty first
    # header cell shifts every column index by one - confirm this is wanted.
    cpt = 0
    for i, field_component in enumerate(headerLine.strip().split("\t")):
        if i == toNotConsider:
            continue
        if cpt == 0:
            options.append((field_component, field_component, False))
        # cpt cycles through 0..each-1 so that only one column out of
        # `each` is emitted.
        cpt += 1
        if cpt == each:
            cpt = 0
    return options
18
def get_column_names_filteredList( file_path, toNotConsider=None, each=1):
    """Build option tuples from a header line, ignoring several columns.

    The first line of the file is stripped and split on tabs. Columns
    whose index is contained in ``toNotConsider`` are skipped; among the
    remaining columns, one out of every ``each`` is kept, starting with
    the first one.

    Args:
        file_path: path of the tab-separated file to read.
        toNotConsider: container of column indices to ignore. ``None``
            (the default) ignores nothing.
        each: sampling step applied to the retained columns.

    Returns:
        A list of ``(name, name, False)`` tuples, one per selected column.
        The list is empty when the file contains no line at all.
    """
    # A None default avoids sharing one mutable list between calls.
    skippedColumns = () if toNotConsider is None else toNotConsider
    options = []
    # The context manager guarantees the handle is released even when
    # reading fails; only the first line is needed.
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
    if headerLine is None:
        # Empty file: no header, hence no option to offer.
        return options
    cpt = 0
    for i, field_component in enumerate(headerLine.strip().split("\t")):
        if i in skippedColumns:
            continue
        if cpt == 0:
            options.append((field_component, field_component, False))
        # cpt cycles through 0..each-1 so that only one column out of
        # `each` is emitted.
        cpt += 1
        if cpt == each:
            cpt = 0
    return options
33
def get_column_names_mergeNumber(file_path, numberToMerge=1, toNotConsider=None):
    """Build option tuples from one or several merged header lines.

    When ``numberToMerge`` is strictly positive, the first ``numberToMerge``
    lines are read as header lines and, column by column, their fields are
    joined with ``"_"``. Otherwise the file is considered header-less and
    columns are named ``Column_<index>`` from the fields of the first line.

    Args:
        file_path: path of the tab-separated file to read.
        numberToMerge: number of header lines; anything accepted by
            ``int()`` (a numeric string works).
        toNotConsider: container of column indices to ignore. ``None``
            (the default) ignores nothing.

    Returns:
        A list of ``(name, name, False)`` tuples, one per retained column.
        The list is empty when the file contains no line at all.

    Raises:
        IndexError: if a later header line holds more retained columns
            than the first header line.
    """
    # A None default avoids sharing one mutable list between calls.
    skippedColumns = () if toNotConsider is None else toNotConsider
    # Convert once instead of on every line read.
    headerCount = int(numberToMerge)
    columnLabels = []
    # The context manager guarantees the handle is released even when
    # parsing fails part-way.
    with open(file_path) as inputfile:
        if headerCount > 0:
            for iHeader, rawLine in enumerate(inputfile):
                if iHeader >= headerCount:
                    break
                keptFields = [
                    str(field_component)
                    for i, field_component in enumerate(rawLine.strip().split("\t"))
                    if i not in skippedColumns
                ]
                if iHeader == 0:
                    # First header line seeds the labels.
                    columnLabels = keptFields
                else:
                    # Following header lines are appended to the label
                    # sitting at the same position.
                    for iOption, field_component in enumerate(keptFields):
                        columnLabels[iOption] = columnLabels[iOption] + "_" + field_component
        else:
            firstLine = next(inputfile, None)
            if firstLine is not None:
                # No header: name columns after their original index, so
                # skipped columns leave gaps in the numbering.
                columnLabels = [
                    "Column_" + str(i)
                    for i, _ in enumerate(firstLine.strip().split("\t"))
                    if i not in skippedColumns
                ]
    return [(label, label, False) for label in columnLabels]
59
def get_row_names( file_path, factorName ):
    """List the distinct values found in one named column of a tabular file.

    The first line is the header; the column whose header field equals
    ``factorName`` is selected (the last one when several match). Each
    following line with more than one field contributes the value found
    in that column, and every distinct value is reported once, in order
    of first appearance.

    Args:
        file_path: path of the tab-separated file to read.
        factorName: header name of the column to extract.

    Returns:
        A list of ``(value, value, False)`` tuples. The list is empty when
        the file is empty or when no header field equals ``factorName``.
    """
    options = []
    # The context manager guarantees the handle is released even when
    # parsing fails part-way.
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
        if headerLine is None:
            # Empty file: nothing to report.
            return options
        iColumn = -1
        for i, field_component in enumerate(headerLine.strip().split("\t")):
            if field_component == factorName:
                iColumn = i
        if iColumn == -1:
            return options
        # A set makes the duplicate test constant-time; the list keeps
        # the order of first appearance.
        seenValues = set()
        for nextLine in inputfile:
            fields = nextLine.strip().split("\t")
            # Skip blank/single-field lines, and rows too short to hold
            # the requested column (they would otherwise raise IndexError).
            if len(fields) <= 1 or len(fields) <= iColumn:
                continue
            value = fields[iColumn]
            if value not in seenValues:
                seenValues.add(value)
                options.append((value, value, False))
    return options
76
def get_condition_file_names( file_list, toNotConsider=-1, each=1):
    """Build option tuples naming the conditions carried by the input(s).

    Three input shapes are handled:

    * ``file_list`` is not a list: it is treated as a single tabular file
      object exposing ``file_name``; options come from the header line,
      skipping column ``toNotConsider`` and keeping one retained column
      out of every ``each``.
    * ``file_list`` is a list whose first item has no ``collection``
      attribute: one option per item, using its ``name`` attribute.
    * ``file_list`` is a list whose first item has a ``collection``
      attribute: one option per entry of ``file_list[0].collection.elements``,
      using its ``element_identifier`` attribute.

    Args:
        file_list: a single file object or a list of file objects
            (presumably Galaxy dataset objects - confirm against callers).
        toNotConsider: header column index to ignore (tabular input only).
            The default (-1) never matches a column index.
        each: sampling step on retained header columns (tabular input only).

    Returns:
        A list of ``(name, name, False)`` tuples. The list is empty for an
        empty list or an empty tabular file.
    """
    options = []
    if not isinstance(file_list, list):
        # Single tabular file: behave like a header-line column listing.
        # The context manager releases the handle even if reading fails.
        with open(file_list.file_name) as inputfile:
            headerLine = next(inputfile, None)
        if headerLine is None:
            return options
        cpt = 0
        for i, field_component in enumerate(headerLine.strip().split("\t")):
            if i == toNotConsider:
                continue
            if cpt == 0:
                options.append((field_component, field_component, False))
            # cpt cycles through 0..each-1 so that only one column out of
            # `each` is emitted.
            cpt += 1
            if cpt == each:
                cpt = 0
        return options
    if not file_list:
        # Nothing selected: file_list[0] below would raise IndexError.
        return options
    if not hasattr(file_list[0], 'collection'):
        # Plain list of files (e.g. .cel files): the name is directly available.
        for field_component in file_list:
            options.append((field_component.name, field_component.name, False))
    else:
        # Collection: names are held by the elements of the underlying
        # collection object of the first item.
        for field_component in file_list[0].collection.elements:
            options.append((field_component.element_identifier, field_component.element_identifier, False))
    return options
99
def generateFactorFile( file_list, factor_list, outputFileName, logFile):
    """Validate user-defined factors and write them as a tabular factor file.

    Sample names are obtained through ``get_condition_file_names`` (the
    first column is dropped when ``file_list`` is a single tabular file).
    Each factor must assign every sample to exactly one of its values.
    On success the output file holds a ``Conditions`` header followed by
    one line per sample with its value for each factor, tab-separated.

    Args:
        file_list: single tabular file object or list of file objects, as
            accepted by ``get_condition_file_names``.
        factor_list: sequence of mappings with keys ``'factorName'`` and
            ``'valueList'``; each entry of ``'valueList'`` is a mapping
            with keys ``'valueName'`` and ``'valueConditions'`` (a
            comma-separated list of sample names once converted to str).
        outputFileName: path of the factor file to write. It is created
            (or truncated) even when validation fails.
        logFile: path of the log file receiving [INFO]/[ERROR] messages.

    Returns:
        0 on success;
        1 when no factor is defined or a factor has fewer than two values;
        2 when a sample is unknown, assigned twice, or left unassigned;
        3 when a factor name is used more than once;
        4 when a factor or value name contains a forbidden character.
    """
    forbidenCharacters = {"*", ":", ",", "|"}
    # Both files are owned by the `with` statement so that every early
    # error return still flushes the log and releases the handles.
    with open(outputFileName, 'w') as outputfile, open(logFile, 'w') as outputLog:
        if not isinstance(file_list, list):
            # unique expression file, remove the first column (index=0)
            conditionNames = get_condition_file_names(file_list, 0)
        else:
            # .CEL files
            conditionNames = get_condition_file_names(file_list)
        sampleList = [str(sample_component[1]) for sample_component in conditionNames]
        outputLog.write("[INFO] "+str(len(sampleList))+" sample are detected as input\n")

        # factor name -> {sample name -> value name}
        globalDict = dict()
        factorNameList = []
        if len(factor_list) == 0:
            outputLog.write("[ERROR] no factor was defined !\n")
            return 1

        for factor_component in factor_list:
            # Samples still waiting for a value within the current factor.
            currentSampleList = list(sampleList)
            currentFactor = str(factor_component['factorName'])
            for specialCharacter in forbidenCharacters:
                if specialCharacter in currentFactor:
                    outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in factor name : '"+currentFactor+"'\n")
                    return 4
            if currentFactor in globalDict:
                outputLog.write("[ERROR] '"+currentFactor+"' is used several times as factor name\n")
                return 3
            globalDict[currentFactor] = dict()
            factorNameList.append(currentFactor)
            if len(factor_component['valueList']) <= 1:
                outputLog.write("[ERROR] at least two different values are necessary for '"+currentFactor+"' factor\n")
                return 1
            for value_component in factor_component['valueList']:
                currentValue = str(value_component['valueName'])
                for specialCharacter in forbidenCharacters:
                    if specialCharacter in currentValue:
                        outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in value name : '"+currentValue+"'\n")
                        return 4
                for sample_component in str(value_component['valueConditions']).split(","):
                    # A sample missing from the pending list was either
                    # already consumed by another value or never existed;
                    # both cases are reported with the same message.
                    if sample_component not in currentSampleList:
                        outputLog.write("[ERROR] sample "+sample_component+" was assigned several times for factor '"+currentFactor+"'\n")
                        return 2
                    currentSampleList.remove(sample_component)
                    globalDict[currentFactor][sample_component] = currentValue
            if len(currentSampleList) > 0:
                outputLog.write("[ERROR] for factor '"+currentFactor+"'' sample "+str(currentSampleList)+" are not assigned to any value\n")
                return 2

        outputLog.write("[INFO] "+str(len(globalDict))+" factors are detected\n")
        # start writing the factor file
        outputfile.write("\t".join(["Conditions"] + factorNameList)+"\n")
        for sample_component in sampleList:
            sampleValues = [globalDict[factorName][sample_component] for factorName in factorNameList]
            outputfile.write("\t".join([sample_component] + sampleValues)+"\n")
    return 0
166
def selectSubSetTable(file_path,headerLine_number,columnsToAdd,columnNamesToKeep,outputFileName,logFile):
    """Write a header-less copy of a table restricted to selected columns.

    Column names are obtained from ``get_column_names_mergeNumber`` using
    ``headerLine_number`` header lines. For each entry of
    ``columnNamesToKeep`` three column indices are selected, in this
    order: p-value, FDR, fold-change. A missing FDR column falls back to
    column index 0. The selected columns of every non-header line are then
    written, tab-separated, to ``outputFileName``.

    Args:
        file_path: path of the tab-separated input file.
        headerLine_number: number of header lines; anything accepted by
            ``int()`` (a numeric string works).
        columnsToAdd: iterable of column indices always kept, placed
            before the indices derived from ``columnNamesToKeep``.
        columnNamesToKeep: sequence of mappings with keys ``'pvalColumn'``,
            ``'fdrColumn'`` and ``'fcColumn'`` holding column names.
        outputFileName: path of the file to write; only created once the
            column selection has been validated.
        logFile: path of the log file receiving [INFO]/[ERROR] messages.

    Returns:
        0 on success; 1 when no column is detected, when a requested
        p-value or fold-change column is absent, or when the input holds
        no line beyond the header lines.
    """
    # The log is owned by the `with` statement so that every early error
    # return still flushes it and releases the handle.
    with open(logFile, 'w') as outputLog:
        # str() keeps the message working when an int is passed.
        outputLog.write("[INFO] header line number : "+str(headerLine_number)+" lines\n")
        headerCount = int(headerLine_number)
        # convert tuple list as a simple array
        availableColumns = [
            str(tuple_content[0])
            for tuple_content in get_column_names_mergeNumber(file_path, headerLine_number)
        ]
        if len(availableColumns) == 0:
            outputLog.write("[ERROR] No detected columns in input file\n")
            return 1

        selectedColumns = list(columnsToAdd)
        try:
            for volcano_content in columnNamesToKeep:
                selectedColumns.append(availableColumns.index(volcano_content['pvalColumn']))
                if volcano_content['fdrColumn'] in availableColumns:
                    selectedColumns.append(availableColumns.index(volcano_content['fdrColumn']))
                else:
                    # FDR is optional: column 0 is used as a placeholder so
                    # that each entry always contributes three columns.
                    selectedColumns.append(0)
                selectedColumns.append(availableColumns.index(volcano_content['fcColumn']))
        except ValueError:
            # list.index() found no column with the requested name.
            outputLog.write("[ERROR] matching between input file colnames and requested column names failed\n")
            return 1
        outputLog.write("[INFO] columns kept : "+str(selectedColumns)+"\n")

        # start writing formatted file
        iLineCpt = -1
        with open(file_path) as inputfile, open(outputFileName, 'w') as outputfile:
            for iLineCpt, iCurrentLine in enumerate(inputfile):
                if iLineCpt >= headerCount:
                    # A numpy array lets the whole index list be applied
                    # in a single indexing operation.
                    currentLineFields = np.array(iCurrentLine.strip().split("\t"))
                    outputfile.write("\t".join(currentLineFields[selectedColumns])+"\n")
        if iLineCpt < headerCount:
            # iLineCpt is the 0-based index of the last line read, hence +1
            # for the count; str() is required to concatenate it.
            outputLog.write("[ERROR] not enough lines in input files ("+str(iLineCpt+1)+" lines)\n")
            return 1
    return 0