comparison filter_by_an_id.py @ 0:9156a440afed draft default tip

Improved some datatype handling
author galaxyp
date Thu, 20 Jun 2013 11:09:24 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9156a440afed
1 """ A script to build specific fasta databases """
2 import sys
3 import logging
4
5 #===================================== Iterator ===============================
6 class Sequence:
7 ''' Holds protein sequence information '''
8 def __init__(self):
9 self.header = ""
10 self.sequence_parts = []
11
12 def get_sequence(self):
13 return "".join([line.rstrip().replace('\n','').replace('\r','') for line in self.sequence_parts])
14
15 class FASTAReader:
16 """
17 FASTA db iterator. Returns a single FASTA sequence object.
18 """
19 def __init__(self, fasta_name):
20 self.fasta_file = open(fasta_name)
21 self.next_line = self.fasta_file.readline()
22
23 def __iter__(self):
24 return self
25
26 def next(self):
27 ''' Iteration '''
28 #while True:
29 # line = self.fasta_file.readline()
30 # if not line:
31 # raise StopIteration
32 # if line[0] == '>':
33 # break
34 next_line = self.next_line
35 if not next_line:
36 raise StopIteration
37
38 seq = Sequence()
39 seq.header = next_line.rstrip().replace('\n','').replace('\r','')
40
41 next_line = self.fasta_file.readline()
42 while next_line and next_line[0] != '>':
43 #tail = self.fasta_file.tell()
44 #line = self.fasta_file.readline()
45 #if not line:
46 # break
47 #if line[0] == '>':
48 # self.fasta_file.seek(tail)
49 # break
50 seq.sequence_parts.append(next_line)
51 next_line = self.fasta_file.readline()
52 self.next_line = next_line
53 return seq
54 #==============================================================================
55
56 def target_match(target, search_entry):
57 ''' Matches '''
58 search_entry = search_entry.upper()
59 for atarget in target:
60 if search_entry.find(atarget) > -1:
61 return atarget
62 return None
63
64
65 def main():
66 ''' the main function'''
67 logging.basicConfig(filename='filter_fasta_log',
68 level=logging.INFO,
69 format='%(asctime)s :: %(levelname)s :: %(message)s',)
70
71 used_sequences = set()
72 work_summary = {'wanted': 0, 'found':0, 'duplicates':0}
73 targets = []
74
75 f_target = open(sys.argv[1])
76 for line in f_target.readlines():
77 targets.append(">%s" % line.strip().upper())
78 f_target.close()
79
80 logging.info('Read target file and am now looking for %d %s', len(targets), 'sequences.')
81
82 work_summary['wanted'] = len(targets)
83 homd_db = FASTAReader(sys.argv[2])
84
85 i = 0
86 output = open(sys.argv[3], "w")
87 try:
88 for entry in homd_db:
89 target_matched_results = target_match(targets, entry.header)
90 if target_matched_results:
91 work_summary['found'] += 1
92 targets.remove(target_matched_results)
93 sequence = entry.get_sequence()
94 if sequence in used_sequences:
95 work_summary['duplicates'] += 1
96 else:
97 used_sequences.add(sequence)
98 print >>output, entry.header
99 print >>output, sequence
100 finally:
101 output.close()
102
103 logging.info('Completed filtering')
104 for parm, count in work_summary.iteritems():
105 logging.info('%s ==> %d', parm, count)
106
107 if __name__ == "__main__":
108 main()