Mercurial > repos > galaxy-australia > alphafold2
comparison validate_fasta.py @ 0:7ae9d78b06f5 draft
"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit 7b79778448363aa8c9b14604337e81009e461bd2-dirty"
author | galaxy-australia |
---|---|
date | Fri, 28 Jan 2022 04:56:29 +0000 |
parents | |
children | 6c92e000d684 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:7ae9d78b06f5 |
---|---|
1 | |
2 | |
3 import argparse | |
4 from typing import List, TextIO | |
5 | |
6 | |
7 class Fasta: | |
8 def __init__(self, header_str: str, seq_str: str): | |
9 self.header = header_str | |
10 self.aa_seq = seq_str | |
11 | |
12 | |
13 class FastaLoader: | |
14 def __init__(self): | |
15 """creates a Fasta() from a file""" | |
16 self.fastas: List[Fasta] = [] | |
17 | |
18 def load(self, fasta_path: str): | |
19 """ | |
20 load function has to be very flexible. | |
21 file may be normal fasta format (header, seq) or can just be a bare sequence. | |
22 """ | |
23 with open(fasta_path, 'r') as fp: | |
24 header, sequence = self.interpret_first_line(fp) | |
25 line = fp.readline().rstrip('\n') | |
26 | |
27 while line: | |
28 if line.startswith('>'): | |
29 self.update_fastas(header, sequence) | |
30 header = line | |
31 sequence = '' | |
32 else: | |
33 sequence += line | |
34 line = fp.readline().rstrip('\n') | |
35 | |
36 # after reading whole file, header & sequence buffers might be full | |
37 self.update_fastas(header, sequence) | |
38 return self.fastas | |
39 | |
40 def interpret_first_line(self, fp: TextIO): | |
41 header = '' | |
42 sequence = '' | |
43 line = fp.readline().rstrip('\n') | |
44 if line.startswith('>'): | |
45 header = line | |
46 else: | |
47 sequence += line | |
48 return header, sequence | |
49 | |
50 def update_fastas(self, header: str, sequence: str): | |
51 # if we have a sequence | |
52 if not sequence == '': | |
53 # create generic header if not exists | |
54 if header == '': | |
55 fasta_count = len(self.fastas) | |
56 header = f'>sequence_{fasta_count}' | |
57 | |
58 # create new Fasta | |
59 self.fastas.append(Fasta(header, sequence)) | |
60 | |
61 | |
62 class FastaValidator: | |
63 def __init__(self, fasta_list: List[Fasta]): | |
64 self.fasta_list = fasta_list | |
65 self.min_length = 30 | |
66 self.max_length = 2000 | |
67 self.iupac_characters = { | |
68 'A', 'B', 'C', 'D', 'E', 'F', 'G', | |
69 'H', 'I', 'K', 'L', 'M', 'N', 'P', | |
70 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', | |
71 'Y', 'Z', '-' | |
72 } | |
73 | |
74 def validate(self): | |
75 """performs fasta validation""" | |
76 self.validate_num_seqs() | |
77 self.validate_length() | |
78 self.validate_alphabet() | |
79 # not checking for 'X' nucleotides at the moment. | |
80 # alphafold can throw an error if it doesn't like it. | |
81 #self.validate_x() | |
82 | |
83 def validate_num_seqs(self) -> None: | |
84 if len(self.fasta_list) > 1: | |
85 raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') | |
86 elif len(self.fasta_list) == 0: | |
87 raise Exception(f'Error encountered validating fasta: input file has no fasta sequences') | |
88 | |
89 def validate_length(self): | |
90 """Confirms whether sequence length is valid. """ | |
91 fasta = self.fasta_list[0] | |
92 if len(fasta.aa_seq) < self.min_length: | |
93 raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') | |
94 if len(fasta.aa_seq) > self.max_length: | |
95 raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') | |
96 | |
97 def validate_alphabet(self): | |
98 """ | |
99 Confirms whether the sequence conforms to IUPAC codes. | |
100 If not, reports the offending character and its position. | |
101 """ | |
102 fasta = self.fasta_list[0] | |
103 for i, char in enumerate(fasta.aa_seq.upper()): | |
104 if char not in self.iupac_characters: | |
105 raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: {char}') | |
106 | |
107 def validate_x(self): | |
108 """checks if any bases are X. TODO check whether alphafold accepts X bases. """ | |
109 fasta = self.fasta_list[0] | |
110 for i, char in enumerate(fasta.aa_seq.upper()): | |
111 if char == 'X': | |
112 raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') | |
113 | |
114 | |
115 class FastaWriter: | |
116 def __init__(self) -> None: | |
117 self.outfile = 'alphafold.fasta' | |
118 self.formatted_line_len = 60 | |
119 | |
120 def write(self, fasta: Fasta): | |
121 with open(self.outfile, 'w') as fp: | |
122 header = fasta.header | |
123 seq = self.format_sequence(fasta.aa_seq) | |
124 fp.write(header + '\n') | |
125 fp.write(seq + '\n') | |
126 | |
127 def format_sequence(self, aa_seq: str): | |
128 formatted_seq = '' | |
129 for i in range(0, len(aa_seq), self.formatted_line_len): | |
130 formatted_seq += aa_seq[i: i + self.formatted_line_len] + '\n' | |
131 return formatted_seq | |
132 | |
133 | |
134 def main(): | |
135 # load fasta file | |
136 args = parse_args() | |
137 fl = FastaLoader() | |
138 fastas = fl.load(args.input_fasta) | |
139 | |
140 # validate | |
141 fv = FastaValidator(fastas) | |
142 fv.validate() | |
143 | |
144 # write cleaned version | |
145 fw = FastaWriter() | |
146 fw.write(fastas[0]) | |
147 | |
148 | |
149 def parse_args() -> argparse.Namespace: | |
150 parser = argparse.ArgumentParser() | |
151 parser.add_argument( | |
152 "input_fasta", | |
153 help="input fasta file", | |
154 type=str | |
155 ) | |
156 return parser.parse_args() | |
157 | |
158 | |
159 | |
160 if __name__ == '__main__': | |
161 main() |