Mercurial > repos > iuc > ngsutils_bam_filter
view ngsutils/support/__init__.py @ 0:4e4e4093d65d draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ngsutils commit 09194687c74a424732f8b0c017cbb942aad89068
author | iuc |
---|---|
date | Wed, 11 Nov 2015 13:04:07 -0500 |
parents | |
children | 7a68005de299 |
line wrap: on
line source
import collections import gzip import os import sys import re try: from eta import ETA except: pass class FASTARead(collections.namedtuple('FASTARecord', 'name comment seq')): def __repr__(self): if self.comment: return '>%s %s\n%s\n' % (self.name, self.comment, self.seq) return '>%s\n%s\n' % (self.name, self.seq) def subseq(self, start, end, comment=None): if self.comment: comment = '%s %s' % (self.comment, comment) return FASTARead(self.name, comment, self.seq[start:end]) def clone(self, name=None, comment=None, seq=None): n = name if name else self.name c = comment if comment else self.comment s = seq if seq else self.seq return FASTARead(n, c, s) def write(self, out): out.write(repr(self)) class FASTA(object): def __init__(self, fname=None, fileobj=None, qual=False): self.fname = fname self.qual = qual if fileobj: self.fileobj = fileobj else: if self.fname == '-': self.fileobj = sys.stdin elif self.fname[-3:] == '.gz' or self.fname[-4:] == '.bgz': self.fileobj = gzip.open(os.path.expanduser(self.fname)) else: self.fileobj = open(os.path.expanduser(self.fname)) if not self.fileobj: raise ValueError("Missing valid filename or fileobj") def close(self): if self.fileobj != sys.stdout: self.fileobj.close() def tell(self): # always relative to uncompressed... return self.fileobj.tell() def seek(self, pos, whence=0): self.fileobj.seek(pos, whence) def fetch(self, quiet=False): name = '' comment = '' seq = '' if not quiet and self.fname and self.fname != '-': eta = ETA(os.stat(self.fname).st_size, fileobj=self.fileobj) else: eta = None for line in self.fileobj: line = line.strip() if not line: continue if line[0] == '#': continue if line[0] == '>': if name and seq: if eta: eta.print_status(extra=name) yield FASTARead(name, comment, seq) spl = re.split(r'[ \t]', line[1:], maxsplit=1) name = spl[0] if len(spl) > 1: comment = spl[1] else: comment = '' seq = '' else: if self.qual: seq = seq + ' ' + line else: seq += line if name and seq: if eta: eta.print_status(extra=name) yield FASTARead(name, comment, seq) if eta: eta.done() def gzip_reader(fname, quiet=False, callback=None, done_callback=None, fileobj=None): if fileobj: f = fileobj elif fname == '-': f = sys.stdin elif fname[-3:] == '.gz' or fname[-4:] == '.bgz': f = gzip.open(os.path.expanduser(fname)) else: f = open(os.path.expanduser(fname)) if quiet or fname == '-': eta = None else: eta = ETA(os.stat(fname).st_size, fileobj=f) for line in f: if eta: if callback: extra = callback() else: extra = '' eta.print_status(extra=extra) yield line if done_callback and done_callback(): break if f != sys.stdin: f.close() if eta: eta.done() class Symbolize(object): 'Converts strings to symbols - basically a cache of strings' def __init__(self): self.__cache = {} def __getitem__(self, k): if not k in self.__cache: self.__cache[k] = k return self.__cache[k] symbols = Symbolize() _compliments = { 'a': 't', 'A': 'T', 'c': 'g', 'C': 'G', 'g': 'c', 'G': 'C', 't': 'a', 'T': 'A', 'n': 'n', 'N': 'N' } def revcomp(seq): ''' >>> revcomp('ATCGatcg') 'cgatCGAT' ''' ret = [] for s in seq: ret.append(_compliments[s]) ret.reverse() return ''.join(ret) class Counts(object): ''' Setup simple binning. Bins are continuous 0->max. Values are added to bins and then means / distributions can be calculated. ''' def __init__(self): self.bins = [] def add(self, val): while len(self.bins) <= val: self.bins.append(0) self.bins[val] += 1 def mean(self): acc = 0 count = 0 for i, val in enumerate(self.bins): acc += (i * val) count += val if count > 0: return float(acc) / count def max(self): return len(self.bins) - 1 def memoize(func): if 'TESTING' in os.environ or 'DEBUG' in os.environ: return func __cache = {} def inner(*args, **kwargs): k = (args, tuple(kwargs.iteritems())) if k not in __cache: __cache[k] = func(*args, **kwargs) return __cache[k] inner.__doc__ = '(@memoized %s)\n%s' % (func.__name__, func.__doc__) return inner def quoted_split(s, delim, quote_char='"'): tokens = [] buf = "" inquote = False for c in s: if inquote: buf += c if c == quote_char: inquote = False elif c == delim: tokens.append(buf) buf = "" else: buf += c if c == quote_char: inquote = True if buf: tokens.append(buf) return tokens