view filter_by_fasta_ids.py @ 5:dff7df6fcab5 draft default tip

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/filter_by_fasta_ids commit f608f41d45664d04d3124c6ebc791bf8a566b3c5
author galaxyp
date Wed, 15 May 2019 03:18:11 -0400
parents cd22452edec2
children
line wrap: on
line source

#!/usr/bin/env python
""" A script to build specific fasta databases """
from __future__ import print_function

import argparse
import re
import sys


class Sequence(object):
    def __init__(self, header, sequence_parts):
        self.header = header
        self.sequence_parts = sequence_parts
        self._sequence = None

    @property
    def sequence(self):
        if self._sequence is None:
            self._sequence = ''.join(self.sequence_parts)
        return self._sequence

    def print(self, fh=sys.stdout):
        print(self.header, file=fh)
        for line in self.sequence_parts:
            print(line, file=fh)


def FASTAReader_gen(fasta_filename):
    with open(fasta_filename) as fasta_file:
        line = fasta_file.readline()
        while True:
            if not line:
                return
            assert line.startswith('>'), "FASTA headers must start with >"
            header = line.rstrip()
            sequence_parts = []
            line = fasta_file.readline()
            while line and line[0] != '>':
                sequence_parts.append(line.rstrip())
                line = fasta_file.readline()
            yield Sequence(header, sequence_parts)


def target_match(targets, search_entry, pattern):
    ''' Matches '''
    search_entry = search_entry.upper()
    m = pattern.search(search_entry)
    if m:
        target = m.group(len(m.groups()))
        if target in targets:
            return target
    else:
        print('No ID match: %s' % search_entry, file=sys.stdout)
    return None


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', required=True, help='Path to input FASTA file')
    parser.add_argument('-o', required=True, help='Path to output FASTA file')
    parser.add_argument('-d', help='Path to discarded entries file')
    header_criteria = parser.add_mutually_exclusive_group()
    header_criteria.add_argument('--id_list', help='Path to the ID list file')
    parser.add_argument('--pattern', help='regex search pattern for ID in FASTA entry')
    header_criteria.add_argument('--header_regexp', help='Regular expression pattern the header should match')
    sequence_criteria = parser.add_mutually_exclusive_group()
    sequence_criteria.add_argument('--min_length', type=int, help='Minimum sequence length')
    sequence_criteria.add_argument('--sequence_regexp', help='Regular expression pattern the sequence should match')
    parser.add_argument('--max_length', type=int, help='Maximum sequence length')
    parser.add_argument('--dedup', action='store_true', default=False, help='Whether to remove duplicate sequences')
    options = parser.parse_args()

    if options.pattern:
        if not re.match('^.*[(](?![?]:).*[)].*$', options.pattern):
            sys.exit('pattern: "%s" did not include capture group "()" in regex ' % options.pattern)
        pattern = re.compile(options.pattern)

    if options.min_length is not None and options.max_length is None:
        options.max_length = sys.maxsize
    if options.header_regexp:
        header_regexp = re.compile(options.header_regexp)
    if options.sequence_regexp:
        sequence_regexp = re.compile(options.sequence_regexp)

    work_summary = {'found': 0, 'discarded': 0}

    if options.dedup:
        used_sequences = set()
        work_summary['duplicates'] = 0

    if options.id_list:
        targets = set()
        with open(options.id_list) as f_target:
            for line in f_target:
                targets.add(line.strip().upper())
        work_summary['wanted'] = len(targets)

    homd_db = FASTAReader_gen(options.i)
    if options.d:
        discarded = open(options.d, 'w')

    with open(options.o, "w") as output:
        for entry in homd_db:
            print_entry = True
            if options.id_list:
                target_matched_results = target_match(targets, entry.header, pattern)
                if target_matched_results:
                    targets.remove(target_matched_results)
                else:
                    print_entry = False
            elif options.header_regexp:
                if header_regexp.search(entry.header) is None:
                    print_entry = False
            if options.min_length is not None:
                sequence_length = len(entry.sequence)
                if not(options.min_length <= sequence_length <= options.max_length):
                    print_entry = False
            elif options.sequence_regexp:
                if sequence_regexp.search(entry.sequence) is None:
                    print_entry = False
            if print_entry:
                if options.dedup:
                    if entry.sequence in used_sequences:
                        work_summary['duplicates'] += 1
                        continue
                    else:
                        used_sequences.add(entry.sequence)
                work_summary['found'] += 1
                entry.print(output)
            else:
                work_summary['discarded'] += 1
                if options.d:
                    entry.print(discarded)

    if options.d:
        discarded.close()

    for parm, count in work_summary.items():
        print('%s ==> %d' % (parm, count))


if __name__ == "__main__":
    main()