Mercurial > repos > dereeper > ragoo
comparison RaGOO/ragoo_utilities/utilities.py @ 13:b9a3aeb162ab draft default tip
Uploaded
author | dereeper |
---|---|
date | Mon, 26 Jul 2021 18:22:37 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
12:68a9ec9ce51e | 13:b9a3aeb162ab |
---|---|
1 import time | |
2 import subprocess | |
3 import operator | |
4 | |
5 from ragoo_utilities.SeqReader import SeqReader | |
6 | |
7 """ A collection of various helper functions""" | |
8 | |
9 complements = str.maketrans("ACGTNURYSWKMBVDHacgtnuryswkmbvdh", "TGCANAYRSWMKVBHDtgcanayrswmkvbhd") | |
10 | |
11 | |
12 def reverse_complement(seq): | |
13 """ | |
14 Reverse complement a nucleotide sequence. | |
15 :param seq: Sequence to be reverse complemented | |
16 :return: A reverse complemented sequence | |
17 """ | |
18 return seq.translate(complements)[::-1] | |
19 | |
20 | |
21 def run(cmnd): | |
22 """ Run command and report status. """ | |
23 log('Running : %s' % cmnd) | |
24 if subprocess.call(cmnd, shell=True, executable='/bin/bash') != 0: | |
25 raise RuntimeError('Failed : %s ' % cmnd) | |
26 | |
27 | |
28 def log(message): | |
29 """ Log messages to standard output. """ | |
30 print(time.ctime() + ' --- ' + message, flush=True) | |
31 | |
32 | |
33 def read_contigs(in_file): | |
34 d = dict() | |
35 x = SeqReader(in_file) | |
36 for header, seq in x.parse_fasta(): | |
37 d[header.replace('>', '').split(' ')[0]] = seq | |
38 return d | |
39 | |
40 | |
41 def read_gz_contigs(in_file): | |
42 d = dict() | |
43 x = SeqReader(in_file) | |
44 for header, seq in x.parse_gzip_fasta(): | |
45 d[header.replace('>', '').split(' ')[0]] = seq | |
46 return d | |
47 | |
48 | |
49 def binary_search(query, numbers, left, right): | |
50 """ | |
51 The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py" | |
52 written by Maria Nattestad. The original script can be found here: | |
53 | |
54 https://github.com/MariaNattestad/Assemblytics | |
55 | |
56 And the publication associated with Maria's work is here: | |
57 | |
58 Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a | |
59 web analytics tool for the detection of variants from an | |
60 assembly." Bioinformatics 32.19 (2016): 3021-3023. | |
61 """ | |
62 # Returns index of the matching element or the first element to the right | |
63 | |
64 if left >= right: | |
65 return right | |
66 mid = (right + left) // 2 | |
67 | |
68 if query == numbers[mid]: | |
69 return mid | |
70 elif query < numbers[mid]: | |
71 return binary_search(query, numbers, left, mid) | |
72 else: # if query > numbers[mid]: | |
73 return binary_search(query, numbers, mid + 1, right) | |
74 | |
75 | |
76 def summarize_planesweep(lines, unique_length_required, keep_small_uniques=False): | |
77 """ | |
78 The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py" | |
79 written by Maria Nattestad. The original script can be found here: | |
80 | |
81 https://github.com/MariaNattestad/Assemblytics | |
82 | |
83 And the publication associated with Maria's work is here: | |
84 | |
85 Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a | |
86 web analytics tool for the detection of variants from an | |
87 assembly." Bioinformatics 32.19 (2016): 3021-3023. | |
88 """ | |
89 alignments_to_keep = [] | |
90 # print len(lines) | |
91 | |
92 # If no alignments: | |
93 if len(lines) == 0: | |
94 return [] | |
95 | |
96 # If only one alignment: | |
97 if len(lines) == 1: | |
98 if keep_small_uniques == True or abs(lines[0][1] - lines[0][0]) >= unique_length_required: | |
99 return [0] | |
100 else: | |
101 return [] | |
102 | |
103 starts_and_stops = [] | |
104 for query_min, query_max in lines: | |
105 # print query_min, query_max | |
106 starts_and_stops.append((query_min, "start")) | |
107 starts_and_stops.append((query_max, "stop")) | |
108 | |
109 sorted_starts_and_stops = sorted(starts_and_stops, key=operator.itemgetter(0)) | |
110 | |
111 current_coverage = 0 | |
112 last_position = -1 | |
113 sorted_unique_intervals_left = [] | |
114 sorted_unique_intervals_right = [] | |
115 for pos, change in sorted_starts_and_stops: | |
116 | |
117 if current_coverage == 1: | |
118 # sorted_unique_intervals.append((last_position,pos)) | |
119 sorted_unique_intervals_left.append(last_position) | |
120 sorted_unique_intervals_right.append(pos) | |
121 | |
122 if change == "start": | |
123 current_coverage += 1 | |
124 else: | |
125 current_coverage -= 1 | |
126 last_position = pos | |
127 | |
128 linecounter = 0 | |
129 for query_min, query_max in lines: | |
130 | |
131 i = binary_search(query_min, sorted_unique_intervals_left, 0, len(sorted_unique_intervals_left)) | |
132 | |
133 exact_match = False | |
134 if sorted_unique_intervals_left[i] == query_min and sorted_unique_intervals_right[i] == query_max: | |
135 exact_match = True | |
136 sum_uniq = 0 | |
137 while i < len(sorted_unique_intervals_left) and sorted_unique_intervals_left[i] >= query_min and \ | |
138 sorted_unique_intervals_right[i] <= query_max: | |
139 sum_uniq += sorted_unique_intervals_right[i] - sorted_unique_intervals_left[i] | |
140 i += 1 | |
141 | |
142 # print query_min,query_max,sum_uniq | |
143 if sum_uniq >= unique_length_required: | |
144 alignments_to_keep.append(linecounter) | |
145 elif keep_small_uniques == True and exact_match == True: | |
146 alignments_to_keep.append(linecounter) | |
147 # print "Keeping small alignment:", query_min, query_max | |
148 # print sorted_unique_intervals_left[i-1],sorted_unique_intervals_right[i-1] | |
149 | |
150 linecounter += 1 | |
151 | |
152 return alignments_to_keep |