Mercurial > repos > galaxyp > filter_by_fasta_ids
comparison tools/filter_by_fasta_ids.py @ 0:463ebeccb854 draft
Uploaded
author | galaxyp |
---|---|
date | Fri, 26 Sep 2014 14:23:16 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:463ebeccb854 |
---|---|
1 #!/usr/bin/env python | |
2 """ A script to build specific fasta databases """ | |
3 from __future__ import print_function | |
4 import sys | |
5 import logging | |
6 | |
7 #===================================== Iterator =============================== | |
8 class Sequence: | |
9 ''' Holds protein sequence information ''' | |
10 def __init__(self): | |
11 self.header = "" | |
12 self.sequence_parts = [] | |
13 | |
14 def get_sequence(self): | |
15 return "".join([line.rstrip().replace('\n','').replace('\r','') for line in self.sequence_parts]) | |
16 | |
17 class FASTAReader: | |
18 """ | |
19 FASTA db iterator. Returns a single FASTA sequence object. | |
20 """ | |
21 def __init__(self, fasta_name): | |
22 self.fasta_file = open(fasta_name) | |
23 self.next_line = self.fasta_file.readline() | |
24 | |
25 def __iter__(self): | |
26 return self | |
27 | |
28 def __next__(self): | |
29 ''' Iteration ''' | |
30 #while True: | |
31 # line = self.fasta_file.readline() | |
32 # if not line: | |
33 # raise StopIteration | |
34 # if line[0] == '>': | |
35 # break | |
36 next_line = self.next_line | |
37 if not next_line: | |
38 raise StopIteration | |
39 | |
40 seq = Sequence() | |
41 seq.header = next_line.rstrip().replace('\n','').replace('\r','') | |
42 | |
43 next_line = self.fasta_file.readline() | |
44 while next_line and next_line[0] != '>': | |
45 #tail = self.fasta_file.tell() | |
46 #line = self.fasta_file.readline() | |
47 #if not line: | |
48 # break | |
49 #if line[0] == '>': | |
50 # self.fasta_file.seek(tail) | |
51 # break | |
52 seq.sequence_parts.append(next_line) | |
53 next_line = self.fasta_file.readline() | |
54 self.next_line = next_line | |
55 return seq | |
56 | |
57 # Python 2/3 compat | |
58 next = __next__ | |
59 #============================================================================== | |
60 | |
61 def target_match(target, search_entry): | |
62 ''' Matches ''' | |
63 search_entry = search_entry.upper() | |
64 for atarget in target: | |
65 if search_entry.find(atarget) > -1: | |
66 return atarget | |
67 return None | |
68 | |
69 | |
70 def main(): | |
71 ''' the main function''' | |
72 logging.basicConfig(filename='filter_fasta_log', | |
73 level=logging.INFO, | |
74 format='%(asctime)s :: %(levelname)s :: %(message)s',) | |
75 | |
76 used_sequences = set() | |
77 work_summary = {'wanted': 0, 'found':0, 'duplicates':0} | |
78 targets = [] | |
79 | |
80 f_target = open(sys.argv[1]) | |
81 for line in f_target.readlines(): | |
82 targets.append(">%s" % line.strip().upper()) | |
83 f_target.close() | |
84 | |
85 logging.info('Read target file and am now looking for %d %s', len(targets), 'sequences.') | |
86 | |
87 work_summary['wanted'] = len(targets) | |
88 homd_db = FASTAReader(sys.argv[2]) | |
89 | |
90 i = 0 | |
91 output = open(sys.argv[3], "w") | |
92 try: | |
93 for entry in homd_db: | |
94 target_matched_results = target_match(targets, entry.header) | |
95 if target_matched_results: | |
96 work_summary['found'] += 1 | |
97 targets.remove(target_matched_results) | |
98 sequence = entry.get_sequence() | |
99 if sequence in used_sequences: | |
100 work_summary['duplicates'] += 1 | |
101 else: | |
102 used_sequences.add(sequence) | |
103 print(entry.header, file=output) | |
104 print(sequence, file=output) | |
105 finally: | |
106 output.close() | |
107 | |
108 logging.info('Completed filtering') | |
109 for parm, count in work_summary.items(): | |
110 logging.info('%s ==> %d', parm, count) | |
111 | |
112 if __name__ == "__main__": | |
113 main() |