Mercurial > repos > proteore > proteore_get_unique_peptide_srm_method
comparison get_unique_srm.py @ 2:b526dba9dc40 draft default tip
"planemo upload commit 7592fb20f8029142757d5e5fdb8f04ff6d5ed5cd-dirty"
author | proteore |
---|---|
date | Mon, 10 May 2021 13:56:03 +0000 |
parents | a2b06836de90 |
children |
comparison
equal
deleted
inserted
replaced
1:b72ece649392 | 2:b526dba9dc40 |
---|---|
import argparse
import csv
import re
import sys
2 | 5 |
def get_args():
    """Build the command-line parser and return the parsed arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_type",
                        help="type of input (list of id or filename)",
                        required=True)
    parser.add_argument("-i", "--input",
                        help="list of IDs (text or filename)",
                        required=True)
    parser.add_argument("--header",
                        help="true/false if your file contains a header")
    parser.add_argument("-c", "--column_number",
                        help="list of IDs (text or filename)")
    parser.add_argument("-f", "--features",
                        help="Protein features to return from SRM Atlas",
                        required=True)
    parser.add_argument("-d", "--ref_file",
                        help="path to reference file", required=True)
    parser.add_argument("-o", "--output",
                        help="output filename", required=True)
    return parser.parse_args()
15 | 18 |
16 #return the column number in int format | 19 # return the column number in int format |
20 | |
21 | |
def nb_col_to_int(nb_col):
    """Convert a 1-based column spec such as "c2" into a 0-based int index.

    Exits the program with an explanatory message when the spec is invalid.
    """
    try:
        return int(nb_col.replace("c", "")) - 1
    except (ValueError, AttributeError):
        # BUG FIX: `sys` was never imported (flagged by the old `noqa 821`),
        # so this raised NameError instead of the intended friendly exit.
        # Also narrowed the bare `except:` to the errors int()/replace() raise.
        sys.exit("Please specify the column where you would like to apply the filter with valid format")  # noqa 501
23 | 28 |
24 #replace all blank cells to NA | 29 # replace all blank cells to NA |
def blank_to_NA(csv_file):
    """Return a copy of the rows with empty, blank or "NaN" cells set to "NA"."""
    cleaned = []
    for row in csv_file:
        cleaned.append(
            ["NA" if cell in ("", " ", "NaN") else cell for cell in row])
    return cleaned
32 | 39 |
33 #convert string to boolean | 40 # convert string to boolean |
41 | |
42 | |
def str2bool(v):
    """Map a yes/no style string (case-insensitive) to a bool.

    Raises argparse.ArgumentTypeError for anything unrecognized.
    """
    value = v.lower()
    if value in ('yes', 'true', 't', 'y', '1'):
        return True
    if value in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
41 | 50 |
42 #return list of (unique) ids from string | 51 # return list of (unique) ids from string |
def get_input_ids_from_string(input):
    """Return the unique IDs found in a whitespace-separated string.

    "_SNP" and "d_" decorations are stripped before deduplication.
    """
    cleaned = input.replace("_SNP", "").replace("d_", "")
    cleaned = cleaned.replace("\r", "").replace("\n", " ").replace("\t", " ")
    ids_list = list(set(re.split(r'\s+', cleaned)))
    if "" in ids_list:
        ids_list.remove("")
    return ids_list
49 | 61 |
50 #return input_file and list of unique ids from input file path | 62 # return input_file and list of unique ids from input file path |
def get_input_ids_from_file(input, nb_col, header):
    """Read a tab-separated file and return (expanded rows, unique IDs).

    IDs come from column nb_col; rows with several ';'-separated IDs are
    expanded by one_id_one_line.
    """
    with open(input, "r") as csv_file:
        rows = list(csv.reader(csv_file, delimiter='\t'))

    rows, ids_list = one_id_one_line(rows, nb_col, header)
    if "" in ids_list:
        ids_list.remove("")

    return rows, ids_list
59 | 74 |
60 #function to check if an id is an uniprot accession number : return True or False- | 75 # function to check if an id is an uniprot accession number: |
def check_uniprot(id):
    """Return True when id begins with a UniProt accession-number pattern."""
    uniprot_pattern = re.compile("[OPQ][0-9][A-Z0-9]{3}[0-9]|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}")  # noqa 501
    return bool(uniprot_pattern.match(id))
67 | 85 |
68 #return input file by adding lines when there are more than one id per line | 86 # return input file by adding lines when there are more than one id per line |
def one_id_one_line(input_file, nb_col, header):
    """Expand rows carrying several ';'-separated IDs into one row per ID.

    Returns the expanded rows (header row, if any, kept first and untouched)
    and the unique ID list with "_SNP"/"d_" decorations stripped.
    Rows are mutated in place (whitespace squeezed out of the ID cell,
    empty ID cells set to 'NA').
    """
    if header:
        new_file = [input_file[0]]
        input_file = input_file[1:]
    else:
        new_file = []
    ids_list = []

    for row in input_file:
        # skip fully empty rows
        if row == [] or set(row) == {''}:
            continue
        row[nb_col] = re.sub(r"\s+", "", row[nb_col])
        if row[nb_col] == "":
            row[nb_col] = 'NA'
        if ";" in row[nb_col]:
            for id in row[nb_col].split(";"):
                new_file.append(row[:nb_col] + [id] + row[nb_col + 1:])
                ids_list.append(id)
        else:
            new_file.append(row)
            ids_list.append(row[nb_col])

    ids_list = [e.replace("_SNP", "").replace("d_", "") for e in ids_list]
    return new_file, list(set(ids_list))
95 | 116 |
def create_srm_atlas_dictionary(features, srm_atlas_csv):
    """Index SRM Atlas rows by cleaned ID (column 10, "_SNP"/"d_" stripped).

    Each value is a list of per-peptide feature lists, in the order the
    requested feature names appear in `features`. The first (header) row
    of srm_atlas_csv is skipped.
    """
    features_index = {"PeptideSeq": 0, "SSRT": 1, "Length": 2,
                      "type": 3, "PA_AccNum": 4, "MW": 5}
    wanted = [features_index[feature] for feature in features]
    srm_atlas = {}
    for row in srm_atlas_csv[1:]:
        id = row[9].replace("_SNP", "").replace("d_", "")
        srm_atlas.setdefault(id, []).append([row[i] for i in wanted])
    return srm_atlas
108 | 130 |
def retrieve_srm_features(srm_atlas, ids):
    """Map each requested id to its SRM Atlas entries, or "" when absent."""
    return {id: (srm_atlas[id] if id in srm_atlas else "") for id in ids}
119 | 142 |
def create_header(input_file, ncol, features):
    """Build generic column names (col1..colN) for a headerless file,
    mark column ncol as "Uniprot-AC", and append the feature names."""
    col_names = ["col" + str(i) for i in range(1, len(input_file[0]) + 1)]
    col_names[ncol] = "Uniprot-AC"
    return col_names + features
126 | 150 |
151 | |
def main():
    """Annotate input Uniprot-ACs with peptide features from the Human SRM Atlas
    reference file and write the result as a tab-separated file."""

    # Get args from command line
    args = get_args()
    features = args.features.split(",")
    header = False
    if args.input_type == "file":
        column_number = nb_col_to_int(args.column_number)
        header = str2bool(args.header)

    # Get reference file (Human SRM Atlas)
    with open(args.ref_file, "r") as csv_file:
        srm_atlas_csv = list(csv.reader(csv_file, delimiter='\t'))

    # Create srm Atlas dictionary
    srm_atlas = create_srm_atlas_dictionary(features, srm_atlas_csv)

    # Get file and/or ids from input
    if args.input_type == "list":
        ids = get_input_ids_from_string(args.input)
    elif args.input_type == "file":
        input_file, ids = get_input_ids_from_file(args.input,
                                                  column_number, header)

    # Check Uniprot-AC
    if not any(check_uniprot(id) for id in ids):
        print("No Uniprot-AC found, please check your input")
        exit()

    # retrieve features
    result_dict = retrieve_srm_features(srm_atlas, ids)

    # write output
    with open(args.output, "w") as output:
        writer = csv.writer(output, delimiter="\t")

        # write header
        if header:
            writer.writerow(input_file[0] + features)
            input_file = input_file[1:]
        elif args.input_type == "file":
            # BUG FIX: create_header already returns the complete header row;
            # the previous code wrapped it in another list, so writerow wrote
            # the list's repr into a single cell instead of one name per column.
            writer.writerow(create_header(input_file, column_number, features))
        else:
            writer.writerow(["Uniprot-AC"] + features)

        # write lines, skipping consecutive duplicate rows
        previous_line = ""
        if args.input_type == "file":
            for line in input_file:
                # NOTE(review): IDs in result_dict are "_SNP"/"d_"-stripped but
                # line cells are not — presumably inputs never carry those
                # decorations here; a decorated cell would raise KeyError.
                for res in result_dict[line[column_number]]:
                    output_line = ["NA" if cell == "" or cell == " " or cell == "NaN" else cell for cell in line + res]  # noqa 501
                    if previous_line != output_line:
                        writer.writerow(output_line)
                        previous_line = output_line
        elif args.input_type == "list":
            for id in ids:
                for res in result_dict[id]:
                    line = ["NA" if cell == "" or cell == " " or cell == "NaN" else cell for cell in [id] + res]  # noqa 501
                    if previous_line != line:
                        writer.writerow(line)
                        previous_line = line
189 | 215 |
190 | 216 |
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()