view @ 7:6d94241370cc draft default tip

author schang
date Tue, 02 Dec 2014 21:06:35 -0500
parents 70e76f6bfcfb
line wrap: on
line source

''' Script to parse the reference genome sequence'''

# Features and assumptions:
#  - filenames do not matter; gzip and bzip2 streams are autodetected, and
#    the input format (FASTA or FASTQ) is autodetected

import sys, getopt, gzip, bz2, StringIO

class SequenceInputError(Exception):
    """Used to raise error conditions."""

class Sequence:
    Represents a protein, DNA, or RNA sequence.

    Contains 3 members:
        name: string
        seq: string containing the full, unbroken sequence
        qual: array of ints, one per character in seq (FASTQ only)

    def __init__(self, name, seq, qual=None): = name
        self.seq = seq
        self.qual = qual

    def __repr__(self):
        out = []
        out.append("Name: %s\n" %
        out.append("Seq:  %s" % self.seq)
        if self.qual:
            for q in self.qual:
                out.append("%d " % q)
        return ''.join(out)

def supportCompression(stream):
    Detect if a stream is compressed and modify to handle, if possible.
    no_filename = False

        filename =
    except AttributeError:
        # probably a StringIO object
        no_filename = True

    if filename == '<stdin>':
        stream = StringIO.StringIO(

    pos = stream.tell()
    magic =
    if magic == '\037\213':
        stream = gzip.GzipFile(None, 'rb', None, stream)
    elif magic == 'BZ':
        if no_filename:
            raise SequenceInputError('Cannot handle bzip2 compressed ' +
                                     'StringIO data.')
        if filename == '<stdin>':
            raise SequenceInputError('Cannot handle bzip2 compressed data ' +
                                     'over stdin.')
        stream = bz2.BZ2File(filename, 'rb')

    return stream

def SequenceFile(stream):
    Parses FASTA or FASTQ data, returning Sequence objects.

    This is a generator, so that there is no need to store a full list of
    sequences in memory if the user does not want to.

    f = supportCompression(stream)

    l = f.readline().rstrip()

    # autodetect file format based on first character of line
    if l[0] == '>':
        format_type = 'fasta'
    elif l[0] == '@':
        format_type = 'fastq'
        raise SequenceInputError('Could not determine data type ' +
                                 '(fasta or fastq)')

    # l == '' indicates end-of-file
    while l != '':
        name = l[1:]
        seq_list = []
        qual_list = None

        # Begin parsing
        l = f.readline().rstrip()

        if format_type == 'fasta':
            while l != '' and l[0] != '>':
                l = f.readline().rstrip()
            s = ''.join(seq_list)

        elif format_type == 'fastq':
            # read in the sequence
            while l != '' and l[0] != '+':
                l = f.readline().rstrip()
            s = ''.join(seq_list)

            # check if the second sequence name (if existing) matches the first
            if len(l) > 1:
                if name != l[1:]:
                    raise SequenceInputError('Sequence and quality score ' +
                                             'identifiers do not match ' +
                                             '(\"%s\" != \"%s\")' % (name, l))
            l = f.readline().rstrip()

            # read in the quality scores
            q = ''
            while l != '' and len(q) < len(s):
                q += l
                l = f.readline().rstrip()

            # ensure that the quality and sequence strings are the same length
            if len(q) != len(s):
                raise SequenceInputError('Sequence and quality string ' +
                                         'lengths do not match (\"%s\")' % name)
            qual_list = []
            for c in q:
                qual_list.append(ord(c) - 33)

        yield Sequence(name, s, qual_list)


if __name__ == '__main__':
    seqs = 0
    seqlen = 0

    opts, args = getopt.getopt(sys.argv[1:], 'hp')

    for o, a in opts:
        if o == '-h':
            print 'Usage:'
            print '  %s  <filename>' % sys.argv[0]
            print '  %s  < <filename>' % sys.argv[0]
            print '  cat <filename> | %s [-p]\n' % sys.argv[0]
            print '  <filename> must be in fasta or fastq format, and can be'
            print '  gzipped, or (with the first usage style) bzip2\'d.'

            print 'unhandled option'

    if len(args) > 0:
        f = open(args[0], 'rb')
        f = sys.stdin

        for s in SequenceFile(f):
            #print s  # DEBUG
            seqs += 1
            seqlen += len(s.seq)

        print 'Total sequences found: %d' % seqs
        print 'Total residues found:  %d' % seqlen
    except SequenceInputError as e:
        print >> sys.stderr, '\nFaulty input (record %d): %s' % \
                             (seqs+1, e.args[0])
