Mercurial > repos > dereeper > ragoo
diff RaGOO/ragoo_utilities/ReadCoverage.py @ 13:b9a3aeb162ab draft default tip
Uploaded
author | dereeper |
---|---|
date | Mon, 26 Jul 2021 18:22:37 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/RaGOO/ragoo_utilities/ReadCoverage.py Mon Jul 26 18:22:37 2021 +0000 @@ -0,0 +1,212 @@ +import bisect + +import numpy as np + + +class ReadCoverage: + + def __init__(self, in_paf): + self.paf = in_paf + self.glob_mean = None + self.glob_std = None + self.coverage_map = dict() + self.ctg_lens = dict() + self._make_coverage_map() + self._get_glob_mean() + + @staticmethod + def _tabulate_coverage(cov_list): + current_coverage = 0 + coverage_list = [] + seen = set() + + for header, pos in cov_list: + if header in seen: + current_coverage -= 1 + coverage_list.append((pos, current_coverage)) + seen.remove(header) + else: + current_coverage += 1 + coverage_list.append((pos, current_coverage)) + seen.add(header) + return coverage_list + + @staticmethod + def _smooth_supported_breaks(sup_breaks, break_types): + """ + If there are multiple low coverage breaks in close proximity, + merge it into one break at the lowest coverage point. + """ + i = 0 + j = 1 + while j < len(sup_breaks): + if break_types[i] == 'l' and break_types[j] == 'l': + if abs(sup_breaks[i][0] - sup_breaks[j][0]) < 100000: + # Merge these two break points + sup_breaks[i] = min([sup_breaks[i], sup_breaks[j]], key=lambda x: x[1]) + sup_breaks.pop(j) + else: + i += 1 + j += 1 + else: + i += 1 + j += 1 + + return [z[0] for z in sup_breaks] + + def _trim_ends(self, dist=25000): + """ Remove the ends of the contigs from the coverage map. """ + for i in self.coverage_map: + # Start with the beginning of the contig + start_idx = 0 + end_idx = 0 + for j in range(len(self.coverage_map[i])): + if self.coverage_map[i][j][0] < dist: + start_idx = j + + if self.coverage_map[i][j][0] > self.ctg_lens[i] - dist: + end_idx = j-1 + break + self.coverage_map[i] = self.coverage_map[i][start_idx:end_idx] + + # Remove contigs which don't have coverage info. + header_keys = list(self.coverage_map.keys()) + for i in header_keys: + if not self.coverage_map[i]: + self.coverage_map.pop(i) + + def _make_coverage_map(self): + """ + Populate self.coverage_map. This is a dictionary that associates each contig header with a list of alignment + positions and their coverage levels. + """ + # Associate with each contig header, a list of (query header, start), (query header, end) + alns_pos = dict() + with open(self.paf, 'r') as f: + for line in f: + L1 = line.rstrip().split('\t') + + # Only consider an alignment if at least 75% of the read aligned. + if abs((int(L1[3]) - int(L1[2])))/int(L1[1]) >= 0.75: + if L1[5] in alns_pos: + alns_pos[L1[5]].append((L1[0], int(L1[7]))) + alns_pos[L1[5]].append((L1[0], int(L1[8]))) + else: + alns_pos[L1[5]] = [(L1[0], int(L1[7])), (L1[0], int(L1[8]))] + self.ctg_lens[L1[5]] = int(L1[6]) + + # Sort these coverage positions and get the coverage map + for i in alns_pos: + self.coverage_map[i] = self._tabulate_coverage(sorted(alns_pos[i], key=lambda x: x[1])) + + self._trim_ends() + + def _get_glob_mean(self): + L1 = [] + for i in self.coverage_map: + + # In the case where we have multiple coverage values for one position, take the last one. + last_pos = 0 + curr_val = 0 + for j in self.coverage_map[i]: + if j[0] == last_pos: + curr_val = j[1] + else: + last_pos = j[0] + L1.append(curr_val) + cur_val = j[1] + + L1 = np.asarray(L1, dtype=np.int32) + self.glob_mean = np.median(L1) + self.glob_std = np.sqrt(self.glob_mean) + + def _get_index_range(self, header, start_ind, distance): + """ + Get the list of indices that are contained within a distance around the start index + """ + all_inds = [] + low_counter = 1 + + # Check if the start point is at the end of the contig + if start_ind == len(self.coverage_map[header]): + start_ind -= 1 + + start_pos = self.coverage_map[header][start_ind][0] + is_low = False + + # Get all coverage map indices representing regions 50kbp upstream of the start + while not is_low: + next_ind = start_ind-low_counter + if next_ind < 0: + is_low = True + else: + next_pos = self.coverage_map[header][next_ind][0] + if start_pos - next_pos > distance: + is_low = True + else: + all_inds.append(next_ind) + low_counter += 1 + + # Repeat for 50kbp downstream + high_counter = 1 + is_high = False + while not is_high: + next_ind = start_ind + high_counter + if next_ind >= len(self.coverage_map[header]): + is_high = True + else: + next_pos = self.coverage_map[header][next_ind][0] + if next_pos - start_pos > distance: + is_high = True + else: + all_inds.append(next_ind) + high_counter += 1 + + return sorted(all_inds) + + def check_break_cov(self, header, in_breaks, min_cov=None, max_cov=None): + """ + Given a list of potential break points, verify if those break points occur around low or high coverage + areas. If so, replace the candidate break point with the low/high coverage break point. + :param header: contig header for these breaks + :param in_breaks: list of candidate break points + :param min_cov: break at coverage levels below this value + :param max_cov: break at coverage levels above this value + :return: list of real break points, or empty list if not breaking is recommended + """ + # Check that we have coverage info for this contig. + if header not in self.coverage_map: + return [] + + if min_cov is None or max_cov is None: + # Automatically calculate min and max coverage + min_cov = max(self.glob_mean - (self.glob_std*3), 0) + max_cov = self.glob_mean + (self.glob_std*3) + + supported_breaks = [] + break_types = [] + + for i in in_breaks: + # Get the coverage for the position closest to this potential break point + ins_ind = bisect.bisect_left(self.coverage_map[header], (i, 0)) + # Get the coverage for positions within 50 kbp of the candidate coverage position + # Exclude positions near the ends of the sequence. + ind_range = self._get_index_range(header, ins_ind, 50000) + + if len(set(ind_range)) > 1: + # Check for low coverage + lowest_cov = min(self.coverage_map[header][ind_range[0]:ind_range[-1]], key=lambda x: x[1]) + if lowest_cov[1] < min_cov: + supported_breaks.append(lowest_cov) + break_types.append('l') + continue + + # Check for high coverage + highest_cov = max(self.coverage_map[header][ind_range[0]:ind_range[-1]], key=lambda x: x[1]) + if highest_cov[1] > max_cov: + supported_breaks.append(highest_cov) + break_types.append('h') + + return self._smooth_supported_breaks(supported_breaks, break_types) + +