comparison scripts/validate_fasta.py @ 16:f9eb041c518c draft

planemo upload for repository https://github.com/usegalaxy-au/tools-au commit ee77734f1800350fa2a6ef28b2b8eade304a456f-dirty
author galaxy-australia
date Mon, 03 Apr 2023 01:00:42 +0000
parents
children e4a053d67e24
comparison
equal deleted inserted replaced
15:a58f7eb0df2c 16:f9eb041c518c
1 """Validate input FASTA sequence."""
2
3 import argparse
4 import re
5 import sys
6 from typing import List
7
8 MULTIMER_MAX_SEQUENCE_COUNT = 10
9
10
11 class Fasta:
12 def __init__(self, header_str: str, seq_str: str):
13 self.header = header_str
14 self.aa_seq = seq_str
15
16
17 class FastaLoader:
18 def __init__(self, fasta_path: str):
19 """Initialize from FASTA file."""
20 self.fastas = []
21 self.load(fasta_path)
22
23 def load(self, fasta_path: str):
24 """Load bare or FASTA formatted sequence."""
25 with open(fasta_path, 'r') as f:
26 self.content = f.read()
27
28 if "__cn__" in self.content:
29 # Pasted content with escaped characters
30 self.newline = '__cn__'
31 self.read_caret = '__gt__'
32 else:
33 # Uploaded file with normal content
34 self.newline = '\n'
35 self.read_caret = '>'
36
37 self.lines = self.content.split(self.newline)
38
39 if not self.lines[0].startswith(self.read_caret):
40 # Fasta is headless, load as single sequence
41 self.update_fastas(
42 '', ''.join(self.lines)
43 )
44
45 else:
46 header = None
47 sequence = None
48 for line in self.lines:
49 if line.startswith(self.read_caret):
50 if header:
51 self.update_fastas(header, sequence)
52 header = '>' + self.strip_header(line)
53 sequence = ''
54 else:
55 sequence += line.strip('\n ')
56 self.update_fastas(header, sequence)
57
58 def strip_header(self, line):
59 """Strip characters escaped with underscores from pasted text."""
60 return re.sub(r'\_\_.{2}\_\_', '', line).strip('>')
61
62 def update_fastas(self, header: str, sequence: str):
63 # if we have a sequence
64 if sequence:
65 # create generic header if not exists
66 if not header:
67 fasta_count = len(self.fastas)
68 header = f'>sequence_{fasta_count}'
69
70 # Create new Fasta
71 self.fastas.append(Fasta(header, sequence))
72
73
74 class FastaValidator:
75 def __init__(
76 self,
77 min_length=None,
78 max_length=None,
79 multiple=False):
80 self.multiple = multiple
81 self.min_length = min_length
82 self.max_length = max_length
83 self.iupac_characters = {
84 'A', 'B', 'C', 'D', 'E', 'F', 'G',
85 'H', 'I', 'K', 'L', 'M', 'N', 'P',
86 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
87 'Y', 'Z', '-'
88 }
89
90 def validate(self, fasta_list: List[Fasta]):
91 """Perform FASTA validation."""
92 self.fasta_list = fasta_list
93 self.validate_num_seqs()
94 self.validate_length()
95 self.validate_alphabet()
96 # not checking for 'X' nucleotides at the moment.
97 # alphafold can throw an error if it doesn't like it.
98 # self.validate_x()
99 return self.fasta_list
100
101 def validate_num_seqs(self) -> None:
102 """Assert that only one sequence has been provided."""
103 fasta_count = len(self.fasta_list)
104
105 if self.multiple:
106 if fasta_count < 2:
107 raise ValueError(
108 'Error encountered validating FASTA:\n'
109 'Multimer mode requires multiple input sequence.'
110 f' Only {fasta_count} sequences were detected in'
111 ' the provided file.')
112 self.fasta_list = self.fasta_list
113
114 elif fasta_count > MULTIMER_MAX_SEQUENCE_COUNT:
115 sys.stderr.write(
116 f'WARNING: detected {fasta_count} sequences but the'
117 f' maximum allowed is {MULTIMER_MAX_SEQUENCE_COUNT}'
118 ' sequences. The last'
119 f' {fasta_count - MULTIMER_MAX_SEQUENCE_COUNT} sequence(s)'
120 ' have been discarded.\n')
121 self.fasta_list = self.fasta_list[:MULTIMER_MAX_SEQUENCE_COUNT]
122 else:
123 if fasta_count > 1:
124 sys.stderr.write(
125 'WARNING: More than 1 sequence detected.'
126 ' Using first FASTA sequence as input.\n')
127 self.fasta_list = self.fasta_list[:1]
128
129 elif len(self.fasta_list) == 0:
130 raise ValueError(
131 'Error encountered validating FASTA:\n'
132 ' no FASTA sequences detected in input file.')
133
134 def validate_length(self):
135 """Confirm whether sequence length is valid."""
136 fasta = self.fasta_list[0]
137 if self.min_length:
138 if len(fasta.aa_seq) < self.min_length:
139 raise ValueError(
140 'Error encountered validating FASTA:\n Sequence too short'
141 f' ({len(fasta.aa_seq)}AA).'
142 f' Minimum length is {self.min_length}AA.')
143 if self.max_length:
144 if len(fasta.aa_seq) > self.max_length:
145 raise ValueError(
146 'Error encountered validating FASTA:\n'
147 f' Sequence too long ({len(fasta.aa_seq)}AA).'
148 f' Maximum length is {self.max_length}AA.')
149
150 def validate_alphabet(self):
151 """Confirm whether the sequence conforms to IUPAC codes.
152
153 If not, report the offending character and its position.
154 """
155 fasta = self.fasta_list[0]
156 for i, char in enumerate(fasta.aa_seq.upper()):
157 if char not in self.iupac_characters:
158 raise ValueError(
159 'Error encountered validating FASTA:\n Invalid amino acid'
160 f' found at pos {i}: "{char}"')
161
162 def validate_x(self):
163 """Check for X bases."""
164 fasta = self.fasta_list[0]
165 for i, char in enumerate(fasta.aa_seq.upper()):
166 if char == 'X':
167 raise ValueError(
168 'Error encountered validating FASTA:\n Unsupported AA code'
169 f' "X" found at pos {i}')
170
171
172 class FastaWriter:
173 def __init__(self) -> None:
174 self.line_wrap = 60
175
176 def write(self, fasta: Fasta):
177 header = fasta.header
178 seq = self.format_sequence(fasta.aa_seq)
179 sys.stdout.write(header + '\n')
180 sys.stdout.write(seq)
181
182 def format_sequence(self, aa_seq: str):
183 formatted_seq = ''
184 for i in range(0, len(aa_seq), self.line_wrap):
185 formatted_seq += aa_seq[i: i + self.line_wrap] + '\n'
186 return formatted_seq.upper()
187
188
189 def main():
190 # load fasta file
191 try:
192 args = parse_args()
193 fas = FastaLoader(args.input)
194
195 # validate
196 fv = FastaValidator(
197 min_length=args.min_length,
198 max_length=args.max_length,
199 multiple=args.multimer,
200 )
201 clean_fastas = fv.validate(fas.fastas)
202
203 # write clean data
204 fw = FastaWriter()
205 for fas in clean_fastas:
206 fw.write(fas)
207
208 sys.stderr.write("Validated FASTA sequence(s):\n\n")
209 for fas in clean_fastas:
210 sys.stderr.write(fas.header + '\n')
211 sys.stderr.write(fas.aa_seq + '\n\n')
212
213 except ValueError as exc:
214 sys.stderr.write(f"{exc}\n\n")
215 raise exc
216
217 except Exception as exc:
218 sys.stderr.write(
219 "Input error: FASTA input is invalid. Please check your input.\n\n"
220 )
221 raise exc
222
223
224 def parse_args() -> argparse.Namespace:
225 parser = argparse.ArgumentParser()
226 parser.add_argument(
227 "input",
228 help="input fasta file",
229 type=str
230 )
231 parser.add_argument(
232 "--min_length",
233 dest='min_length',
234 help="Minimum length of input protein sequence (AA)",
235 default=None,
236 type=int,
237 )
238 parser.add_argument(
239 "--max_length",
240 dest='max_length',
241 help="Maximum length of input protein sequence (AA)",
242 default=None,
243 type=int,
244 )
245 parser.add_argument(
246 "--multimer",
247 action='store_true',
248 help="Require multiple input sequences",
249 )
250 return parser.parse_args()
251
252
253 if __name__ == '__main__':
254 main()