Mercurial > repos > iss > eurl_vtec_wgs_pt
comparison scripts/ReMatCh/modules/checkMLST.py @ 0:c6bab5103a14 draft
"planemo upload commit 6abf3e299d82d07e6c3cf8642bdea80e96df64c3-dirty"
author | iss |
---|---|
date | Mon, 21 Mar 2022 15:23:09 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c6bab5103a14 |
---|---|
1 import sys | |
2 import os | |
3 import urllib.request | |
4 import csv | |
5 from glob import glob | |
6 import re | |
7 import functools | |
8 try: | |
9 import xml.etree.cElementTree as ET | |
10 except ImportError: | |
11 import xml.etree.ElementTree as ET | |
12 from . import utils | |
13 from . import rematch_module | |
14 | |
15 | |
def determine_species(species):
    """Normalise a species name to a lowercase list of at most two tokens.

    'Escherichia coli O157' -> ['escherichia', 'coli'];
    a second token of 'spp', 'spp.' or 'complex' is dropped, so
    'Campylobacter spp.' -> ['campylobacter'].
    """
    species = species.lower().split(' ')

    if len(species) >= 2:
        species = species[:2]
        if species[1] in ('spp', 'spp.', 'complex'):
            species = [species[0]]

    return species


def check_existing_schema(species, schema_number, script_path):
    """Look for a bundled MLST schema FASTA for *species*.

    Searches <dirname(script_path)>/modules/mlst_schemas/ for files named
    '<genus>_<epithet>[#N].fasta'. If *schema_number* is given, only that
    '#N' schema matches; otherwise schema '#1' is preferred when several
    schemas exist.

    Returns the path (str) of the matching schema, or None if nothing
    matches (or, unchanged from the original contract, a list when several
    candidates remain ambiguous after the '#1' tie-break).
    """
    species = determine_species(species)

    if schema_number is None:
        schema_number = ''
    else:
        schema_number = '#' + str(schema_number)

    mlst_schemas_folder = os.path.join(os.path.dirname(script_path), 'modules', 'mlst_schemas', '')
    reference = []
    files = [f for f in os.listdir(mlst_schemas_folder) if not f.startswith('.') and os.path.isfile(os.path.join(mlst_schemas_folder, f))]
    for file_found in files:
        file_path = os.path.join(mlst_schemas_folder, file_found)
        if file_found.startswith('_'.join(species) + schema_number) and file_found.endswith('.fasta'):
            # BUG FIX: the original did `reference = file_path`, replacing the
            # candidate list with a single str. With several matching schema
            # files, `len(reference) > 1` then measured the *path string* and
            # the loop below iterated its characters, so the '#1' preference
            # never worked and only the last-listed file was returned.
            reference.append(file_path)

    if len(reference) > 1:
        # Several schemas match (e.g. species#1 and species#2 with no number
        # requested): prefer schema '#1'.
        if schema_number == '':
            schema_number = '#1'
        for scheme in reference:
            if os.path.splitext(scheme)[0].endswith(schema_number):
                reference = [scheme]
                break
    if len(reference) == 0:
        reference = None
    elif len(reference) == 1:
        reference = reference[0]
    return reference
55 | |
56 | |
def write_mlst_reference(species, mlst_sequences, outdir, time_str):
    """Write one representative allele per MLST locus to a FASTA file.

    The file is created in *outdir* as '<species>.<time_str>.fasta' (with
    '/' and ' ' in the species name replaced by '_'); sequences are wrapped
    at 80 characters per line. Returns the path of the written file.
    """
    print('Writing MLST alleles as reference_sequences' + '\n')
    safe_species = species.replace('/', '_').replace(' ', '_')
    reference_file = os.path.join(outdir, str(safe_species + '.' + time_str + '.fasta'))
    with open(reference_file, 'wt') as writer:
        for header, sequence in list(mlst_sequences.items()):
            writer.write('>' + header + '\n')
            for chunk in rematch_module.chunkstring(sequence, 80):
                writer.write(chunk + '\n')
    return reference_file
67 | |
68 | |
def get_st(mlst_dicts, dict_sequences):
    """Resolve the sequence type (ST) from per-locus consensus sequences.

    mlst_dicts is [SequenceDict, STdict, lociOrder]: allele sequences per
    locus, allele-profile -> ST mapping, and the locus order of the profile.
    dict_sequences maps arbitrary keys to {'header': locus, 'sequence': seq}.

    Returns (st, alleles_profile); unresolved loci stay '-' and an unknown
    allele profile yields st == '-'.
    """
    sequence_dict, st_dict, loci_order = mlst_dicts

    profile = ['-'] * len(loci_order)
    for _, data in list(dict_sequences.items()):
        locus = data['header']
        consensus = data['sequence']
        if locus not in sequence_dict:
            # Original behaviour preserved: report and stop at the first
            # locus missing from the schema (remaining loci stay '-').
            print(locus + ' not found between consensus sequences!')
            break
        known_alleles = sequence_dict[locus]
        if consensus in known_alleles:
            # Exact allele hit.
            profile[loci_order.index(locus)] = known_alleles[consensus]
        else:
            # Fall back to substring matching of every known allele against
            # the consensus; like the original, a later match overwrites an
            # earlier one.
            for allele_sequence, allele_number in list(known_alleles.items()):
                if allele_sequence in consensus:
                    profile[loci_order.index(locus)] = allele_number

    alleles_profile = ','.join(profile)
    st = st_dict.get(alleles_profile, '-')
    return st, alleles_profile
93 | |
94 | |
# Decorator used on download_pub_mlst_xml below: utils.timer pre-bound with
# the display name 'Download PubMLST module' (timing/reporting semantics live
# in utils.timer — not visible from this file).
downloadPubMLST = functools.partial(utils.timer, name='Download PubMLST module')
97 | |
@downloadPubMLST
def download_pub_mlst_xml(originalSpecies, schema_number, outdir):
    """Download (or load from cache) the PubMLST schema for a species.

    Fetches http://pubmlst.org/data/dbases.xml, selects the <species> entry
    matching *originalSpecies* and *schema_number* (PubMLST names multiple
    schemas as 'Species name#N'), then downloads the ST profile table and
    every locus allele FASTA into <outdir>/pubmlst/<species>_<retrieved>/,
    caching the parsed structures there as a pickle.

    Returns:
        (mlst_dicts, mlst_sequences) where mlst_dicts is
        [SequenceDict, STdict, lociOrder] — allele-sequence -> allele-number
        per locus, allele-profile -> ST, and the profile's locus order — and
        mlst_sequences maps each locus to one representative allele sequence.

    Exits via sys.exit when no matching schema is found.
    """
    print('Searching MLST database for ' + originalSpecies)

    xmlURL = 'http://pubmlst.org/data/dbases.xml'
    try:
        content = urllib.request.urlopen(xmlURL)
        xml = content.read()
        tree = ET.fromstring(xml)
    # NOTE(review): bare except — any failure (network, parse, KeyboardInterrupt)
    # prints the hint below and re-raises.
    except:
        print("Ooops! There might be a problem with the PubMLST service, try later or check if the xml is well formated"
              " at " + xmlURL)
        raise

    xmlData = {}

    # Default to the first schema when the caller did not pick one.
    if schema_number is None:
        schema_number = 1

    success = 0
    for scheme in tree.findall('species'):
        # Split an optional '#N' schema suffix off the species text.
        species_scheme = scheme.text.rstrip('\r\n').rsplit('#', 1)
        number_scheme = species_scheme[1] if len(species_scheme) == 2 else 1
        species_scheme = species_scheme[0]
        if determine_species(species_scheme) == determine_species(originalSpecies):
            # NOTE(review): number_scheme is a str when parsed from 'name#N'
            # but schema_number may be the int 1 (default) or a caller-supplied
            # int — '2' == 2 is False, so explicit schema numbers may only
            # match when passed as strings. Confirm against callers.
            if schema_number == number_scheme:
                success += 1
                xmlData[scheme.text.strip()] = {}
                for info in scheme:  # mlst
                    for database in info:  # database
                        # <retrieved> date keys the per-schema entry; the list
                        # collects [profiles URL, {locus: allele-FASTA URL}].
                        for retrievedDate in database.findall('retrieved'):
                            retrieved = retrievedDate.text
                            xmlData[scheme.text.strip()][retrieved] = []
                        for profile in database.findall('profiles'):
                            profileURl = profile.find('url').text
                            xmlData[scheme.text.strip()][retrieved].append(profileURl)
                        for lociScheme in database.findall('loci'):
                            loci = {}
                            for locus in lociScheme:
                                locusID = locus.text
                                for locusInfo in locus:
                                    locusUrl = locusInfo.text
                                    loci[locusID.strip()] = locusUrl
                            xmlData[scheme.text.strip()][retrieved].append(loci)
    if success == 0:
        sys.exit("\tError. No schema found for %s. Please refer to https://pubmlst.org/databases/" % (originalSpecies))
    elif success > 1:
        # Keep only the alphabetically first schema when several matched.
        keys = list(xmlData.keys())
        keys = sorted(keys)
        print("\tWarning. More than one schema found for %s. only keeping the first"
              " one... %s" % (originalSpecies, keys[0]))
        for key in keys[1:]:
            del xmlData[key]

    pubmlst_dir = os.path.join(outdir, 'pubmlst', '')
    if not os.path.isdir(pubmlst_dir):
        os.makedirs(pubmlst_dir)

    for SchemaName, info in list(xmlData.items()):
        STdict = {}
        SequenceDict = {}
        mlst_sequences = {}

        species_name = '_'.join(determine_species(SchemaName)).replace('/', '_')

        for RetrievalDate, URL in list(info.items()):
            schema_date = species_name + '_' + RetrievalDate
            outDit = os.path.join(pubmlst_dir, schema_date)  # compatible with windows? See if it already exists, if so, break

            if os.path.isdir(outDit):
                # A directory for this exact retrieval date exists: reuse the
                # cached pickle instead of downloading again.
                pickle = os.path.join(outDit, str(schema_date + '.pkl'))
                if os.path.isfile(pickle):
                    print("\tschema files already exist for %s" % (SchemaName))
                    mlst_dicts = utils.extract_variable_from_pickle(pickle)
                    SequenceDict = mlst_dicts[0]
                    for lociName, alleleSequences in list(SequenceDict.items()):
                        # Keep only the first allele sequence per locus.
                        for sequence in alleleSequences:
                            if lociName not in list(mlst_sequences.keys()):
                                mlst_sequences[lociName] = sequence
                            else:
                                break
                    return mlst_dicts, mlst_sequences

            elif any(species_name in x for x in os.listdir(pubmlst_dir)):
                # A different (older) retrieval date exists for this species:
                # remove it before downloading the fresh schema.
                print("Older version of %s's scheme found! Deleting..." % (SchemaName))
                for directory in glob(str(pubmlst_dir + str(species_name + '_*'))):
                    utils.remove_directory(directory)
                os.makedirs(outDit)
            else:
                os.makedirs(outDit)

            # URL[0] is the ST-profile table (TSV: ST, alleles..., optional CC).
            contentProfile = urllib.request.urlopen(URL[0])
            header = next(contentProfile).decode("utf8").strip().split('\t')  # skip header
            try:
                # Loci columns stop at the clonal-complex column when present.
                indexCC = header.index('clonal_complex') if 'clonal_complex' in header else header.index('CC')
            # NOTE(review): bare except — intended to catch the ValueError when
            # neither column exists; all loci columns are then used.
            except:
                indexCC = len(header)
            lociOrder = header[1:indexCC]
            for row in contentProfile:
                row = row.decode("utf8").strip().split('\t')
                ST = row[0]
                alleles = ','.join(row[1:indexCC])
                STdict[alleles] = ST
            # URL[1] maps locus name -> allele FASTA URL.
            for lociName, lociURL in list(URL[1].items()):
                if lociName not in list(SequenceDict.keys()):
                    SequenceDict[lociName] = {}
                url_file = os.path.join(outDit, lociURL.rsplit('/', 1)[1])
                urllib.request.urlretrieve(lociURL, url_file)
                sequences, ignore, ignore = rematch_module.get_sequence_information(url_file, 0)
                for key in list(sequences.keys()):
                    # Keep only the digits of the FASTA header (allele number).
                    header = re.sub("\D", "", sequences[key]['header'])
                    sequence = sequences[key]['sequence'].upper()
                    SequenceDict[lociName][sequence] = header
                    if lociName not in list(mlst_sequences.keys()):
                        mlst_sequences[lociName] = sequence
                os.remove(url_file)
            mlst_dicts = [SequenceDict, STdict, lociOrder]
            utils.save_variable_to_pickle(mlst_dicts, outDit, schema_date)
    return mlst_dicts, mlst_sequences