Mercurial > repos > vandelj > giant_factor_generator
comparison src/General_functions.py @ 0:4764dc6a1019 draft
"planemo upload for repository https://github.com/juliechevalier/GIANT/tree/master commit cb276a594444c8f32e9819fefde3a21f121d35df"
author | vandelj |
---|---|
date | Fri, 26 Jun 2020 09:51:15 -0400 |
parents | |
children | 7a520f7169e1 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4764dc6a1019 |
---|---|
1 import re | |
2 import numpy as np | |
3 | |
def get_column_names( file_path, toNotConsider=-1, each=1):
    """Build selectable options from the header line of a tab-separated file.

    Only the first line of ``file_path`` is read. It is stripped and split on
    tabs; every kept field becomes one ``(name, name, False)`` tuple.

    Args:
        file_path: path of the tabular file to read.
        toNotConsider: zero-based index of one column to ignore. The default
            (-1) never matches an index, so no column is ignored.
        each: among the columns that are not ignored, keep the first one of
            every group of ``each`` consecutive columns.

    Returns:
        A list of ``(name, name, False)`` tuples in header order. An empty
        file yields an empty list.
    """
    options = []
    # the context manager closes the file even when reading raises
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
    if headerLine is None:
        # empty file: there is no header, hence nothing to offer
        return options
    cpt = 0  # position inside the current group of `each` columns
    for i, field_component in enumerate(headerLine.strip().split("\t")):
        if i != toNotConsider:
            if cpt == 0:
                options.append((field_component, field_component, False))
            cpt += 1
            if cpt == each:
                cpt = 0
    return options
18 | |
def get_column_names_filteredList( file_path, toNotConsider=(), each=1):
    """Build selectable options from a header line, ignoring several columns.

    Only the first line of ``file_path`` is read. It is stripped and split on
    tabs; every kept field becomes one ``(name, name, False)`` tuple.

    Args:
        file_path: path of the tabular file to read.
        toNotConsider: collection of zero-based column indexes to ignore
            (any object supporting ``in``). The default is an empty tuple
            rather than a shared mutable list.
        each: among the columns that are not ignored, keep the first one of
            every group of ``each`` consecutive columns.

    Returns:
        A list of ``(name, name, False)`` tuples in header order. An empty
        file yields an empty list.
    """
    options = []
    # the context manager closes the file even when reading raises
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
    if headerLine is None:
        # empty file: there is no header, hence nothing to offer
        return options
    cpt = 0  # position inside the current group of `each` columns
    for i, field_component in enumerate(headerLine.strip().split("\t")):
        if i not in toNotConsider:
            if cpt == 0:
                options.append((field_component, field_component, False))
            cpt += 1
            if cpt == each:
                cpt = 0
    return options
33 | |
def get_column_names_mergeNumber(file_path, numberToMerge=1, toNotConsider=()):
    """Build column options by merging the first header lines of a tabular file.

    When ``numberToMerge`` is positive, the first ``numberToMerge`` lines are
    read; the fields found at the same kept position are joined with ``"_"``
    to form one column name. Otherwise the first line is only used to count
    the columns, which are named ``Column_<index>``.

    Args:
        file_path: path of the tab-separated file to read.
        numberToMerge: number of header lines; anything accepted by ``int()``.
        toNotConsider: collection of zero-based column indexes to ignore
            (any object supporting ``in``). The default is an empty tuple
            rather than a shared mutable list.

    Returns:
        A list of ``(name, name, False)`` tuples. An empty file yields an
        empty list in both modes.

    Raises:
        IndexError: if a later header line holds more kept fields than the
            first one (there is no option to merge the extra field into).
    """
    options = []
    headerCount = int(numberToMerge)  # converted once instead of on every line
    # the context manager closes the file even when parsing raises
    with open(file_path) as inputfile:
        if headerCount > 0:
            for iHeader, rawLine in enumerate(inputfile, start=1):
                if iHeader > headerCount:
                    break
                iOption = -1  # index of the current kept column in `options`
                for i, field_component in enumerate(rawLine.strip().split("\t")):
                    if i not in toNotConsider:
                        iOption += 1
                        if iHeader == 1:
                            options.append((str(field_component), str(field_component), False))
                        else:
                            # extend the name built from the previous header lines
                            options[iOption] = (
                                options[iOption][0] + "_" + str(field_component),
                                options[iOption][1] + "_" + str(field_component),
                                False,
                            )
        else:
            # no header line: generate names from the first line's column count
            firstLine = next(inputfile, None)
            if firstLine is not None:
                for i, field_component in enumerate(firstLine.strip().split("\t")):
                    if i not in toNotConsider:
                        options.append(("Column_" + str(i), "Column_" + str(i), False))
    return options
59 | |
def get_row_names( file_path, factorName ):
    """List the distinct values found in one named column of a tabular file.

    The first line of ``file_path`` is the header; the column whose header
    equals ``factorName`` is selected (the last one if several match). The
    remaining lines are scanned and each distinct value of that column is
    reported once, in order of first appearance.

    Args:
        file_path: path of the tab-separated file to read.
        factorName: header of the column whose values are wanted.

    Returns:
        A list of ``(value, value, False)`` tuples. The list is empty when
        the file is empty or when no header field equals ``factorName``.
        Lines with a single field are ignored, as are lines too short to
        reach the selected column (``strip()`` drops trailing empty fields).
    """
    options = []
    # the context manager closes the file on every exit path
    with open(file_path) as inputfile:
        headerLine = next(inputfile, None)
        if headerLine is None:
            return options
        iColumn = -1
        for i, field_component in enumerate(headerLine.strip().split("\t")):
            if field_component == factorName:
                iColumn = i
        if iColumn == -1:
            return options
        alreadySeen = set()  # constant-time duplicate test; `options` keeps the order
        for nextLine in inputfile:
            fields = nextLine.strip().split("\t")
            if len(fields) <= 1 or iColumn >= len(fields):
                continue
            value = fields[iColumn]
            if value not in alreadySeen:
                alreadySeen.add(value)
                options.append((value, value, False))
    return options
76 | |
def get_condition_file_names( file_list, toNotConsider=-1, each=1):
    """Build condition (sample) options from a tabular file or a list of datasets.

    Three kinds of input are handled:

    * ``file_list`` is not a ``list``: it must expose a ``file_name``
      attribute naming a tab-separated file, whose header line is turned
      into options;
    * ``file_list`` is a list whose first item has no ``collection``
      attribute: the ``name`` attribute of every item is used;
    * ``file_list`` is a list whose first item has a ``collection``
      attribute: the ``element_identifier`` of every entry of
      ``file_list[0].collection.elements`` is used (presumably a Galaxy
      HistoryDatasetCollectionAssociation - confirm against the caller).

    Args:
        file_list: single dataset-like object or list of them (see above).
        toNotConsider: tabular input only; zero-based index of one header
            column to ignore (-1 ignores none).
        each: tabular input only; keep the first column of every group of
            ``each`` consecutive kept columns.

    Returns:
        A list of ``(name, name, False)`` tuples. An empty list or an empty
        tabular file yields an empty list.
    """
    options = []
    if not isinstance(file_list, list):
        # the context manager closes the file even when reading raises
        with open(file_list.file_name) as inputfile:
            headerLine = next(inputfile, None)
        if headerLine is None:
            return options
        cpt = 0  # position inside the current group of `each` columns
        for i, field_component in enumerate(headerLine.strip().split("\t")):
            if i != toNotConsider:
                if cpt == 0:
                    options.append((field_component, field_component, False))
                cpt += 1
                if cpt == each:
                    cpt = 0
    elif not file_list:
        # an empty list has no first item to inspect: nothing to offer
        return options
    elif not hasattr(file_list[0], 'collection'):
        for field_component in file_list:
            options.append((field_component.name, field_component.name, False))
    else:
        # collection input: names are stored on the collection's elements
        for field_component in file_list[0].collection.elements:
            options.append((field_component.element_identifier, field_component.element_identifier, False))
    return options
99 | |
def generateFactorFile( file_list, factor_list, outputFileName, logFile):
    """Validate factor definitions and write the sample/factor table.

    Sample names come from ``get_condition_file_names``: a non-list
    ``file_list`` is read as a tabular file whose first column is skipped, a
    list is passed through unchanged. Every factor must assign each sample
    to exactly one of its values.

    Args:
        file_list: input accepted by ``get_condition_file_names``.
        factor_list: sequence of mappings with keys ``'factorName'`` and
            ``'valueList'``; each entry of ``'valueList'`` is a mapping with
            keys ``'valueName'`` and ``'valueConditions'`` (comma-separated
            sample names once converted with ``str``).
        outputFileName: path of the tab-separated factor file to write.
        logFile: path of the log file to write.

    Returns:
        0 on success;
        1 if no factor is defined or a factor has fewer than two values;
        2 if a sample is assigned twice, is unknown, or is left unassigned;
        3 if a factor name is used several times;
        4 if a factor or value name contains a forbidden character.

    Both files are created (truncated) before validation starts and are
    closed on every return path.
    """
    # a tuple gives a stable check order, so the reported character is reproducible
    forbidenCharacters = ("*", ":", ",", "|")
    with open(outputFileName, 'w') as outputfile, open(logFile, 'w') as outputLog:
        if not isinstance(file_list, list):
            # unique expression file, remove the first column (index=0)
            conditionNames = get_condition_file_names(file_list, 0)
        else:
            # .CEL files
            conditionNames = get_condition_file_names(file_list)
        sampleList = [str(sample_component[1]) for sample_component in conditionNames]
        outputLog.write("[INFO] "+str(len(sampleList))+" sample are detected as input\n")
        globalDict = dict()  # factor name -> {sample name -> value name}
        factorNameList = []  # factor names in definition order
        firstLine = "Conditions"
        if len(factor_list) == 0:
            outputLog.write("[ERROR] no factor was defined !\n")
            return 1
        for factor_component in factor_list:
            currentSampleList = list(sampleList)  # samples still waiting for a value
            currentFactor = str(factor_component['factorName'])
            # check if factor name contains forbidden characters
            for specialCharacter in forbidenCharacters:
                if currentFactor.find(specialCharacter) != -1:
                    outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in factor name : '"+currentFactor+"'\n")
                    return 4
            # check if a factor already has that name
            if currentFactor in globalDict:
                outputLog.write("[ERROR] '"+currentFactor+"' is used several times as factor name\n")
                return 3
            globalDict[currentFactor] = dict()
            firstLine = firstLine+"\t"+currentFactor
            factorNameList.append(currentFactor)
            if len(factor_component['valueList']) <= 1:
                outputLog.write("[ERROR] at least two different values are necessary for '"+currentFactor+"' factor\n")
                return 1
            for value_component in factor_component['valueList']:
                currentValue = str(value_component['valueName'])
                # check if value name contains forbidden characters
                for specialCharacter in forbidenCharacters:
                    if currentValue.find(specialCharacter) != -1:
                        outputLog.write("[ERROR] '"+specialCharacter+"' character is forbidden in value name : '"+currentValue+"'\n")
                        return 4
                for sample_component in str(value_component['valueConditions']).split(","):
                    if sample_component not in currentSampleList:
                        if sample_component in sampleList:
                            outputLog.write("[ERROR] sample "+sample_component+" was assigned several times for factor '"+currentFactor+"'\n")
                        else:
                            # never an input sample: say so instead of blaming a double assignment
                            outputLog.write("[ERROR] sample "+sample_component+" assigned for factor '"+currentFactor+"' is not an available sample\n")
                        return 2
                    currentSampleList.remove(sample_component)
                    globalDict[currentFactor][sample_component] = currentValue
            if len(currentSampleList) > 0:
                outputLog.write("[ERROR] for factor '"+currentFactor+"'' sample "+str(currentSampleList)+" are not assigned to any value\n")
                return 2
        outputLog.write("[INFO] "+str(len(globalDict))+" factors are detected\n")
        # start writing the factor file
        outputfile.write(firstLine+"\n")
        for sample_component in sampleList:
            newLine = sample_component
            for factor_component in factorNameList:
                newLine = newLine+"\t"+globalDict[factor_component][sample_component]
            outputfile.write(newLine+"\n")
    return 0
166 | |
def selectSubSetTable(file_path,headerLine_number,columnsToAdd,columnNamesToKeep,outputFileName,logFile):
    """Write a copy of a tabular file restricted to selected columns.

    Column names are obtained from ``get_column_names_mergeNumber`` using
    ``headerLine_number`` header lines. The output contains the data lines
    only (header lines are skipped), each reduced to the selected columns.

    Args:
        file_path: path of the tab-separated input file.
        headerLine_number: number of header lines (int or numeric string).
        columnsToAdd: zero-based column indexes always kept, placed first.
        columnNamesToKeep: sequence of mappings with keys ``'pvalColumn'``,
            ``'fdrColumn'`` and ``'fcColumn'`` naming columns to keep, in
            that order. A missing FDR column is replaced by column 0.
        outputFileName: path of the file to write.
        logFile: path of the log file to write.

    Returns:
        0 on success; 1 if no column is detected, if a requested p-value or
        fold-change column does not exist, or if the input has no data line
        after the header.

    Raises:
        IndexError: if a data line is shorter than a selected column index.
    """
    # context managers close every file on all return paths
    with open(logFile, 'w') as outputLog:
        # str() so that an integer header count can be logged too
        outputLog.write("[INFO] header line number : "+ str(headerLine_number)+" lines\n")
        availableColumnsTuple = get_column_names_mergeNumber(file_path, headerLine_number)
        # convert tuple list as a simple list of names
        availableColumns = [str(tuple_content[0]) for tuple_content in availableColumnsTuple]
        if len(availableColumns) == 0:
            outputLog.write("[ERROR] No detected columns in input file\n")
            return 1
        selectedColumns = list(columnsToAdd)
        for volcano_content in columnNamesToKeep:
            # list.index() would raise on an unknown name; report it in the log instead
            if (volcano_content['pvalColumn'] not in availableColumns
                    or volcano_content['fcColumn'] not in availableColumns):
                outputLog.write("[ERROR] matching between input file colnames and requested column names failed\n")
                return 1
            selectedColumns.append(availableColumns.index(volcano_content['pvalColumn']))
            if volcano_content['fdrColumn'] in availableColumns:
                selectedColumns.append(availableColumns.index(volcano_content['fdrColumn']))
            else:
                # no FDR column available: the first column is used as placeholder
                selectedColumns.append(0)
            selectedColumns.append(availableColumns.index(volcano_content['fcColumn']))
        outputLog.write("[INFO] columns kept : "+str(selectedColumns)+"\n")
        headerCount = int(headerLine_number)
        # start writing formatted file
        iLineCpt = -1  # stays -1 when the input file is empty
        with open(file_path) as inputfile, open(outputFileName, 'w') as outputfile:
            for iLineCpt, iCurrentLine in enumerate(inputfile):
                if iLineCpt >= headerCount:
                    currentLineFields = iCurrentLine.strip().split("\t")
                    newLine = "\t".join(currentLineFields[iColumn] for iColumn in selectedColumns)
                    outputfile.write(newLine+"\n")
        if iLineCpt < headerCount:
            # str() is required here: concatenating the int raised TypeError
            outputLog.write("[ERROR] not enough lines in input files ("+str(iLineCpt+1)+" lines)\n")
            return 1
    return 0