diff ngsutils/support/__init__.py @ 0:4e4e4093d65d draft

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ngsutils commit 09194687c74a424732f8b0c017cbb942aad89068
author iuc
date Wed, 11 Nov 2015 13:04:07 -0500
parents
children 7a68005de299
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ngsutils/support/__init__.py	Wed Nov 11 13:04:07 2015 -0500
@@ -0,0 +1,249 @@
+import collections
+import gzip
+import os
+import sys
+import re
+try:
+    from eta import ETA
+except:
+    pass
+
+class FASTARead(collections.namedtuple('FASTARecord', 'name comment seq')):
+    '''
+    One FASTA record: a name, an optional header comment and the sequence.
+
+    repr() of a record is its FASTA text: a '>' header line followed by the
+    whole sequence on a single line.
+    '''
+    def __repr__(self):
+        if self.comment:
+            return '>%s %s\n%s\n' % (self.name, self.comment, self.seq)
+        return '>%s\n%s\n' % (self.name, self.seq)
+
+    def subseq(self, start, end, comment=None):
+        '''
+        Return a new record holding seq[start:end] (Python slice semantics).
+
+        If both the record and the caller supply a comment, the new comment
+        is appended to the existing one, separated by a space.  If only one
+        of them is set, that one is used unchanged.
+        '''
+        if self.comment:
+            if comment:
+                comment = '%s %s' % (self.comment, comment)
+            else:
+                # No extra comment given: keep the existing one instead of
+                # formatting the missing value into the header.
+                comment = self.comment
+
+        return FASTARead(self.name, comment, self.seq[start:end])
+
+    def clone(self, name=None, comment=None, seq=None):
+        '''
+        Return a copy of this record with the given fields replaced.
+
+        Any argument that is falsy (None, '') leaves that field unchanged.
+        '''
+        n = name if name else self.name
+        c = comment if comment else self.comment
+        s = seq if seq else self.seq
+
+        return FASTARead(n, c, s)
+
+    def write(self, out):
+        'Write the FASTA text of this record to the file-like object `out`.'
+        out.write(repr(self))
+
+
+class FASTA(object):
+    '''
+    Reader for FASTA (or FASTA-style quality) input.
+
+    Input is taken from `fileobj` if given, otherwise from `fname`: '-' reads
+    stdin, names ending in .gz / .bgz are opened with gzip and anything else
+    is opened as a regular file ('~' is expanded).  With qual=True the lines
+    of a record are joined with spaces instead of being concatenated.
+
+    Raises ValueError if neither a filename nor a file object is supplied.
+    '''
+    def __init__(self, fname=None, fileobj=None, qual=False):
+        self.fname = fname
+        self.qual = qual
+        if fileobj:
+            self.fileobj = fileobj
+        elif not self.fname:
+            # Report the intended error rather than failing on None below.
+            raise ValueError("Missing valid filename or fileobj")
+        elif self.fname == '-':
+            self.fileobj = sys.stdin
+        elif self.fname.endswith(('.gz', '.bgz')):
+            self.fileobj = gzip.open(os.path.expanduser(self.fname))
+        else:
+            self.fileobj = open(os.path.expanduser(self.fname))
+
+    def close(self):
+        'Close the underlying file, leaving the standard streams open.'
+        # '-' maps to sys.stdin, so that is the stream that must survive;
+        # sys.stdout stays protected as it was before.
+        if self.fileobj is not sys.stdin and self.fileobj is not sys.stdout:
+            self.fileobj.close()
+
+    def tell(self):
+        # always relative to uncompressed...
+        return self.fileobj.tell()
+
+    def seek(self, pos, whence=0):
+        'Reposition the underlying file object (same arguments as file.seek).'
+        self.fileobj.seek(pos, whence)
+
+    def fetch(self, quiet=False):
+        '''
+        Generate one FASTARead per record in the input.
+
+        Blank lines and lines starting with '#' are skipped.  The header is
+        split at the first space or tab into name and comment.  Records
+        without a name or without any sequence lines are not yielded.
+
+        Unless `quiet` is set, or no real filename is known, progress is
+        reported through ETA when that optional class was imported.
+        '''
+        eta = None
+        # ETA comes from a best-effort import and may not exist at all.
+        eta_cls = globals().get('ETA')
+        if eta_cls and not quiet and self.fname and self.fname != '-':
+            # stat the same expanded path that __init__ opened
+            size = os.stat(os.path.expanduser(self.fname)).st_size
+            eta = eta_cls(size, fileobj=self.fileobj)
+
+        # In qual mode every line is preceded by a space (so the joined
+        # value starts with one); otherwise lines are simply concatenated.
+        sep = ' ' if self.qual else ''
+        name = ''
+        comment = ''
+        chunks = []
+
+        for line in self.fileobj:
+            line = line.strip()
+            if not line or line[0] == '#':
+                continue
+
+            if line[0] != '>':
+                # Collect pieces and join once per record instead of
+                # re-copying a growing string for every line.
+                chunks.append(sep + line)
+                continue
+
+            # New header: emit the record collected so far, if any.
+            if name and chunks:
+                if eta:
+                    eta.print_status(extra=name)
+                yield FASTARead(name, comment, ''.join(chunks))
+
+            spl = re.split(r'[ \t]', line[1:], maxsplit=1)
+            name = spl[0]
+            comment = spl[1] if len(spl) > 1 else ''
+            chunks = []
+
+        # The last record has no following header to trigger its emission.
+        if name and chunks:
+            if eta:
+                eta.print_status(extra=name)
+            yield FASTARead(name, comment, ''.join(chunks))
+
+        if eta:
+            eta.done()
+
+
+def gzip_reader(fname, quiet=False, callback=None, done_callback=None, fileobj=None):
+    '''
+    Generate the lines of a plain or gzip-compressed file.
+
+    Input is taken from `fileobj` if given, otherwise from `fname`: '-' reads
+    stdin, names ending in .gz / .bgz are opened with gzip and anything else
+    is opened as a regular file ('~' is expanded).
+
+    callback      -- optional; called before each line is yielded, its
+                     return value is shown as extra progress text
+    done_callback -- optional; called after each line is consumed, a true
+                     return value stops the iteration early
+
+    Progress is reported through ETA unless `quiet` is set, no real filename
+    is known, or that optional class was not imported.  The input (unless it
+    is stdin) is closed when iteration ends, including when the generator is
+    closed early or an error propagates.
+    '''
+    if fileobj:
+        f = fileobj
+    elif fname == '-':
+        f = sys.stdin
+    elif fname.endswith(('.gz', '.bgz')):
+        f = gzip.open(os.path.expanduser(fname))
+    else:
+        f = open(os.path.expanduser(fname))
+
+    eta = None
+    try:
+        # ETA comes from a best-effort import and may not exist at all; a
+        # size can only be looked up when a real filename was given.
+        eta_cls = globals().get('ETA')
+        if eta_cls and not quiet and fname and fname != '-':
+            # stat the same expanded path that was opened above
+            eta = eta_cls(os.stat(os.path.expanduser(fname)).st_size, fileobj=f)
+
+        for line in f:
+            if eta:
+                eta.print_status(extra=callback() if callback else '')
+            yield line
+
+            if done_callback and done_callback():
+                break
+    finally:
+        # Runs on exhaustion, early break, generator close and errors alike.
+        if f is not sys.stdin:
+            f.close()
+
+    if eta:
+        eta.done()
+
+
+class Symbolize(object):
+    '''
+    A cache of strings: indexing with a key returns the object that was
+    first stored for an equal key, so equal strings share one object.
+    '''
+    def __init__(self):
+        self.__cache = {}
+
+    def __getitem__(self, k):
+        # setdefault stores k only the first time an equal key is seen and
+        # always hands back the stored object.
+        return self.__cache.setdefault(k, k)
+
+symbols = Symbolize()
+
+# Base -> complementary base, for A/C/G/T/N in either case.
+_compliments = dict(zip('aAcCgGtTnN', 'tTgGcCaAnN'))
+
+
+def revcomp(seq):
+    '''
+    Return the reverse complement of seq, preserving the case of each base.
+
+    Only A, C, G, T and N (upper or lower case) are known; any other
+    character raises KeyError.
+
+    >>> revcomp('ATCGatcg')
+    'cgatCGAT'
+    '''
+    complemented = [_compliments[base] for base in seq]
+    return ''.join(complemented[::-1])
+
+
+class Counts(object):
+    '''
+    Setup simple binning.  Bins are continuous 0->max.  Values are added to
+    bins and then means / distributions can be calculated.
+
+    self.bins[i] holds how many times the value i has been added.
+    '''
+    def __init__(self):
+        self.bins = []
+
+    def add(self, val):
+        '''
+        Count one occurrence of the non-negative integer val.
+
+        Raises ValueError for negative values, which would otherwise be
+        counted in a bin indexed from the end of the list.
+        '''
+        if val < 0:
+            raise ValueError("Counts only supports values >= 0, got %r" % (val,))
+
+        # Grow the bin list in one step so that index val exists.
+        missing = val + 1 - len(self.bins)
+        if missing > 0:
+            self.bins.extend([0] * missing)
+        self.bins[val] += 1
+
+    def mean(self):
+        '''
+        Return the mean of all added values as a float, or None if nothing
+        has been added yet.
+        '''
+        count = sum(self.bins)
+        if count > 0:
+            acc = sum(i * n for i, n in enumerate(self.bins))
+            return float(acc) / count
+        return None
+
+    def max(self):
+        'Return the largest value added so far (-1 if nothing was added).'
+        return len(self.bins) - 1
+
+
+def memoize(func):
+    '''
+    Decorator that caches the results of func by its call arguments.
+
+    The function is returned unwrapped when TESTING or DEBUG is present in
+    the environment.  Calls whose arguments cannot be hashed are passed
+    straight through to func without caching.
+    '''
+    if 'TESTING' in os.environ or 'DEBUG' in os.environ:
+        return func
+
+    cache = {}
+
+    def inner(*args, **kwargs):
+        # items() works on both Python 2 and 3 (iteritems() is Python 2
+        # only); sorting makes the key independent of keyword order.
+        key = (args, tuple(sorted(kwargs.items())))
+        try:
+            cached = key in cache
+        except TypeError:
+            # Unhashable argument (list, dict, ...): nothing to key on.
+            return func(*args, **kwargs)
+
+        if not cached:
+            cache[key] = func(*args, **kwargs)
+        return cache[key]
+
+    # Keep the wrapped function identifiable in tracebacks and help().
+    inner.__name__ = func.__name__
+    inner.__doc__ = '(@memoized %s)\n%s' % (func.__name__, func.__doc__)
+    return inner
+
+
+def quoted_split(s, delim, quote_char='"'):
+    '''
+    Split s on delim, ignoring delimiters that sit between quote characters.
+
+    The quote characters stay part of the returned tokens.  Empty tokens
+    between two delimiters are kept, but an empty token at the very end of
+    the input is dropped (so an empty input gives an empty list).
+    '''
+    fields = []
+    current = []
+    quoted = False
+
+    for ch in s:
+        if ch == delim and not quoted:
+            # Unquoted delimiter: close the token collected so far.
+            fields.append(''.join(current))
+            current = []
+            continue
+
+        current.append(ch)
+        if ch == quote_char:
+            # A quote character opens a quoted run or closes the open one.
+            quoted = not quoted
+
+    tail = ''.join(current)
+    if tail:
+        fields.append(tail)
+
+    return fields