Mercurial > repos > schang > frp_tool
view refseq_parser.py @ 5:70e76f6bfcfb draft
Uploaded
author | schang |
---|---|
date | Tue, 02 Dec 2014 19:54:12 -0500 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/python ''' Script to parse the reference genome sequence''' # Features and assumptions: # - filenames do not matter; gzip and bzip2 streams are autodetected, and # the input format (FASTA or FASTQ) is autodetected import sys, getopt, gzip, bz2, StringIO class SequenceInputError(Exception): """Used to raise error conditions.""" class Sequence: """ Represents a protein, DNA, or RNA sequence. Contains 3 members: name: string seq: string containing the full, unbroken sequence qual: array of ints, one per character in seq (FASTQ only) """ def __init__(self, name, seq, qual=None): self.name = name self.seq = seq self.qual = qual def __repr__(self): out = [] out.append("Name: %s\n" % self.name) out.append("Seq: %s" % self.seq) if self.qual: out.append("\nQual:\n") for q in self.qual: out.append("%d " % q) return ''.join(out) def supportCompression(stream): """ Detect if a stream is compressed and modify to handle, if possible. """ no_filename = False try: filename = stream.name except AttributeError: # probably a StringIO object no_filename = True if filename == '<stdin>': stream = StringIO.StringIO(sys.stdin.read()) pos = stream.tell() magic = stream.read(2) stream.seek(pos) if magic == '\037\213': stream = gzip.GzipFile(None, 'rb', None, stream) elif magic == 'BZ': if no_filename: raise SequenceInputError('Cannot handle bzip2 compressed ' + 'StringIO data.') if filename == '<stdin>': raise SequenceInputError('Cannot handle bzip2 compressed data ' + 'over stdin.') stream = bz2.BZ2File(filename, 'rb') return stream def SequenceFile(stream): """ Parses FASTA or FASTQ data, returning Sequence objects. This is a generator, so that there is no need to store a full list of sequences in memory if the user does not want to. """ f = supportCompression(stream) l = f.readline().rstrip() # autodetect file format based on first character of line if l[0] == '>': format_type = 'fasta' elif l[0] == '@': format_type = 'fastq' else: raise SequenceInputError('Could not determine data type ' + '(fasta or fastq)') # l == '' indicates end-of-file while l != '': name = l[1:] seq_list = [] qual_list = None # Begin parsing l = f.readline().rstrip() if format_type == 'fasta': while l != '' and l[0] != '>': seq_list.append(l) l = f.readline().rstrip() s = ''.join(seq_list) elif format_type == 'fastq': # read in the sequence while l != '' and l[0] != '+': seq_list.append(l) l = f.readline().rstrip() s = ''.join(seq_list) # check if the second sequence name (if existing) matches the first if len(l) > 1: if name != l[1:]: raise SequenceInputError('Sequence and quality score ' + 'identifiers do not match ' + '(\"%s\" != \"%s\")' % (name, l)) l = f.readline().rstrip() # read in the quality scores q = '' while l != '' and len(q) < len(s): q += l l = f.readline().rstrip() # ensure that the quality and sequence strings are the same length if len(q) != len(s): raise SequenceInputError('Sequence and quality string ' + 'lengths do not match (\"%s\")' % name) qual_list = [] for c in q: qual_list.append(ord(c) - 33) yield Sequence(name, s, qual_list) f.close() if __name__ == '__main__': seqs = 0 seqlen = 0 opts, args = getopt.getopt(sys.argv[1:], 'hp') for o, a in opts: if o == '-h': print 'Usage:' print ' %s <filename>' % sys.argv[0] print ' %s < <filename>' % sys.argv[0] print ' cat <filename> | %s [-p]\n' % sys.argv[0] print ' <filename> must be in fasta or fastq format, and can be' print ' gzipped, or (with the first usage style) bzip2\'d.' sys.exit(0) else: print 'unhandled option' sys.exit(1) if len(args) > 0: f = open(args[0], 'rb') else: f = sys.stdin try: for s in SequenceFile(f): #print s # DEBUG seqs += 1 seqlen += len(s.seq) print 'Total sequences found: %d' % seqs print 'Total residues found: %d' % seqlen except SequenceInputError as e: print >> sys.stderr, '\nFaulty input (record %d): %s' % \ (seqs+1, e.args[0]) f.close()