Mercurial > repos > proteore > proteore_filter_keywords_values
comparison filter_kw_val.py @ 0:a55e8b137c6b draft
planemo upload commit 688c456ca57914a63c20eba942ec5fe81e896099-dirty
author | proteore |
---|---|
date | Wed, 19 Sep 2018 05:01:15 -0400 |
parents | |
children | 52a7afd01c6d |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a55e8b137c6b |
---|---|
1 import argparse, re, csv | |
2 | |
def options():
    """Parse the command-line arguments and run the filtering.

    Options that take several values expect them separated by spaces, and
    the filter options may be repeated on the command line.

    Options:
        -i, --input       Input file and whether its first line is a header,
                          joined by a comma: "filename,true/false".
        --kw              KEYWORDS COLUMN EXACT: keywords separated by ";",
                          the column they apply to (e.g. c1) and true/false
                          for exact matching.
                          Example: --kw "kw1;kw2" c1 true --kw kw3 c2 false
        --kw_file         FILE COLUMN EXACT: same as --kw, the keywords
                          being read from FILE, one per line.
        --value           VALUE COLUMN OPERATOR: number to compare with, the
                          column it applies to and one of = > >= < <= !=
        --values_range    BOTTOM TOP COLUMN INCLUSIVE: range of values to
                          keep. Example: --values_range 5 20 c1 true
        --operator        How several filters are combined: AND or OR.
        -o, --output      File receiving the lines that were kept.
        --filtered_file   File receiving the lines that were removed.
        -s, --sort_col    Column used to sort both output files, followed by
                          ",true" for reverse sorting or ",false" otherwise.
                          Example: c1,false
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-i", "--input", required=True,
        help="Input file and header flag: 'filename,true/false'")
    parser.add_argument(
        "--kw", nargs="+", action="append",
        help="Keyword filter: KEYWORDS COLUMN EXACT, e.g. 'kw1;kw2' c1 true "
             "(repeatable)")
    parser.add_argument(
        "--kw_file", nargs="+", action="append",
        help="Keyword filter read from a file: FILE COLUMN EXACT "
             "(repeatable)")
    parser.add_argument(
        "--value", nargs="+", action="append",
        help="Value filter: VALUE COLUMN OPERATOR, operator being one of "
             "= > >= < <= != (repeatable)")
    parser.add_argument(
        "--values_range", nargs="+", action="append",
        help="Range of values to keep: BOTTOM TOP COLUMN INCLUSIVE, "
             "e.g. 5 20 c1 true (repeatable)")
    parser.add_argument(
        "--operator", default="OR", type=str, choices=['AND', 'OR'],
        help="How several filters are combined")
    parser.add_argument(
        "-o", "--output", default="output.txt",
        help="File receiving the lines that were kept")
    parser.add_argument(
        "--filtered_file", default="filtered_output.txt",
        help="File receiving the lines that were removed")
    parser.add_argument(
        "-s", "--sort_col",
        help="Sort column and reverse flag, e.g. c1,false")

    args = parser.parse_args()
    filters(args)
33 | |
def str_to_bool(v):
    """Convert a textual flag into a boolean, ignoring letter case.

    'yes', 'true', 't', 'y' and '1' give True; 'no', 'false', 'f', 'n' and
    '0' give False.

    Raises:
        argparse.ArgumentTypeError: if the text is none of the above.
    """
    truthy = ('yes', 'true', 't', 'y', '1')
    falsy = ('no', 'false', 'f', 'n', '0')

    lowered = v.lower()
    if lowered in truthy:
        return True
    if lowered in falsy:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
41 | |
# Plain decimal notation: optional minus sign and at least one digit; floats
# may add a fractional part and an exponent. The dot is escaped so that it
# only matches a decimal point, and a single digit is a valid float.
_INT_PATTERN = re.compile(r"^-?[0-9]+$")
_FLOAT_PATTERN = re.compile(r"^-?[0-9]+(\.[0-9]+)?([eE][-+]?[0-9]+)?$")


def is_number(number_format, n):
    """Tell whether the string n is written as an integer or as a float.

    Args:
        number_format: "int" to accept integers only, "float" to accept
            integers, decimal numbers and exponent notation.
        n: the text to check.

    Returns:
        True if n matches the requested format, False otherwise (including
        when number_format is neither "int" nor "float").
    """
    if number_format == "int":
        pattern = _INT_PATTERN
    elif number_format == "float":
        pattern = _FLOAT_PATTERN
    else:
        return False
    return pattern.match(n) is not None
53 | |
def filters(args):
    """Apply every requested filter to the input file and write the results.

    Each filter adds one boolean per line to a shared dictionary (True means
    "remove this line"). The booleans of a line are combined with any() for
    the OR operator and all() for AND. Removed lines go to
    args.filtered_file, the others to args.output; when the input has a
    header it is copied to both files.

    Args:
        args: parsed command-line namespace; the attributes read here are
            input, kw, kw_file, value, values_range, operator, sort_col,
            output and filtered_file.

    Raises:
        ValueError: if the input option has no header flag, or if a value
            or range bound is not a number.
    """
    def looks_numeric(text):
        # Both formats are tried so that single-digit values are accepted.
        return is_number("float", text) or is_number("int", text)

    input_parts = args.input.split(",")
    if len(input_parts) < 2:
        raise ValueError("Please specify the input as 'filename,true/false'")
    filename = input_parts[0]
    header = str_to_bool(input_parts[1])
    csv_file = read_file(filename)
    results_dict = {}

    for k in args.kw or []:
        results_dict = filter_keyword(csv_file, header, results_dict, k[0], k[1], k[2])

    for kf in args.kw_file or []:
        keywords = read_option(kf[0])
        results_dict = filter_keyword(csv_file, header, results_dict, keywords, kf[1], kf[2])

    for v in args.value or []:
        if not looks_numeric(v[0]):
            raise ValueError("Please enter a number in filter by value")
        results_dict = filter_value(csv_file, header, results_dict, v[0], v[1], v[2])

    for vr in args.values_range or []:
        # An invalid bound is reported instead of silently skipping the filter.
        if not (looks_numeric(vr[0]) and looks_numeric(vr[1])):
            raise ValueError("Please enter numbers in filter by values range")
        results_dict = filter_values_range(csv_file, header, results_dict, vr[0], vr[1], vr[2], vr[3])

    remaining_lines = []
    filtered_lines = []

    has_header_line = header is True and len(csv_file) > 0
    if has_header_line:
        remaining_lines.append(csv_file[0])
        filtered_lines.append(csv_file[0])

    # --operator is restricted to AND/OR by the argument parser.
    combine = all if args.operator == "AND" else any

    for id_line, line in enumerate(csv_file):
        if has_header_line and id_line == 0:
            continue  # the header was already copied to both outputs
        verdicts = results_dict.get(id_line)
        # A line that no filter evaluated is kept; otherwise it would be
        # missing from both output files.
        if verdicts and combine(verdicts):
            filtered_lines.append(line)
        else:
            remaining_lines.append(line)

    # Sort both results by the requested column
    if args.sort_col:
        sort_parts = args.sort_col.split(",")
        sort_col = column_from_txt(sort_parts[0])
        reverse = str_to_bool(sort_parts[1])
        remaining_lines = sort_by_column(remaining_lines, sort_col, reverse, header)
        filtered_lines = sort_by_column(filtered_lines, sort_col, reverse, header)

    # Write kept lines to output
    with open(args.output, "w") as output:
        writer = csv.writer(output, delimiter="\t")
        writer.writerows(remaining_lines)

    # Write removed lines to filtered_output
    with open(args.filtered_file, "w") as filtered_output:
        writer = csv.writer(filtered_output, delimiter="\t")
        writer.writerows(filtered_lines)
122 | |
def sort_by_column(tab,sort_col,reverse,header):
    """Sort the rows of tab by the values found in one column.

    The column is compared as integers when every value is an integer, as
    floats when every value is numeric, and as text otherwise. Looking at
    every row (rather than only the first one) avoids a conversion error
    when the column mixes integers, decimals and text.

    Args:
        tab: list of rows, each row being a list of strings.
        sort_col: 0-based index of the column to sort on.
        reverse: True to sort in descending order.
        header: True if the first row is a header that must stay first.

    Returns:
        The sorted rows; tab itself when it has at most one row.
    """
    if len(tab) <= 1:  # nothing but a header or a single row
        return tab

    if header is True:
        head = [tab[0]]
        rows = tab[1:]
    else:
        head = []
        rows = tab

    column = [row[sort_col] for row in rows]
    if all(is_number("int", cell) for cell in column):
        convert = int
    elif all(is_number("int", cell) or is_number("float", cell) for cell in column):
        convert = float
    else:
        convert = None

    if convert is None:
        rows = sorted(rows, key=lambda row: row[sort_col], reverse=reverse)
    else:
        rows = sorted(rows, key=lambda row: convert(row[sort_col]), reverse=reverse)

    return head + rows
141 | |
def read_option(filename):
    """Read a keywords file and return its keywords joined with ";".

    Lines that are empty or made only of spaces are left out; the other
    lines are kept exactly as written.
    """
    with open(filename, "r") as handle:
        raw_lines = handle.read().splitlines()

    kept = []
    for entry in raw_lines:
        if entry.replace(' ', '') != "":
            kept.append(entry)

    return ";".join(kept)
150 | |
def read_file(filename):
    """Read a tab-separated file into a list of rows (lists of strings).

    Rows whose fields, once joined, hold nothing but spaces are dropped,
    which removes empty lines from the result.
    """
    kept = []
    with open(filename,"r") as handle:
        for record in csv.reader(handle, delimiter="\t"):
            if "".join(record).replace(" ", "") != "":
                kept.append(record)

    return kept
162 | |
def filter_keyword(csv_file, header, results_dict, keywords, ncol, match):
    """Look for keywords in one column of every row.

    Comparison ignores letter case. A cell may hold several entries
    separated by ";"; double quotes in the cell are ignored.

    Args:
        csv_file: list of rows, each row being a list of strings.
        header: True if the first row is a header, which is then skipped.
        results_dict: dictionary mapping a row index to the list of results
            collected so far; it is updated and returned.
        keywords: keywords separated by ";".
        ncol: column to search, as text (e.g. "c1" for the first column).
        match: "true" if an entry must equal a keyword, "false" if it only
            has to contain one.

    Returns:
        results_dict with one more boolean per row: True if a keyword was
        found in the row, False otherwise.
    """
    match = str_to_bool(match)
    ncol = column_from_txt(ncol)

    # Build a new list instead of removing blanks from the list being
    # iterated: that skipped consecutive blank entries, and a leftover ""
    # keyword is contained in every cell.
    keywords = [k.strip() for k in keywords.upper().split(";")]
    keywords = [k for k in keywords if k]
    keyword_set = set(keywords)

    for id_line, line in enumerate(csv_file):
        if header is True and id_line == 0:
            continue
        # A row too short to have this column is treated as an empty cell.
        cell = line[ncol] if ncol < len(line) else ""
        # Entries are stripped like the keywords so "A; B" matches "B" exactly.
        entries = [pid.strip().upper() for pid in cell.replace('"', "").split(";")]

        # Perfect match or not
        if match is True:
            found_in_line = any(pid in keyword_set for pid in entries)
        else:
            found_in_line = any(ft in pid for pid in entries for ft in keywords)

        results_dict.setdefault(id_line, []).append(found_in_line)

    return results_dict
189 | |
def filter_value(csv_file, header, results_dict, filter_value, ncol, opt):
    """Compare one column of every row with a given number.

    Args:
        csv_file: list of rows, each row being a list of strings.
        header: True if the first row is a header, which is then skipped.
        results_dict: dictionary mapping a row index to the list of results
            collected so far; it is updated and returned.
        filter_value: the number to compare with, as text or number.
        ncol: column to read, as text (e.g. "c1" for the first column).
        opt: comparison operator passed on to value_compare.

    Returns:
        results_dict with one more boolean per row: the result of the
        comparison, or False (row kept) when the cell is not a number.
    """
    filter_value = float(filter_value)
    ncol = column_from_txt(ncol)

    for id_line, line in enumerate(csv_file):
        if header is True and id_line == 0:
            continue
        # A row too short to have this column is treated as an empty cell.
        cell = line[ncol] if ncol < len(line) else ""
        value = cell.replace('"', "").strip()

        # float() also understands negative numbers, which a digits-only
        # check rejected.
        try:
            number = float(value)
        except ValueError:
            number = None

        if number is None or number != number:  # not a number, or NaN
            # Impossible to compare: record False so the row still gets a
            # result and is kept by default.
            to_filter = False
        else:
            to_filter = value_compare(value, filter_value, opt)

        results_dict.setdefault(id_line, []).append(to_filter)

    return results_dict
207 | |
def filter_values_range(csv_file, header, results_dict, bottom_value, top_value, ncol, inclusive):
    """Flag the rows whose value in one column lies outside a range.

    Args:
        csv_file: list of rows, each row being a list of strings.
        header: True if the first row is a header, which is then skipped.
        results_dict: dictionary mapping a row index to the list of results
            collected so far; it is updated and returned.
        bottom_value: lower bound of the range to keep.
        top_value: upper bound of the range to keep.
        ncol: column to read, as text (e.g. "c1" for the first column).
        inclusive: "true" if values equal to a bound are inside the range,
            "false" if they are outside.

    Returns:
        results_dict with one more boolean per row: True if the value is
        outside the range, False if it is inside or is not a number.
    """
    inclusive = str_to_bool(inclusive)
    bottom_value = float(bottom_value)
    top_value = float(top_value)
    ncol = column_from_txt(ncol)

    for id_line, line in enumerate(csv_file):
        if header is True and id_line == 0:
            continue
        # A row too short to have this column is treated as an empty cell.
        cell = line[ncol] if ncol < len(line) else ""
        value = cell.replace('"', "").strip()

        # float() also understands negative numbers, which a digits-only
        # check rejected.
        try:
            number = float(value)
        except ValueError:
            number = None

        if number is None or number != number:  # not a number, or NaN
            # Impossible to compare: record False so the row still gets a
            # result and is kept by default.
            out_of_range = False
        elif inclusive is True:
            out_of_range = not (bottom_value <= number <= top_value)
        else:
            out_of_range = not (bottom_value < number < top_value)

        results_dict.setdefault(id_line, []).append(out_of_range)

    return results_dict
230 | |
def column_from_txt(ncol):
    """Convert a column given as text (e.g. "c1" or "1") to a 0-based index.

    Raises:
        ValueError: if, once the letter "c" is removed, the text is not an
            integer greater than or equal to 1.
    """
    digits = ncol.replace("c", "")
    # Columns are numbered from 1: "c0" or a negative number would become a
    # negative index and silently address columns from the end of the row.
    if not is_number("int", digits) or int(digits) < 1:
        raise ValueError("Please specify the column where "
                         "you would like to apply the filter "
                         "with valid format")
    return int(digits) - 1
239 | |
def value_compare(value,filter_value,opt):
    """Compare value (converted to float) with filter_value using opt.

    opt is one of "<", "<=", ">", ">=", "=" or "!=". The result is True when
    the comparison holds and False otherwise; an unknown operator gives
    False without looking at value.
    """
    comparisons = {
        "<": lambda left, right: left < right,
        "<=": lambda left, right: left <= right,
        ">": lambda left, right: left > right,
        ">=": lambda left, right: left >= right,
        "=": lambda left, right: left == right,
        "!=": lambda left, right: left != right,
    }

    compare = comparisons.get(opt)
    if compare is None:
        return False
    return bool(compare(float(value), filter_value))
264 | |
# Run the command-line entry point only when this file is executed as a script.
if __name__ == "__main__":
    options()