Mercurial > repos > dereeper > ragoo
comparison RaGOO/ragoo_utilities/ContigAlignment.py @ 13:b9a3aeb162ab draft default tip
Uploaded
author | dereeper |
---|---|
date | Mon, 26 Jul 2021 18:22:37 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
12:68a9ec9ce51e | 13:b9a3aeb162ab |
---|---|
1 import operator | |
2 from collections import defaultdict | |
3 | |
4 from ragoo_utilities.utilities import summarize_planesweep, binary_search | |
5 | |
6 | |
7 class ContigAlignment: | |
8 """ | |
9 This object will represent a contig's alignment to a reference chromosome. | |
10 Specifically, this class will organize alignments in a one-to-many fashion, where | |
11 we will only have one object per contig, and each object will store all of the alignments | |
12 of that contig. | |
13 | |
14 | |
15 """ | |
16 | |
17 def __init__(self, in_contig): | |
18 """ | |
19 Each object will refer to a single contig via its header name. | |
20 | |
21 All other alignment metrics will be stored in lists, where the list offsets | |
22 correspond to the alignment number. | |
23 | |
24 Don't actually add any alignments through object instantiation. Only use the add_alignment method. | |
25 """ | |
26 self.contig = in_contig | |
27 | |
28 self.query_lens = [] | |
29 self.query_starts = [] | |
30 self.query_ends = [] | |
31 self.strands = [] | |
32 self.ref_headers = [] | |
33 self.ref_lens = [] | |
34 self.ref_starts = [] | |
35 self.ref_ends = [] | |
36 self.num_matches = [] | |
37 self.aln_lens = [] | |
38 self.mapqs = [] | |
39 | |
40 self._is_unique_anchor_filtered = False | |
41 self._is_merged = False | |
42 | |
43 def __repr__(self): | |
44 return '<ContigAlignment' + self.contig + '>' | |
45 | |
46 def __str__(self): | |
47 """ Return the alignments in sorted PAF format. """ | |
48 self.sort_by_query() | |
49 all_alns = [] | |
50 for i in range(len(self.ref_headers)): | |
51 all_alns.append( | |
52 "\t".join([ | |
53 self.contig, | |
54 str(self.query_lens[i]), | |
55 str(self.query_starts[i]), | |
56 str(self.query_ends[i]), | |
57 self.strands[i], | |
58 self.ref_headers[i], | |
59 str(self.ref_lens[i]), | |
60 str(self.ref_starts[i]), | |
61 str(self.ref_ends[i]), | |
62 str(self.num_matches[i]), | |
63 str(self.aln_lens[i]), | |
64 str(self.mapqs[i]) | |
65 ]) | |
66 ) | |
67 return "\n".join(all_alns) | |
68 | |
69 def get_attr_lens(self): | |
70 all_lens = [ | |
71 len(self.query_lens), | |
72 len(self.query_starts), | |
73 len(self.query_ends), | |
74 len(self.strands), | |
75 len(self.ref_headers), | |
76 len(self.ref_lens), | |
77 len(self.ref_starts), | |
78 len(self.ref_ends), | |
79 len(self.num_matches), | |
80 len(self.aln_lens), | |
81 len(self.mapqs) | |
82 ] | |
83 return all_lens | |
84 | |
85 def add_alignment(self, paf_line): | |
86 """ | |
87 The only way to add alignments for this contig. Only accepts a PAFLine type object which represents | |
88 as single line of a paf file. | |
89 """ | |
90 if not paf_line.contig == self.contig: | |
91 raise ValueError('Only can add alignments with query header %s' % self.contig) | |
92 | |
93 self.query_lens.append(paf_line.query_len) | |
94 self.query_starts.append(paf_line.query_start) | |
95 self.query_ends.append(paf_line.query_end) | |
96 self.strands.append(paf_line.strand) | |
97 self.ref_headers.append(paf_line.ref_header) | |
98 self.ref_lens.append(paf_line.ref_len) | |
99 self.ref_starts.append(paf_line.ref_start) | |
100 self.ref_ends.append(paf_line.ref_end) | |
101 self.num_matches.append(paf_line.num_match) | |
102 self.aln_lens.append(paf_line.aln_len) | |
103 self.mapqs.append(paf_line.mapq) | |
104 | |
105 # Check that all attribute lists have same length | |
106 all_lens = self.get_attr_lens() | |
107 assert len(set(all_lens)) == 1 | |
108 | |
109 def has_unique_chr_match(self): | |
110 s1 = set(self.ref_headers) | |
111 if len(s1) == 1: | |
112 return True | |
113 return False | |
114 | |
115 def count_chr_matches(self): | |
116 s1 = set(self.ref_headers) | |
117 return len(s1) | |
118 | |
119 def rearrange_alns(self, hits): | |
120 """ Generally re structure the alignments according to 'hits', an ordered list of indices. """ | |
121 self.query_lens = [self.query_lens[i] for i in hits] | |
122 self.query_starts = [self.query_starts[i] for i in hits] | |
123 self.query_ends = [self.query_ends[i] for i in hits] | |
124 self.strands = [self.strands[i] for i in hits] | |
125 self.ref_headers = [self.ref_headers[i] for i in hits] | |
126 self.ref_lens = [self.ref_lens[i] for i in hits] | |
127 self.ref_starts = [self.ref_starts[i] for i in hits] | |
128 self.ref_ends = [self.ref_ends[i] for i in hits] | |
129 self.num_matches = [self.num_matches[i] for i in hits] | |
130 self.aln_lens = [self.aln_lens[i] for i in hits] | |
131 self.mapqs = [self.mapqs[i] for i in hits] | |
132 | |
133 def filter_ref_chroms(self, in_chroms): | |
134 """ | |
135 Given a list of chromosomes, mutate this object so as to only include alignments to those chromosomes. | |
136 """ | |
137 hits = [] | |
138 for i in range(len(self.ref_headers)): | |
139 if self.ref_headers[i] in in_chroms: | |
140 hits.append(i) | |
141 | |
142 self.rearrange_alns(hits) | |
143 | |
144 def sort_by_ref(self): | |
145 ref_pos = [] | |
146 for i in range(len(self.ref_headers)): | |
147 ref_pos.append((self.ref_headers[i], self.ref_starts[i], self.ref_ends[i], i)) | |
148 hits = [i[3] for i in sorted(ref_pos)] | |
149 | |
150 self.rearrange_alns(hits) | |
151 | |
152 def sort_by_query(self): | |
153 q_pos = [] | |
154 for i in range(len(self.ref_headers)): | |
155 q_pos.append((self.query_starts[i], self.query_ends[i], i)) | |
156 hits = [i[2] for i in sorted(q_pos)] | |
157 | |
158 self.rearrange_alns(hits) | |
159 | |
160 def exclude_ref_chroms(self, exclude_list): | |
161 hits = [] | |
162 for i in range(len(self.ref_headers)): | |
163 if self.ref_headers[i] not in exclude_list: | |
164 hits.append(i) | |
165 | |
166 self.rearrange_alns(hits) | |
167 | |
168 def filter_lengths(self, l): | |
169 hits = [] | |
170 for i in range(len(self.ref_headers)): | |
171 if self.aln_lens[i] > l: | |
172 hits.append(i) | |
173 | |
174 self.rearrange_alns(hits) | |
175 | |
176 def unique_anchor_filter(self): | |
177 """ | |
178 The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py" | |
179 written by Maria Nattestad. The original script can be found here: | |
180 | |
181 https://github.com/MariaNattestad/Assemblytics | |
182 | |
183 And the publication associated with Maria's work is here: | |
184 | |
185 Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a | |
186 web analytics tool for the detection of variants from an | |
187 assembly." Bioinformatics 32.19 (2016): 3021-3023. | |
188 """ | |
189 | |
190 if not self._is_unique_anchor_filtered: | |
191 lines_by_query = [] | |
192 for i, j in zip(self.query_starts, self.query_ends): | |
193 lines_by_query.append((i, j)) | |
194 | |
195 hits = summarize_planesweep(lines_by_query, 10000) | |
196 self.rearrange_alns(hits) | |
197 self._is_unique_anchor_filtered = True | |
198 | |
199 def merge_alns(self, merge_dist=100000): | |
200 """ | |
201 Merge chains of alignments that are to the same target, have the same orientation, and are less than | |
202 merge_dist away from each other. | |
203 :param merge_dist: | |
204 :return: | |
205 """ | |
206 # Sort the alignments | |
207 self.sort_by_ref() | |
208 | |
209 # Might also want to filter out low identity alignments | |
210 | |
211 # Keep track of which alignments we are comparing | |
212 i = 0 | |
213 j = 1 | |
214 while j < len(self.ref_headers): | |
215 if all([ | |
216 self.ref_headers[i] == self.ref_headers[j], | |
217 self.strands[i] == self.strands[j], | |
218 self.ref_starts[j] - self.ref_ends[i] <= merge_dist | |
219 ]): | |
220 # Merge the alignments in place of the first alignment | |
221 self.ref_starts[i] = min(self.ref_starts[i], self.ref_starts[j]) | |
222 self.ref_ends[i] = max(self.ref_ends[i], self.ref_ends[j]) | |
223 self.query_starts[i] = min(self.query_starts[i], self.query_starts[j]) | |
224 self.query_ends[i] = max(self.query_ends[i], self.query_ends[j]) | |
225 | |
226 self.num_matches[i] += self.num_matches[j] | |
227 self.aln_lens[i] = self.ref_ends[i] - self.ref_starts[i] | |
228 self.mapqs[i] = (self.mapqs[i] + self.mapqs[j])//2 | |
229 | |
230 # Remove the redundant alignment | |
231 self.query_lens.pop(j) | |
232 self.query_starts.pop(j) | |
233 self.query_ends.pop(j) | |
234 self.strands.pop(j) | |
235 self.ref_headers.pop(j) | |
236 self.ref_lens.pop(j) | |
237 self.ref_starts.pop(j) | |
238 self.ref_ends.pop(j) | |
239 self.num_matches.pop(j) | |
240 self.aln_lens.pop(j) | |
241 self.mapqs.pop(j) | |
242 else: | |
243 i += 1 | |
244 j += 1 | |
245 | |
246 # remove contained alignments. contained w.r.t the contig | |
247 cont_inds = [] | |
248 for i in range(len(self.ref_headers)): | |
249 for j in range(len(self.ref_headers)): | |
250 # Check if j is contained by i | |
251 if i == j: | |
252 continue | |
253 if self.query_starts[i] <= self.query_starts[j] and self.query_ends[i] >= self.query_ends[j]: | |
254 cont_inds.append(j) | |
255 | |
256 hits = [i for i in range(len(self.ref_headers)) if i not in cont_inds] | |
257 self.rearrange_alns(hits) | |
258 | |
259 self._is_merged = True | |
260 | |
261 def get_break_candidates(self): | |
262 if not self._is_merged: | |
263 raise ValueError("Alignments must first be merged.") | |
264 | |
265 self.sort_by_query() | |
266 all_candidates = [] | |
267 for i in range(1, len(self.ref_headers)): | |
268 all_candidates.append(min(self.query_ends[i-1], self.query_starts[i])) | |
269 all_candidates.append(max(self.query_ends[i-1], self.query_starts[i])) | |
270 | |
271 return all_candidates | |
272 | |
273 | |
274 | |
275 class UniqueContigAlignment: | |
276 """ | |
277 A class representing the reference chromosome to which a particular contig will be assigned to. | |
278 At first, the incoming alignments may be to multiple chromosomes, but this class will assign a single | |
279 reference chromosome to an input contig. | |
280 """ | |
281 | |
282 def __init__(self, in_contig_aln): | |
283 if not isinstance(in_contig_aln, ContigAlignment): | |
284 message = 'Can only make a unique alignment from a ContigAlignment object. Got type %s instead.' % str(type(in_contig_aln)) | |
285 raise ValueError(message) | |
286 | |
287 self.contig = in_contig_aln.contig | |
288 | |
289 # Find the chromosome which was covered the most in these alignments. | |
290 self.ref_chrom = None | |
291 self.confidence = 0.0 | |
292 self._get_best_ref_cov(in_contig_aln) | |
293 | |
294 def __str__(self): | |
295 return '<UniqueContigAlignment:' + self.contig + ' - ' + self.ref_chrom + '>' | |
296 | |
297 def _get_best_ref_cov(self, alns): | |
298 # Get list of all references chromosomes | |
299 all_chroms = set(alns.ref_headers) | |
300 if len(all_chroms) == 1: | |
301 self.ref_chrom = alns.ref_headers[0] | |
302 self.confidence = 1 | |
303 return | |
304 | |
305 # Initialize coverage counts for each chromosome | |
306 ranges = dict() | |
307 for i in all_chroms: | |
308 ranges[i] = 0 | |
309 | |
310 # Get all the ranges in reference to all chromosomes | |
311 all_intervals = defaultdict(list) | |
312 for i in range(len(alns.ref_headers)): | |
313 this_range = (alns.ref_starts[i], alns.ref_ends[i]) | |
314 this_chrom = alns.ref_headers[i] | |
315 all_intervals[this_chrom].append(this_range) | |
316 | |
317 # For each chromosome, sort the range and get the union interval length. | |
318 # Algorithm and pseudocode given by Michael Kirsche | |
319 for i in all_intervals.keys(): | |
320 sorted_intervals = sorted(all_intervals[i], key=lambda tup: tup[0]) | |
321 max_end = -1 | |
322 for j in sorted_intervals: | |
323 start_new_terr = max(j[0], max_end) | |
324 ranges[i] += max(0, j[1] - start_new_terr) | |
325 max_end = max(max_end, j[1]) | |
326 | |
327 assert ranges | |
328 | |
329 # I convert to a list and sort the ranges.items() in order to have ties broken in a deterministic way. | |
330 max_chrom = max(sorted(list(ranges.items())), key=operator.itemgetter(1))[0] | |
331 self.ref_chrom = max_chrom | |
332 | |
333 # Now get the confidence of this chromosome assignment | |
334 # Equal to the max range over all ranges | |
335 self.confidence = ranges[max_chrom]/sum(ranges.values()) | |
336 assert self.confidence >= 0 | |
337 assert self.confidence <= 1 | |
338 | |
339 | |
340 class LongestContigAlignment: | |
341 """ | |
342 Given a ContigAlignment object, find the longest alignment with respect to a given references chromosome. | |
343 | |
344 1. Filter out any alignments that are not aligned to specified chromosome. | |
345 2. Find longest alignment remaining. | |
346 """ | |
347 | |
348 def __init__(self, in_contig_aln): | |
349 self.best_index = self._get_best_index(in_contig_aln) | |
350 self.contig = in_contig_aln.contig | |
351 self.ref_start = in_contig_aln.ref_starts[self.best_index] | |
352 self.ref_end = in_contig_aln.ref_ends[self.best_index] | |
353 self.aln_len = in_contig_aln.aln_lens[self.best_index] | |
354 self.strand = in_contig_aln.strands[self.best_index] | |
355 self.interval = (self.ref_start, self.ref_end) | |
356 | |
357 def _get_best_index(self, aln): | |
358 max_index = -1 | |
359 max_len = -1 | |
360 for i in range(len(aln.aln_lens)): | |
361 if aln.aln_lens[i] > max_len: | |
362 max_len = aln.aln_lens[i] | |
363 max_index = i | |
364 return max_index |