diff validate_fasta.py @ 1:6c92e000d684 draft

"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit a510e97ebd604a5e30b1f16e5031f62074f23e86"
author galaxy-australia
date Tue, 01 Mar 2022 02:53:05 +0000
parents 7ae9d78b06f5
children 04e95886cf24
line wrap: on
line diff
--- a/validate_fasta.py	Fri Jan 28 04:56:29 2022 +0000
+++ b/validate_fasta.py	Tue Mar 01 02:53:05 2022 +0000
@@ -1,5 +1,6 @@
+"""Validate input FASTA sequence."""
 
-
+import re
 import argparse
 from typing import List, TextIO
 
@@ -11,51 +12,67 @@
 
 
 class FastaLoader:
-    def __init__(self):
-        """creates a Fasta() from a file"""
+    def __init__(self, fasta_path: str):
+        """Load sequences from fasta_path, then print each header and sequence."""
         self.fastas: List[Fasta] = []
+        self.load(fasta_path)
+        print("Loaded FASTA sequences:")
+        for f in self.fastas:
+            print(f.header)
+            print(f.aa_seq)
 
     def load(self, fasta_path: str):
-        """
-        load function has to be very flexible. 
-        file may be normal fasta format (header, seq) or can just be a bare sequence. 
-        """
-        with open(fasta_path, 'r') as fp:
-            header, sequence = self.interpret_first_line(fp)
-            line = fp.readline().rstrip('\n')
-        
-            while line:
-                if line.startswith('>'):
-                    self.update_fastas(header, sequence)
-                    header = line
-                    sequence = ''
-                else:
-                    sequence += line
-                line = fp.readline().rstrip('\n')
+        """Load bare or FASTA formatted sequence(s) into self.fastas."""
+        with open(fasta_path, 'r') as f:
+            self.content = f.read()
+
+        if "__cn__" in self.content:
+            # Pasted content with escaped characters
+            self.newline = '__cn__'
+            self.caret = '__gt__'
+        else:
+            # Uploaded file with normal content
+            self.newline = '\n'
+            self.caret = '>'
+
+        self.lines = self.content.split(self.newline)
+        header, sequence = self.interpret_first_line()
+
+        # lines[0] is already in the buffers; re-reading it would duplicate it.
+        for line in self.lines[1:]:
+            if line.startswith(self.caret):
+                self.update_fastas(header, sequence)
+                header = '>' + self.strip_header(line)
+                sequence = ''
+            else:
+                sequence += line.strip('\n ')
 
         # after reading whole file, header & sequence buffers might be full
         self.update_fastas(header, sequence)
-        return self.fastas
 
-    def interpret_first_line(self, fp: TextIO):
-        header = ''
-        sequence = ''
-        line = fp.readline().rstrip('\n')
-        if line.startswith('>'):
-            header = line
+    def interpret_first_line(self):
+        """Seed the (header, sequence) buffers from the first line."""
+        line = self.lines[0]
+        if line.startswith(self.caret):
+            header = '>' + self.strip_header(line)
+            return header, ''
         else:
-            sequence += line
-        return header, sequence
-                
+            return '', line.strip('\n ')
+
+    def strip_header(self, line):
+        """Strip characters escaped with underscores from pasted text."""
+        return re.sub(r'\_\_.{2}\_\_', '', line).strip('>')
+
     def update_fastas(self, header: str, sequence: str):
+        """Append a Fasta built from the buffers; no-op when sequence is empty."""
         # if we have a sequence
-        if not sequence == '':
+        if sequence:
             # create generic header if not exists
-            if header == '':
+            if not header:
                 fasta_count = len(self.fastas)
                 header = f'>sequence_{fasta_count}'
 
-            # create new Fasta    
+            # Create new Fasta
             self.fastas.append(Fasta(header, sequence))
 
 
@@ -65,9 +82,9 @@
         self.min_length = 30
         self.max_length = 2000
         self.iupac_characters = {
-            'A', 'B', 'C', 'D', 'E', 'F', 'G', 
-            'H', 'I', 'K', 'L', 'M', 'N', 'P', 
-            'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 
+            'A', 'B', 'C', 'D', 'E', 'F', 'G',
+            'H', 'I', 'K', 'L', 'M', 'N', 'P',
+            'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
             'Y', 'Z', '-'
         }
 
@@ -76,9 +93,9 @@
         self.validate_num_seqs()
         self.validate_length()
         self.validate_alphabet()
-        # not checking for 'X' nucleotides at the moment. 
-        # alphafold can throw an error if it doesn't like it. 
-        #self.validate_x() 
+        # not checking for 'X' residues at the moment.
+        # alphafold can throw an error if it doesn't like it.
+        #self.validate_x()
 
     def validate_num_seqs(self) -> None:
         if len(self.fasta_list) > 1:
@@ -93,19 +110,19 @@
             raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa')
         if len(fasta.aa_seq) > self.max_length:
             raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa')
-    
+
     def validate_alphabet(self):
         """
-        Confirms whether the sequence conforms to IUPAC codes. 
-        If not, reports the offending character and its position. 
-        """ 
+        Check the first sequence against self.iupac_characters (case-insensitive).
+        Raises Exception naming the first invalid character and its 0-based index.
+        """
         fasta = self.fasta_list[0]
         for i, char in enumerate(fasta.aa_seq.upper()):
             if char not in self.iupac_characters:
-                raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: {char}')
+                raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: "{char}"')
 
     def validate_x(self):
-        """checks if any bases are X. TODO check whether alphafold accepts X bases. """ 
+        """Check whether any residues are X. TODO: check whether AlphaFold accepts X residues."""
         fasta = self.fasta_list[0]
         for i, char in enumerate(fasta.aa_seq.upper()):
             if char == 'X':
@@ -134,28 +151,27 @@
 def main():
-    # load fasta file
+    """Load the input FASTA, validate it, and write out the first sequence."""
     args = parse_args()
-    fl = FastaLoader()
-    fastas = fl.load(args.input_fasta)
+    fas = FastaLoader(args.input_fasta)
 
     # validate
-    fv = FastaValidator(fastas)
+    fv = FastaValidator(fas.fastas)
     fv.validate()
 
     # write cleaned version
     fw = FastaWriter()
-    fw.write(fastas[0])
+    fw.write(fas.fastas[0])
 
-        
+
 def parse_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        "input_fasta", 
-        help="input fasta file", 
+        "input_fasta",
+        help="input fasta file",
         type=str
-    )   
+    )
     return parser.parse_args()
 
 
 
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()