Mercurial > repos > proteore > proteore_get_unique_peptide_srm_method
comparison get_unique_srm.py @ 0:a2b06836de90 draft
planemo upload commit f9de6f4e3302c41e64c39d639bee780e5eafd84d-dirty
author | proteore |
---|---|
date | Fri, 12 Jul 2019 07:49:45 -0400 |
parents | |
children | b526dba9dc40 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a2b06836de90 |
---|---|
import argparse
import csv
import re
import sys
2 | |
def get_args():
    """Build the argument parser and return the parsed command-line options."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_type", required=True,
                        help="type of input (list of id or filename)")
    parser.add_argument("-i", "--input", required=True,
                        help="list of IDs (text or filename)")
    parser.add_argument("--header",
                        help="true/false if your file contains a header")
    parser.add_argument("-c", "--column_number",
                        help="list of IDs (text or filename)")
    parser.add_argument("-f", "--features", required=True,
                        help="Protein features to return from SRM Atlas")
    parser.add_argument("-d", "--ref_file", required=True,
                        help="path to reference file")
    parser.add_argument("-o", "--output", required=True,
                        help="output filename")
    return parser.parse_args()
15 | |
#return the column number in int format
def nb_col_to_int(nb_col):
    """Convert a column spec such as "c2" into a 0-based index (int).

    Exits with an explanatory message when the spec is missing or malformed.
    """
    try:
        return int(nb_col.replace("c", "")) - 1
    # BUG FIX: the original used a bare `except` and referenced `sys`
    # without importing it, so any invalid input raised NameError instead
    # of exiting cleanly. Catch only the expected failures:
    #   AttributeError -> nb_col is None (option not given)
    #   ValueError     -> non-numeric column spec
    except (ValueError, AttributeError):
        sys.exit("Please specify the column where you would like to apply the filter with valid format")
23 | |
#replace all blank cells to NA
def blank_to_NA(csv_file):
    """Return a copy of the table where empty, blank or "NaN" cells are "NA"."""
    return [
        ["NA" if cell in ("", " ", "NaN") else cell for cell in row]
        for row in csv_file
    ]
32 | |
#convert string to boolean
def str2bool(v):
    """Map a yes/no-style string to a bool; raise ArgumentTypeError otherwise."""
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
41 | |
#return list of (unique) ids from string
def get_input_ids_from_string(input):
    """Split a whitespace-separated ID string into a de-duplicated list.

    "_SNP" and "d_" markers are stripped before splitting; empty tokens
    are dropped.
    """
    cleaned = (input.replace("_SNP", "")
                    .replace("d_", "")
                    .replace("\r", "")
                    .replace("\n", " ")
                    .replace("\t", " "))
    unique_ids = set(re.split(r'\s+', cleaned))
    unique_ids.discard("")
    return list(unique_ids)
49 | |
#return input_file and list of unique ids from input file path
def get_input_ids_from_file(input, nb_col, header):
    """Read a tab-separated file and return (rows, unique id list).

    Rows are first expanded so each row carries a single ID
    (see one_id_one_line); empty IDs are dropped from the list.
    """
    with open(input, "r") as handle:
        rows = list(csv.reader(handle, delimiter='\t'))

    rows, ids_list = one_id_one_line(rows, nb_col, header)
    if "" in ids_list:
        ids_list.remove("")

    return rows, ids_list
59 | |
#function to check if an id is an uniprot accession number : return True or False-
def check_uniprot(id):
    """Return True when *id* matches the UniProt accession-number pattern."""
    uniprot_pattern = re.compile("[OPQ][0-9][A-Z0-9]{3}[0-9]|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}")
    return uniprot_pattern.match(id) is not None
67 | |
#return input file by adding lines when there are more than one id per line
def one_id_one_line(input_file, nb_col, header):
    """Expand rows holding several ';'-separated IDs into one row per ID.

    Returns (expanded_rows, unique_id_list). Whitespace is stripped from
    the ID cell, blank ID cells become 'NA', and '_SNP'/'d_' markers are
    removed from the collected IDs. Rows are mutated in place, matching
    the original behaviour.
    """
    if header:
        expanded = [input_file[0]]
        body = input_file[1:]
    else:
        expanded = []
        body = input_file

    collected = []
    for row in body:
        # skip fully empty rows
        if not row or set(row) == {''}:
            continue
        row[nb_col] = re.sub(r"\s+", "", row[nb_col])
        if row[nb_col] == "":
            row[nb_col] = 'NA'
        if ";" in row[nb_col]:
            # fan out: one copy of the row per individual ID
            for single_id in row[nb_col].split(";"):
                expanded.append(row[:nb_col] + [single_id] + row[nb_col + 1:])
                collected.append(single_id)
        else:
            expanded.append(row)
            collected.append(row[nb_col])

    collected = [i.replace("_SNP", "").replace("d_", "") for i in collected]
    return expanded, list(set(collected))
95 | |
def create_srm_atlas_dictionary(features, srm_atlas_csv):
    """Map accession -> list of feature-value rows from the SRM Atlas table.

    The first row of srm_atlas_csv (header) is skipped. Column 9 holds the
    accession; '_SNP'/'d_' markers are removed before it is used as a key.
    """
    features_index = {"PeptideSeq": 0, "SSRT": 1, "Length": 2, "type": 3, "PA_AccNum": 4, "MW": 5}
    wanted = [features_index[feature] for feature in features]
    srm_atlas = {}
    for row in srm_atlas_csv[1:]:
        accession = row[9].replace("_SNP", "").replace("d_", "")
        srm_atlas.setdefault(accession, []).append([row[i] for i in wanted])
    return srm_atlas
108 | |
def retrieve_srm_features(srm_atlas, ids):
    """Return {id: atlas entries} for each requested id ("" when absent)."""
    return {identifier: srm_atlas.get(identifier, "") for identifier in ids}
119 | |
def create_header(input_file, ncol, features):
    """Build generic column names (col1..colN), mark the ID column, append features."""
    col_names = ["col" + str(i) for i in range(1, len(input_file[0]) + 1)]
    col_names[ncol] = "Uniprot-AC"
    return col_names + features
126 | |
def main():
    """Read IDs, look them up in the Human SRM Atlas, write a TSV report.

    Supports two input types: a raw ID list ("list") or a tab-separated
    file ("file") with the ID column given by --column_number.
    """

    #Get args from command line
    args = get_args()
    features = args.features.split(",")
    header = False
    if args.input_type == "file":
        column_number = nb_col_to_int(args.column_number)
        header = str2bool(args.header)

    #Get reference file (Human SRM Atlas)
    with open(args.ref_file, "r") as csv_file:
        srm_atlas_csv = list(csv.reader(csv_file, delimiter='\t'))

    #Create srm Atlas dictionary
    srm_atlas = create_srm_atlas_dictionary(features, srm_atlas_csv)

    #Get file and/or ids from input
    if args.input_type == "list":
        ids = get_input_ids_from_string(args.input)
    elif args.input_type == "file":
        input_file, ids = get_input_ids_from_file(args.input, column_number, header)

    #Check Uniprot-AC: bail out early when nothing looks like an accession
    if not any(check_uniprot(id) for id in ids):
        print("No Uniprot-AC found, please check your input")
        sys.exit()

    #retrieve features
    result_dict = retrieve_srm_features(srm_atlas, ids)

    #write output
    with open(args.output, "w") as output:
        writer = csv.writer(output, delimiter="\t")

        #write header
        if header:
            writer.writerow(input_file[0] + features)
            input_file = input_file[1:]
        elif args.input_type == "file":
            # BUG FIX: the header row was wrapped in an extra list
            # ([create_header(...)]), so writerow emitted a single cell
            # containing the Python repr of the whole header list.
            writer.writerow(create_header(input_file, column_number, features))
        else:
            writer.writerow(["Uniprot-AC"] + features)

        #write lines (ids absent from the atlas map to "" and yield no rows;
        #consecutive duplicate rows are suppressed)
        previous_line = ""
        if args.input_type == "file":
            for line in input_file:
                for res in result_dict[line[column_number]]:
                    output_line = ["NA" if cell in ("", " ", "NaN") else cell for cell in line + res]
                    if previous_line != output_line:
                        writer.writerow(output_line)
                        previous_line = output_line
        elif args.input_type == "list":
            for id in ids:
                for res in result_dict[id]:
                    output_line = ["NA" if cell in ("", " ", "NaN") else cell for cell in [id] + res]
                    if previous_line != output_line:
                        writer.writerow(output_line)
                        previous_line = output_line
189 | |
190 | |
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()