diff RaGOO/ragoo_utilities/utilities.py @ 13:b9a3aeb162ab draft default tip
Uploaded
| author | dereeper |
| --- | --- |
| date | Mon, 26 Jul 2021 18:22:37 +0000 |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/RaGOO/ragoo_utilities/utilities.py	Mon Jul 26 18:22:37 2021 +0000
@@ -0,0 +1,152 @@
+import time
+import subprocess
+import operator
+
+from ragoo_utilities.SeqReader import SeqReader
+
+""" A collection of various helper functions"""
+
+complements = str.maketrans("ACGTNURYSWKMBVDHacgtnuryswkmbvdh", "TGCANAYRSWMKVBHDtgcanayrswmkvbhd")
+
+
+def reverse_complement(seq):
+    """
+    Reverse complement a nucleotide sequence.
+    :param seq: Sequence to be reverse complemented
+    :return: A reverse complemented sequence
+    """
+    return seq.translate(complements)[::-1]
+
+
+def run(cmnd):
+    """ Run command and report status. """
+    log('Running : %s' % cmnd)
+    if subprocess.call(cmnd, shell=True, executable='/bin/bash') != 0:
+        raise RuntimeError('Failed : %s ' % cmnd)
+
+
+def log(message):
+    """ Log messages to standard output. """
+    print(time.ctime() + ' --- ' + message, flush=True)
+
+
+def read_contigs(in_file):
+    d = dict()
+    x = SeqReader(in_file)
+    for header, seq in x.parse_fasta():
+        d[header.replace('>', '').split(' ')[0]] = seq
+    return d
+
+
+def read_gz_contigs(in_file):
+    d = dict()
+    x = SeqReader(in_file)
+    for header, seq in x.parse_gzip_fasta():
+        d[header.replace('>', '').split(' ')[0]] = seq
+    return d
+
+
+def binary_search(query, numbers, left, right):
+    """
+    The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py"
+    written by Maria Nattestad. The original script can be found here:
+
+    https://github.com/MariaNattestad/Assemblytics
+
+    And the publication associated with Maria's work is here:
+
+    Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a
+    web analytics tool for the detection of variants from an
+    assembly." Bioinformatics 32.19 (2016): 3021-3023.
+    """
+    # Returns index of the matching element or the first element to the right
+
+    if left >= right:
+        return right
+    mid = (right + left) // 2
+
+    if query == numbers[mid]:
+        return mid
+    elif query < numbers[mid]:
+        return binary_search(query, numbers, left, mid)
+    else:  # if query > numbers[mid]:
+        return binary_search(query, numbers, mid + 1, right)
+
+
+def summarize_planesweep(lines, unique_length_required, keep_small_uniques=False):
+    """
+    The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py"
+    written by Maria Nattestad. The original script can be found here:
+
+    https://github.com/MariaNattestad/Assemblytics
+
+    And the publication associated with Maria's work is here:
+
+    Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a
+    web analytics tool for the detection of variants from an
+    assembly." Bioinformatics 32.19 (2016): 3021-3023.
+    """
+    alignments_to_keep = []
+    # print len(lines)
+
+    # If no alignments:
+    if len(lines) == 0:
+        return []
+
+    # If only one alignment:
+    if len(lines) == 1:
+        if keep_small_uniques == True or abs(lines[0][1] - lines[0][0]) >= unique_length_required:
+            return [0]
+        else:
+            return []
+
+    starts_and_stops = []
+    for query_min, query_max in lines:
+        # print query_min, query_max
+        starts_and_stops.append((query_min, "start"))
+        starts_and_stops.append((query_max, "stop"))
+
+    sorted_starts_and_stops = sorted(starts_and_stops, key=operator.itemgetter(0))
+
+    current_coverage = 0
+    last_position = -1
+    sorted_unique_intervals_left = []
+    sorted_unique_intervals_right = []
+    for pos, change in sorted_starts_and_stops:
+
+        if current_coverage == 1:
+            # sorted_unique_intervals.append((last_position, pos))
+            sorted_unique_intervals_left.append(last_position)
+            sorted_unique_intervals_right.append(pos)
+
+        if change == "start":
+            current_coverage += 1
+        else:
+            current_coverage -= 1
+        last_position = pos
+
+    linecounter = 0
+    for query_min, query_max in lines:
+
+        i = binary_search(query_min, sorted_unique_intervals_left, 0, len(sorted_unique_intervals_left))
+
+        exact_match = False
+        if sorted_unique_intervals_left[i] == query_min and sorted_unique_intervals_right[i] == query_max:
+            exact_match = True
+        sum_uniq = 0
+        while i < len(sorted_unique_intervals_left) and sorted_unique_intervals_left[i] >= query_min and \
+                sorted_unique_intervals_right[i] <= query_max:
+            sum_uniq += sorted_unique_intervals_right[i] - sorted_unique_intervals_left[i]
+            i += 1
+
+        # print query_min, query_max, sum_uniq
+        if sum_uniq >= unique_length_required:
+            alignments_to_keep.append(linecounter)
+        elif keep_small_uniques == True and exact_match == True:
+            alignments_to_keep.append(linecounter)
+            # print "Keeping small alignment:", query_min, query_max
+            # print sorted_unique_intervals_left[i-1], sorted_unique_intervals_right[i-1]
+
+        linecounter += 1
+
+    return alignments_to_keep