diff RaGOO/ragoo_utilities/ContigAlignment.py @ 13:b9a3aeb162ab draft default tip

Uploaded
author dereeper
date Mon, 26 Jul 2021 18:22:37 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/RaGOO/ragoo_utilities/ContigAlignment.py	Mon Jul 26 18:22:37 2021 +0000
@@ -0,0 +1,364 @@
+import operator
+from collections import defaultdict
+
+from ragoo_utilities.utilities import summarize_planesweep, binary_search
+
+
+class ContigAlignment:
+    """
+    This object will represent a contig's alignment to a reference chromosome.
+    Specifically, this class will organize alignments in a one-to-many fashion, where
+    we will only have one object per contig, and each object will store all of the alignments
+    of that contig.
+
+
+    """
+
+    def __init__(self, in_contig):
+        """
+        Each object will refer to a single contig via its header name.
+
+        All other alignment metrics will be stored in lists, where the list offsets
+        correspond to the alignment number.
+
+        Don't actually add any alignments through object instantiation. Only use the add_alignment method.
+        """
+        self.contig = in_contig
+
+        self.query_lens = []
+        self.query_starts = []
+        self.query_ends = []
+        self.strands = []
+        self.ref_headers = []
+        self.ref_lens = []
+        self.ref_starts = []
+        self.ref_ends = []
+        self.num_matches = []
+        self.aln_lens = []
+        self.mapqs = []
+
+        self._is_unique_anchor_filtered = False
+        self._is_merged = False
+
+    def __repr__(self):
+        return '<ContigAlignment' + self.contig + '>'
+
+    def __str__(self):
+        """ Return the alignments in sorted PAF format. """
+        self.sort_by_query()
+        all_alns = []
+        for i in range(len(self.ref_headers)):
+            all_alns.append(
+                "\t".join([
+                    self.contig,
+                    str(self.query_lens[i]),
+                    str(self.query_starts[i]),
+                    str(self.query_ends[i]),
+                    self.strands[i],
+                    self.ref_headers[i],
+                    str(self.ref_lens[i]),
+                    str(self.ref_starts[i]),
+                    str(self.ref_ends[i]),
+                    str(self.num_matches[i]),
+                    str(self.aln_lens[i]),
+                    str(self.mapqs[i])
+                ])
+            )
+        return "\n".join(all_alns)
+
+    def get_attr_lens(self):
+        all_lens = [
+            len(self.query_lens),
+            len(self.query_starts),
+            len(self.query_ends),
+            len(self.strands),
+            len(self.ref_headers),
+            len(self.ref_lens),
+            len(self.ref_starts),
+            len(self.ref_ends),
+            len(self.num_matches),
+            len(self.aln_lens),
+            len(self.mapqs)
+        ]
+        return all_lens
+
+    def add_alignment(self, paf_line):
+        """
+        The only way to add alignments for this contig. Only accepts a PAFLine type object which represents
+        as single line of a paf file.
+        """
+        if not paf_line.contig == self.contig:
+            raise ValueError('Only can add alignments with query header %s' % self.contig)
+
+        self.query_lens.append(paf_line.query_len)
+        self.query_starts.append(paf_line.query_start)
+        self.query_ends.append(paf_line.query_end)
+        self.strands.append(paf_line.strand)
+        self.ref_headers.append(paf_line.ref_header)
+        self.ref_lens.append(paf_line.ref_len)
+        self.ref_starts.append(paf_line.ref_start)
+        self.ref_ends.append(paf_line.ref_end)
+        self.num_matches.append(paf_line.num_match)
+        self.aln_lens.append(paf_line.aln_len)
+        self.mapqs.append(paf_line.mapq)
+
+        # Check that all attribute lists have same length
+        all_lens = self.get_attr_lens()
+        assert len(set(all_lens)) == 1
+
+    def has_unique_chr_match(self):
+        s1 = set(self.ref_headers)
+        if len(s1) == 1:
+            return True
+        return False
+
+    def count_chr_matches(self):
+        s1 = set(self.ref_headers)
+        return len(s1)
+
+    def rearrange_alns(self, hits):
+        """ Generally re structure the alignments according to 'hits', an ordered list of indices. """
+        self.query_lens = [self.query_lens[i] for i in hits]
+        self.query_starts = [self.query_starts[i] for i in hits]
+        self.query_ends = [self.query_ends[i] for i in hits]
+        self.strands = [self.strands[i] for i in hits]
+        self.ref_headers = [self.ref_headers[i] for i in hits]
+        self.ref_lens = [self.ref_lens[i] for i in hits]
+        self.ref_starts = [self.ref_starts[i] for i in hits]
+        self.ref_ends = [self.ref_ends[i] for i in hits]
+        self.num_matches = [self.num_matches[i] for i in hits]
+        self.aln_lens = [self.aln_lens[i] for i in hits]
+        self.mapqs = [self.mapqs[i] for i in hits]
+
+    def filter_ref_chroms(self, in_chroms):
+        """
+        Given a list of chromosomes, mutate this object so as to only include alignments to those chromosomes.
+        """
+        hits = []
+        for i in range(len(self.ref_headers)):
+            if self.ref_headers[i] in in_chroms:
+                hits.append(i)
+
+        self.rearrange_alns(hits)
+
+    def sort_by_ref(self):
+        ref_pos = []
+        for i in range(len(self.ref_headers)):
+            ref_pos.append((self.ref_headers[i], self.ref_starts[i], self.ref_ends[i], i))
+        hits = [i[3] for i in sorted(ref_pos)]
+
+        self.rearrange_alns(hits)
+
+    def sort_by_query(self):
+        q_pos = []
+        for i in range(len(self.ref_headers)):
+            q_pos.append((self.query_starts[i], self.query_ends[i], i))
+        hits = [i[2] for i in sorted(q_pos)]
+
+        self.rearrange_alns(hits)
+
+    def exclude_ref_chroms(self, exclude_list):
+        hits = []
+        for i in range(len(self.ref_headers)):
+            if self.ref_headers[i] not in exclude_list:
+                hits.append(i)
+
+        self.rearrange_alns(hits)
+
+    def filter_lengths(self, l):
+        hits = []
+        for i in range(len(self.ref_headers)):
+            if self.aln_lens[i] > l:
+                hits.append(i)
+
+        self.rearrange_alns(hits)
+
+    def unique_anchor_filter(self):
+        """
+        The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py"
+        written by Maria Nattestad. The original script can be found here:
+
+        https://github.com/MariaNattestad/Assemblytics
+
+        And the publication associated with Maria's work is here:
+
+        Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a
+        web analytics tool for the detection of variants from an
+        assembly." Bioinformatics 32.19 (2016): 3021-3023.
+        """
+
+        if not self._is_unique_anchor_filtered:
+            lines_by_query = []
+            for i, j in zip(self.query_starts, self.query_ends):
+                lines_by_query.append((i, j))
+
+            hits = summarize_planesweep(lines_by_query, 10000)
+            self.rearrange_alns(hits)
+            self._is_unique_anchor_filtered = True
+
+    def merge_alns(self, merge_dist=100000):
+        """
+        Merge chains of alignments that are to the same target, have the same orientation, and are less than
+        merge_dist away from each other.
+        :param merge_dist:
+        :return:
+        """
+        # Sort the alignments
+        self.sort_by_ref()
+
+        # Might also want to filter out low identity alignments
+
+        # Keep track of which alignments we are comparing
+        i = 0
+        j = 1
+        while j < len(self.ref_headers):
+            if all([
+                self.ref_headers[i] == self.ref_headers[j],
+                self.strands[i] == self.strands[j],
+                self.ref_starts[j] - self.ref_ends[i] <= merge_dist
+            ]):
+                # Merge the alignments in place of the first alignment
+                self.ref_starts[i] = min(self.ref_starts[i], self.ref_starts[j])
+                self.ref_ends[i] = max(self.ref_ends[i], self.ref_ends[j])
+                self.query_starts[i] = min(self.query_starts[i], self.query_starts[j])
+                self.query_ends[i] = max(self.query_ends[i], self.query_ends[j])
+
+                self.num_matches[i] += self.num_matches[j]
+                self.aln_lens[i] = self.ref_ends[i] - self.ref_starts[i]
+                self.mapqs[i] = (self.mapqs[i] + self.mapqs[j])//2
+
+                # Remove the redundant alignment
+                self.query_lens.pop(j)
+                self.query_starts.pop(j)
+                self.query_ends.pop(j)
+                self.strands.pop(j)
+                self.ref_headers.pop(j)
+                self.ref_lens.pop(j)
+                self.ref_starts.pop(j)
+                self.ref_ends.pop(j)
+                self.num_matches.pop(j)
+                self.aln_lens.pop(j)
+                self.mapqs.pop(j)
+            else:
+                i += 1
+                j += 1
+
+        # remove contained alignments. contained w.r.t the contig
+        cont_inds = []
+        for i in range(len(self.ref_headers)):
+            for j in range(len(self.ref_headers)):
+                # Check if j is contained by i
+                if i == j:
+                    continue
+                if self.query_starts[i] <= self.query_starts[j] and self.query_ends[i] >= self.query_ends[j]:
+                    cont_inds.append(j)
+
+        hits = [i for i in range(len(self.ref_headers)) if i not in cont_inds]
+        self.rearrange_alns(hits)
+
+        self._is_merged = True
+
+    def get_break_candidates(self):
+        if not self._is_merged:
+            raise ValueError("Alignments must first be merged.")
+
+        self.sort_by_query()
+        all_candidates = []
+        for i in range(1, len(self.ref_headers)):
+            all_candidates.append(min(self.query_ends[i-1], self.query_starts[i]))
+            all_candidates.append(max(self.query_ends[i-1], self.query_starts[i]))
+
+        return all_candidates
+
+
+
+class UniqueContigAlignment:
+    """
+    A class representing the reference chromosome to which a particular contig will be assigned to.
+    At first, the incoming alignments may be to multiple chromosomes, but this class will assign a single
+    reference chromosome to an input contig.
+    """
+
+    def __init__(self, in_contig_aln):
+        if not isinstance(in_contig_aln, ContigAlignment):
+            message = 'Can only make a unique alignment from a ContigAlignment object. Got type %s instead.' % str(type(in_contig_aln))
+            raise ValueError(message)
+
+        self.contig = in_contig_aln.contig
+
+        # Find the chromosome which was covered the most in these alignments.
+        self.ref_chrom = None
+        self.confidence = 0.0
+        self._get_best_ref_cov(in_contig_aln)
+
+    def __str__(self):
+        return '<UniqueContigAlignment:' + self.contig + ' - ' + self.ref_chrom + '>'
+
+    def _get_best_ref_cov(self, alns):
+        # Get list of all references chromosomes
+        all_chroms = set(alns.ref_headers)
+        if len(all_chroms) == 1:
+            self.ref_chrom = alns.ref_headers[0]
+            self.confidence = 1
+            return
+
+        # Initialize coverage counts for each chromosome
+        ranges = dict()
+        for i in all_chroms:
+            ranges[i] = 0
+
+        # Get all the ranges in reference to all chromosomes
+        all_intervals = defaultdict(list)
+        for i in range(len(alns.ref_headers)):
+            this_range = (alns.ref_starts[i], alns.ref_ends[i])
+            this_chrom = alns.ref_headers[i]
+            all_intervals[this_chrom].append(this_range)
+
+        # For each chromosome, sort the range and get the union interval length.
+        # Algorithm and pseudocode given by Michael Kirsche
+        for i in all_intervals.keys():
+            sorted_intervals = sorted(all_intervals[i], key=lambda tup: tup[0])
+            max_end = -1
+            for j in sorted_intervals:
+                start_new_terr = max(j[0], max_end)
+                ranges[i] += max(0, j[1] - start_new_terr)
+                max_end = max(max_end, j[1])
+
+        assert ranges
+
+        # I convert to a list and sort the ranges.items() in order to have ties broken in a deterministic way.
+        max_chrom = max(sorted(list(ranges.items())), key=operator.itemgetter(1))[0]
+        self.ref_chrom = max_chrom
+
+        # Now get the confidence of this chromosome assignment
+        # Equal to the max range over all ranges
+        self.confidence = ranges[max_chrom]/sum(ranges.values())
+        assert self.confidence >= 0
+        assert self.confidence <= 1
+
+
+class LongestContigAlignment:
+    """
+    Given a ContigAlignment object, find the longest alignment with respect to a given references chromosome.
+
+    1. Filter out any alignments that are not aligned to specified chromosome.
+    2. Find longest alignment remaining.
+    """
+
+    def __init__(self, in_contig_aln):
+        self.best_index = self._get_best_index(in_contig_aln)
+        self.contig = in_contig_aln.contig
+        self.ref_start = in_contig_aln.ref_starts[self.best_index]
+        self.ref_end = in_contig_aln.ref_ends[self.best_index]
+        self.aln_len = in_contig_aln.aln_lens[self.best_index]
+        self.strand = in_contig_aln.strands[self.best_index]
+        self.interval = (self.ref_start, self.ref_end)
+
+    def _get_best_index(self, aln):
+        max_index = -1
+        max_len = -1
+        for i in range(len(aln.aln_lens)):
+            if aln.aln_lens[i] > max_len:
+                max_len = aln.aln_lens[i]
+                max_index = i
+        return max_index
\ No newline at end of file