Mercurial > repos > galaxy-australia > alphafold2
view validate_fasta.py @ 0:7ae9d78b06f5 draft
"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit 7b79778448363aa8c9b14604337e81009e461bd2-dirty"
author | galaxy-australia |
---|---|
date | Fri, 28 Jan 2022 04:56:29 +0000 |
parents | |
children | 6c92e000d684 |
line wrap: on
line source
import argparse from typing import List, TextIO class Fasta: def __init__(self, header_str: str, seq_str: str): self.header = header_str self.aa_seq = seq_str class FastaLoader: def __init__(self): """creates a Fasta() from a file""" self.fastas: List[Fasta] = [] def load(self, fasta_path: str): """ load function has to be very flexible. file may be normal fasta format (header, seq) or can just be a bare sequence. """ with open(fasta_path, 'r') as fp: header, sequence = self.interpret_first_line(fp) line = fp.readline().rstrip('\n') while line: if line.startswith('>'): self.update_fastas(header, sequence) header = line sequence = '' else: sequence += line line = fp.readline().rstrip('\n') # after reading whole file, header & sequence buffers might be full self.update_fastas(header, sequence) return self.fastas def interpret_first_line(self, fp: TextIO): header = '' sequence = '' line = fp.readline().rstrip('\n') if line.startswith('>'): header = line else: sequence += line return header, sequence def update_fastas(self, header: str, sequence: str): # if we have a sequence if not sequence == '': # create generic header if not exists if header == '': fasta_count = len(self.fastas) header = f'>sequence_{fasta_count}' # create new Fasta self.fastas.append(Fasta(header, sequence)) class FastaValidator: def __init__(self, fasta_list: List[Fasta]): self.fasta_list = fasta_list self.min_length = 30 self.max_length = 2000 self.iupac_characters = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '-' } def validate(self): """performs fasta validation""" self.validate_num_seqs() self.validate_length() self.validate_alphabet() # not checking for 'X' nucleotides at the moment. # alphafold can throw an error if it doesn't like it. #self.validate_x() def validate_num_seqs(self) -> None: if len(self.fasta_list) > 1: raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') elif len(self.fasta_list) == 0: raise Exception(f'Error encountered validating fasta: input file has no fasta sequences') def validate_length(self): """Confirms whether sequence length is valid. """ fasta = self.fasta_list[0] if len(fasta.aa_seq) < self.min_length: raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') if len(fasta.aa_seq) > self.max_length: raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') def validate_alphabet(self): """ Confirms whether the sequence conforms to IUPAC codes. If not, reports the offending character and its position. """ fasta = self.fasta_list[0] for i, char in enumerate(fasta.aa_seq.upper()): if char not in self.iupac_characters: raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: {char}') def validate_x(self): """checks if any bases are X. TODO check whether alphafold accepts X bases. """ fasta = self.fasta_list[0] for i, char in enumerate(fasta.aa_seq.upper()): if char == 'X': raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') class FastaWriter: def __init__(self) -> None: self.outfile = 'alphafold.fasta' self.formatted_line_len = 60 def write(self, fasta: Fasta): with open(self.outfile, 'w') as fp: header = fasta.header seq = self.format_sequence(fasta.aa_seq) fp.write(header + '\n') fp.write(seq + '\n') def format_sequence(self, aa_seq: str): formatted_seq = '' for i in range(0, len(aa_seq), self.formatted_line_len): formatted_seq += aa_seq[i: i + self.formatted_line_len] + '\n' return formatted_seq def main(): # load fasta file args = parse_args() fl = FastaLoader() fastas = fl.load(args.input_fasta) # validate fv = FastaValidator(fastas) fv.validate() # write cleaned version fw = FastaWriter() fw.write(fastas[0]) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument( "input_fasta", help="input fasta file", type=str ) return parser.parse_args() if __name__ == '__main__': main()