Mercurial > repos > earlhaminst > t_coffee
diff filter_by_fasta_ids.py @ 3:78dd29aa7fc1 draft
planemo upload for repository https://github.com/TGAC/earlham-galaxytools/tree/master/tools/t_coffee commit 81a1e79dda127d1afc16c7e456bbec16093a3c3f-dirty
author | earlhaminst |
---|---|
date | Mon, 20 Feb 2017 06:25:50 -0500 |
parents | b3833e5b50d4 |
children | fa59d6fea7f5 |
line wrap: on
line diff
--- a/filter_by_fasta_ids.py Mon Jan 09 14:26:41 2017 -0500 +++ b/filter_by_fasta_ids.py Mon Feb 20 06:25:50 2017 -0500 @@ -2,65 +2,27 @@ """ A script to build specific fasta databases """ from __future__ import print_function - -import logging +import collections import sys - -# ===================================== Iterator =============================== -class Sequence: - ''' Holds protein sequence information ''' - def __init__(self): - self.header = "" - self.sequence_parts = [] - - def get_sequence(self): - return "".join([line.rstrip().replace('\n', '').replace('\r', '') for line in self.sequence_parts]) +Sequence = collections.namedtuple('Sequence', ['header', 'sequence']) -class FASTAReader: - """ - FASTA db iterator. Returns a single FASTA sequence object. - """ - def __init__(self, fasta_name): - self.fasta_file = open(fasta_name) - self.next_line = self.fasta_file.readline() - - def __iter__(self): - return self - - def __next__(self): - ''' Iteration ''' - # while True: - # line = self.fasta_file.readline() - # if not line: - # raise StopIteration - # if line[0] == '>': - # break - next_line = self.next_line - if not next_line: - raise StopIteration - - seq = Sequence() - seq.header = next_line.rstrip().replace('\n', '').replace('\r', '') - - next_line = self.fasta_file.readline() - while next_line and next_line[0] != '>': - # tail = self.fasta_file.tell() - # line = self.fasta_file.readline() - # if not line: - # break - # if line[0] == '>': - # self.fasta_file.seek(tail) - # break - seq.sequence_parts.append(next_line) - next_line = self.fasta_file.readline() - self.next_line = next_line - return seq - - # Python 2/3 compat - next = __next__ -# ============================================================================== +def FASTAReader_gen(fasta_filename): + fasta_file = open(fasta_filename) + line = fasta_file.readline() + while True: + if not line: + return + assert line.startswith('>'), "FASTA headers must start with >" + header = line.rstrip() + sequence_parts = [] + line = fasta_file.readline() + while line and line[0] != '>': + sequence_parts.append(line.rstrip()) + line = fasta_file.readline() + sequence = "".join(sequence_parts) + yield Sequence(header, sequence) def target_match(target, search_entry): @@ -74,9 +36,6 @@ def main(): ''' the main function''' - logging.basicConfig(filename='filter_fasta_log', - level=logging.INFO, - format='%(asctime)s :: %(levelname)s :: %(message)s',) used_sequences = set() work_summary = {'wanted': 0, 'found': 0, 'duplicates': 0} @@ -87,20 +46,17 @@ targets.append(">%s" % line.strip().upper()) work_summary['wanted'] = len(targets) - homd_db = FASTAReader(sys.argv[2]) # output = open(sys.argv[3], "w") - for entry in homd_db: + for entry in FASTAReader_gen(sys.argv[2]): target_matched_results = target_match(targets, entry.header) if target_matched_results: work_summary['found'] += 1 targets.remove(target_matched_results) - sequence = entry.get_sequence() + sequence = entry.sequence used_sequences.add(sequence) print(entry.header) print(sequence) - for parm, count in work_summary.items(): - logging.info('%s ==> %d', parm, count) if __name__ == "__main__":