Mercurial > repos > galaxyp > utilities
comparison filter_by_an_id.py @ 0:9156a440afed draft default tip
Improved some datatype handling
author | galaxyp |
---|---|
date | Thu, 20 Jun 2013 11:09:24 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9156a440afed |
---|---|
1 """ A script to build specific fasta databases """ | |
2 import sys | |
3 import logging | |
4 | |
5 #===================================== Iterator =============================== | |
6 class Sequence: | |
7 ''' Holds protein sequence information ''' | |
8 def __init__(self): | |
9 self.header = "" | |
10 self.sequence_parts = [] | |
11 | |
12 def get_sequence(self): | |
13 return "".join([line.rstrip().replace('\n','').replace('\r','') for line in self.sequence_parts]) | |
14 | |
15 class FASTAReader: | |
16 """ | |
17 FASTA db iterator. Returns a single FASTA sequence object. | |
18 """ | |
19 def __init__(self, fasta_name): | |
20 self.fasta_file = open(fasta_name) | |
21 self.next_line = self.fasta_file.readline() | |
22 | |
23 def __iter__(self): | |
24 return self | |
25 | |
26 def next(self): | |
27 ''' Iteration ''' | |
28 #while True: | |
29 # line = self.fasta_file.readline() | |
30 # if not line: | |
31 # raise StopIteration | |
32 # if line[0] == '>': | |
33 # break | |
34 next_line = self.next_line | |
35 if not next_line: | |
36 raise StopIteration | |
37 | |
38 seq = Sequence() | |
39 seq.header = next_line.rstrip().replace('\n','').replace('\r','') | |
40 | |
41 next_line = self.fasta_file.readline() | |
42 while next_line and next_line[0] != '>': | |
43 #tail = self.fasta_file.tell() | |
44 #line = self.fasta_file.readline() | |
45 #if not line: | |
46 # break | |
47 #if line[0] == '>': | |
48 # self.fasta_file.seek(tail) | |
49 # break | |
50 seq.sequence_parts.append(next_line) | |
51 next_line = self.fasta_file.readline() | |
52 self.next_line = next_line | |
53 return seq | |
54 #============================================================================== | |
55 | |
56 def target_match(target, search_entry): | |
57 ''' Matches ''' | |
58 search_entry = search_entry.upper() | |
59 for atarget in target: | |
60 if search_entry.find(atarget) > -1: | |
61 return atarget | |
62 return None | |
63 | |
64 | |
65 def main(): | |
66 ''' the main function''' | |
67 logging.basicConfig(filename='filter_fasta_log', | |
68 level=logging.INFO, | |
69 format='%(asctime)s :: %(levelname)s :: %(message)s',) | |
70 | |
71 used_sequences = set() | |
72 work_summary = {'wanted': 0, 'found':0, 'duplicates':0} | |
73 targets = [] | |
74 | |
75 f_target = open(sys.argv[1]) | |
76 for line in f_target.readlines(): | |
77 targets.append(">%s" % line.strip().upper()) | |
78 f_target.close() | |
79 | |
80 logging.info('Read target file and am now looking for %d %s', len(targets), 'sequences.') | |
81 | |
82 work_summary['wanted'] = len(targets) | |
83 homd_db = FASTAReader(sys.argv[2]) | |
84 | |
85 i = 0 | |
86 output = open(sys.argv[3], "w") | |
87 try: | |
88 for entry in homd_db: | |
89 target_matched_results = target_match(targets, entry.header) | |
90 if target_matched_results: | |
91 work_summary['found'] += 1 | |
92 targets.remove(target_matched_results) | |
93 sequence = entry.get_sequence() | |
94 if sequence in used_sequences: | |
95 work_summary['duplicates'] += 1 | |
96 else: | |
97 used_sequences.add(sequence) | |
98 print >>output, entry.header | |
99 print >>output, sequence | |
100 finally: | |
101 output.close() | |
102 | |
103 logging.info('Completed filtering') | |
104 for parm, count in work_summary.iteritems(): | |
105 logging.info('%s ==> %d', parm, count) | |
106 | |
107 if __name__ == "__main__": | |
108 main() |