comparison get_unique_srm.py @ 2:b526dba9dc40 draft default tip

"planemo upload commit 7592fb20f8029142757d5e5fdb8f04ff6d5ed5cd-dirty"
author proteore
date Mon, 10 May 2021 13:56:03 +0000
parents a2b06836de90
children
comparison
equal deleted inserted replaced
1:b72ece649392 2:b526dba9dc40
import argparse
import csv
import re
import sys
4
2 5
def get_args():
    """Parse the command line and return the resulting namespace.

    Required options: ``--input_type``, ``-i/--input``, ``-f/--features``,
    ``-d/--ref_file`` and ``-o/--output``.  ``--header`` and
    ``-c/--column_number`` are optional and default to ``None``.

    Returns:
        argparse.Namespace: the parsed arguments (all values are strings).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_type",
        help="type of input (list of id or filename)",
        required=True,
    )
    parser.add_argument(
        "-i", "--input",
        help="list of IDs (text or filename)",
        required=True,
    )
    parser.add_argument(
        "--header",
        help="true/false if your file contains a header",
    )
    # The help text used to be a copy of the --input help; the value is a
    # 1-based column label such as "c1" (see nb_col_to_int).
    parser.add_argument(
        "-c", "--column_number",
        help="column of the input file that contains the IDs (e.g. c1)",
    )
    parser.add_argument(
        "-f", "--features",
        help="Protein features to return from SRM Atlas",
        required=True,
    )
    parser.add_argument(
        "-d", "--ref_file",
        help="path to reference file",
        required=True,
    )
    parser.add_argument(
        "-o", "--output",
        help="output filename",
        required=True,
    )
    return parser.parse_args()
15 18
def nb_col_to_int(nb_col):
    """Convert a 1-based column label into a 0-based column index.

    Args:
        nb_col: column label such as ``"c3"`` (or plain ``"3"``).

    Returns:
        int: the zero-based index (``"c3"`` -> ``2``).

    Exits the program with an explanatory message when the label is
    missing, is not numeric, or designates a column before the first one.
    """
    try:
        index = int(nb_col.replace("c", "")) - 1
    except (AttributeError, TypeError, ValueError):
        # None (option not given) or a non-numeric label.
        index = -1
    if index < 0:
        # "c0" used to return -1, which silently addressed the last column.
        sys.exit(
            "Please specify the column where you would like to apply "
            "the filter with valid format"
        )
    return index
23 28
def blank_to_NA(csv_file):
    """Return a copy of the table with blank cells replaced by ``"NA"``.

    A cell counts as blank when it equals ``""``, ``" "`` or ``"NaN"``.

    Args:
        csv_file: iterable of rows, each row being an iterable of cells.

    Returns:
        list[list]: new rows; the input rows are not modified.
    """
    blank_cells = ("", " ", "NaN")
    return [
        ["NA" if cell in blank_cells else cell for cell in line]
        for line in csv_file
    ]
32 39
def str2bool(v):
    """Convert a textual flag into a boolean.

    Args:
        v: ``'yes'``, ``'true'``, ``'t'``, ``'y'``, ``'1'`` (True) or
            ``'no'``, ``'false'``, ``'f'``, ``'n'``, ``'0'`` (False), in any
            letter case and with optional surrounding whitespace.  An actual
            ``bool`` is returned unchanged.

    Returns:
        bool: the parsed value.

    Raises:
        argparse.ArgumentTypeError: for any other value, including ``None``
            (which previously surfaced as an ``AttributeError``).
    """
    if isinstance(v, bool):
        return v
    if not isinstance(v, str):
        raise argparse.ArgumentTypeError('Boolean value expected.')

    value = v.strip().lower()
    if value in ('yes', 'true', 't', 'y', '1'):
        return True
    if value in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
41 50
def get_input_ids_from_string(input):
    """Return the unique IDs contained in a whitespace-separated string.

    Every occurrence of ``"_SNP"`` and ``"d_"`` is removed from the text,
    carriage returns are dropped, and the remainder is split on whitespace.

    Args:
        input: raw text holding the IDs.

    Returns:
        list[str]: unique, non-empty IDs in order of first appearance
        (the order used to be arbitrary because a ``set`` was used).
    """
    cleaned = input.replace("_SNP", "").replace("d_", "").replace("\r", "")
    # str.split() without argument splits on whitespace runs and never yields
    # empty strings; dict.fromkeys() removes duplicates but keeps the order.
    return list(dict.fromkeys(cleaned.split()))
49 61
def get_input_ids_from_file(input, nb_col, header):
    """Read a tab-separated file and extract the IDs of one column.

    Args:
        input: path of the tab-separated input file.
        nb_col: zero-based index of the column holding the IDs.
        header: True when the first row of the file is a header.

    Returns:
        tuple: ``(input_file, ids_list)`` as produced by
        ``one_id_one_line``, with empty IDs removed from ``ids_list``.
    """
    # newline="" lets the csv module do its own newline handling, as the
    # csv documentation requires for file objects.
    with open(input, "r", newline="") as csv_file:
        input_file = list(csv.reader(csv_file, delimiter='\t'))

    input_file, ids_list = one_id_one_line(input_file, nb_col, header)
    ids_list = [identifier for identifier in ids_list if identifier != ""]

    return input_file, ids_list
59 74
def check_uniprot(id):
    """Tell whether a string starts with a UniProt accession number.

    The pattern is applied with ``re.match``, i.e. anchored at the start of
    the string only, so trailing characters after the accession are accepted.

    Args:
        id: the identifier to test.

    Returns:
        bool: True when the pattern matches, False otherwise.
    """
    uniprot_pattern = (
        r"[OPQ][0-9][A-Z0-9]{3}[0-9]"
        r"|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}"
    )
    return re.match(uniprot_pattern, id) is not None
67 85
def one_id_one_line(input_file, nb_col, header):
    """Expand rows holding several ``;``-separated IDs into one row per ID.

    Empty rows and rows made only of empty cells are dropped.  Whitespace is
    removed from the ID cell and an empty ID cell becomes ``"NA"``.  Rows
    that are too short to contain the ID column are padded with empty cells
    instead of raising ``IndexError``.  Empty fragments produced by stray
    separators (``"P1;"``) are ignored.

    Args:
        input_file: list of rows (lists of strings); left unmodified.
        nb_col: zero-based index of the column holding the IDs.
        header: True when the first row is a header, which is then copied
            untouched as the first row of the result.

    Returns:
        tuple: ``(new_file, ids_list)`` where ``new_file`` is the expanded
        table and ``ids_list`` the unique IDs, in order of first appearance,
        with every ``"_SNP"`` and ``"d_"`` occurrence removed.
    """
    if header and input_file:
        new_file = [input_file[0]]
        rows = input_file[1:]
    else:
        new_file = []
        rows = input_file

    ids_list = []
    for row in rows:
        if not row or set(row) == {''}:
            continue

        # Work on a padded copy so the caller's rows are never mutated.
        cells = list(row) + [""] * (nb_col + 1 - len(row))
        id_cell = re.sub(r"\s+", "", cells[nb_col]) or "NA"

        if ";" in id_cell:
            # A cell made only of separators still yields one "NA" row.
            ids = [part for part in id_cell.split(";") if part] or ["NA"]
        else:
            ids = [id_cell]

        for identifier in ids:
            new_file.append(cells[:nb_col] + [identifier] + cells[nb_col + 1:])
            ids_list.append(identifier)

    ids_list = [e.replace("_SNP", "").replace("d_", "") for e in ids_list]
    # dict.fromkeys() de-duplicates while keeping a deterministic order.
    ids_list = list(dict.fromkeys(ids_list))

    return new_file, ids_list
95 116
def create_srm_atlas_dictionary(features, srm_atlas_csv):
    """Index the reference table by identifier.

    The first row of ``srm_atlas_csv`` is skipped (treated as a header).
    The identifier is read from column 9 of each row, with every ``"_SNP"``
    and ``"d_"`` occurrence removed.  Rows too short to contain that column
    (for instance blank lines) are ignored instead of raising ``IndexError``.

    Args:
        features: feature names to keep, among ``PeptideSeq``, ``SSRT``,
            ``Length``, ``type``, ``PA_AccNum`` and ``MW``.
        srm_atlas_csv: list of rows (lists of strings) of the reference file.

    Returns:
        dict: identifier -> list of feature-value lists, one per matching
        reference row, in file order.

    Raises:
        KeyError: when ``features`` contains an unknown feature name.
    """
    features_index = {
        "PeptideSeq": 0,
        "SSRT": 1,
        "Length": 2,
        "type": 3,
        "PA_AccNum": 4,
        "MW": 5,
    }
    unknown = [str(feature) for feature in features
               if feature not in features_index]
    if unknown:
        raise KeyError("Unknown SRM Atlas feature(s): " + ", ".join(unknown))
    features_to_get = [features_index[feature] for feature in features]

    id_column = 9
    srm_atlas = {}
    for line in srm_atlas_csv[1:]:
        if len(line) <= id_column:
            continue
        key = line[id_column].replace("_SNP", "").replace("d_", "")
        srm_atlas.setdefault(key, []).append(
            [line[i] for i in features_to_get]
        )
    return srm_atlas
108 130
def retrieve_srm_features(srm_atlas, ids):
    """Look up every ID in the reference dictionary.

    Args:
        srm_atlas: dict mapping an identifier to its list of feature rows.
        ids: iterable of identifiers to look up.

    Returns:
        dict: identifier -> list of feature rows.  Unknown identifiers map
        to an empty list (previously an empty string), so every value has
        the same type and still iterates as "no result".
    """
    return {identifier: srm_atlas.get(identifier, []) for identifier in ids}
119 142
def create_header(input_file, ncol, features):
    """Build a header row for an input table that has none.

    Columns are named ``col1``, ``col2``, ... after the width of the first
    row; the ID column is named ``Uniprot-AC`` and the feature names are
    appended at the end.

    Args:
        input_file: list of rows; only the length of the first row is used.
        ncol: zero-based index of the ID column.
        features: list of feature names to append.

    Returns:
        list[str]: the header cells.
    """
    width = len(input_file[0]) if input_file else 0
    # Make sure the ID column exists even for an empty table or a short
    # first row, instead of failing with IndexError.
    width = max(width, ncol + 1)
    col_names = ["col" + str(position) for position in range(1, width + 1)]
    col_names[ncol] = "Uniprot-AC"
    return col_names + list(features)
126 150
151
def main():
    """Annotate a list or a file of IDs with features of the reference file.

    Steps: parse the command line, load the tab-separated reference file
    (Human SRM Atlas), collect the IDs from the ``list`` or ``file`` input,
    stop when none of them looks like a Uniprot-AC, then write a
    tab-separated output with one row per (input row, reference row) pair.
    Input rows without any reference row produce no output row, and
    consecutive identical output rows are written once.
    """

    def _with_na(cells):
        # Blank cells are written as "NA".
        return ["NA" if cell in ("", " ", "NaN") else cell for cell in cells]

    # Get args from command line
    args = get_args()
    features = args.features.split(",")
    if args.input_type not in ("list", "file"):
        # Used to end in a NameError further down.
        sys.exit("Unknown input type '%s': expected 'list' or 'file'"
                 % args.input_type)
    from_file = args.input_type == "file"

    header = False
    column_number = None
    if from_file:
        column_number = nb_col_to_int(args.column_number)
        header = str2bool(args.header)

    # Get reference file (Human SRM Atlas)
    with open(args.ref_file, "r", newline="") as csv_file:
        srm_atlas_csv = list(csv.reader(csv_file, delimiter='\t'))

    # Create srm Atlas dictionary
    srm_atlas = create_srm_atlas_dictionary(features, srm_atlas_csv)

    # Get file and/or ids from input
    input_file = []
    if from_file:
        input_file, ids = get_input_ids_from_file(args.input,
                                                  column_number, header)
    else:
        ids = get_input_ids_from_string(args.input)

    # Check Uniprot-AC
    if not any(check_uniprot(identifier) for identifier in ids):
        print("No Uniprot-AC found, please check your input")
        sys.exit()

    # retrieve features
    result_dict = retrieve_srm_features(srm_atlas, ids)

    # write output
    with open(args.output, "w", newline="") as output:
        writer = csv.writer(output, delimiter="\t")

        # write header
        if header:
            writer.writerow(input_file[0] + features)
            input_file = input_file[1:]
        elif from_file:
            # create_header() already returns the row; wrapping it in a list
            # wrote a single cell holding the repr of that list.
            writer.writerow(create_header(input_file, column_number, features))
        else:
            writer.writerow(["Uniprot-AC"] + features)

        # write lines
        previous_line = ""
        if from_file:
            for line in input_file:
                # result_dict is keyed by cleaned IDs, so clean the cell the
                # same way before the lookup; IDs that were discarded (empty
                # ones) simply have no result instead of raising KeyError.
                key = line[column_number].replace("_SNP", "").replace("d_", "")
                for res in result_dict.get(key, ()):
                    output_line = _with_na(line + res)
                    if previous_line != output_line:
                        writer.writerow(output_line)
                        previous_line = output_line
        else:
            for identifier in ids:
                for res in result_dict.get(identifier, ()):
                    output_line = _with_na([identifier] + res)
                    if previous_line != output_line:
                        writer.writerow(output_line)
                        previous_line = output_line
189 215
190 216
# Run the tool only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()