view filter_by_an_id.py @ 0:9156a440afed draft default tip

Improved some datatype handling
author galaxyp
date Thu, 20 Jun 2013 11:09:24 -0400
parents
children
line wrap: on
line source

""" A script to build specific fasta databases """
import sys
import logging

#===================================== Iterator ===============================
class Sequence:
    ''' Holds protein sequence information '''
    def __init__(self):
        self.header = ""
        self.sequence_parts = []

    def get_sequence(self):
        return "".join([line.rstrip().replace('\n','').replace('\r','') for line in self.sequence_parts])
   
class FASTAReader:
    """
        FASTA db iterator. Returns a single FASTA sequence object.
    """
    def __init__(self, fasta_name):
        self.fasta_file = open(fasta_name)
        self.next_line = self.fasta_file.readline()
        
    def __iter__(self):
        return self
        
    def next(self):
        ''' Iteration '''
        #while True:
        #    line = self.fasta_file.readline()
        #    if not line:
        #        raise StopIteration
        #    if line[0] == '>':
        #        break
        next_line = self.next_line
        if not next_line:
            raise StopIteration
        
        seq = Sequence()
        seq.header = next_line.rstrip().replace('\n','').replace('\r','')

        next_line = self.fasta_file.readline()
        while next_line and next_line[0] != '>':
            #tail = self.fasta_file.tell()
            #line = self.fasta_file.readline()
            #if not line:
            #    break
            #if line[0] == '>':
            #    self.fasta_file.seek(tail)
            #    break
            seq.sequence_parts.append(next_line)
            next_line = self.fasta_file.readline()
        self.next_line = next_line
        return seq
#==============================================================================

def target_match(target, search_entry):
    ''' Matches '''
    search_entry = search_entry.upper()
    for atarget in target:
        if search_entry.find(atarget) > -1:
            return atarget
    return None
       

def main():
    ''' the main function'''
    logging.basicConfig(filename='filter_fasta_log', 
        level=logging.INFO,
        format='%(asctime)s :: %(levelname)s :: %(message)s',)

    used_sequences = set()
    work_summary = {'wanted': 0, 'found':0, 'duplicates':0}
    targets = []

    f_target = open(sys.argv[1])
    for line in f_target.readlines():
        targets.append(">%s" % line.strip().upper())
    f_target.close()

    logging.info('Read target file and am now looking for %d %s', len(targets), 'sequences.') 

    work_summary['wanted'] = len(targets)
    homd_db = FASTAReader(sys.argv[2])
    
    i = 0
    output = open(sys.argv[3], "w")
    try:
        for entry in homd_db:
            target_matched_results = target_match(targets, entry.header)
            if target_matched_results:
                work_summary['found'] += 1
                targets.remove(target_matched_results)
                sequence = entry.get_sequence()
                if sequence in used_sequences:
                    work_summary['duplicates'] += 1
                else:
                    used_sequences.add(sequence)
                    print >>output, entry.header
                    print >>output, sequence
    finally:
        output.close()
        
    logging.info('Completed filtering')
    for parm, count in work_summary.iteritems():
        logging.info('%s ==> %d', parm, count)

if __name__ == "__main__":
    main()