Mercurial > repos > iuc > ngsutils_bam_filter
diff ngsutils/support/regions.py @ 0:4e4e4093d65d draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ngsutils commit 09194687c74a424732f8b0c017cbb942aad89068
author | iuc |
---|---|
date | Wed, 11 Nov 2015 13:04:07 -0500 |
parents | |
children | 7a68005de299 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ngsutils/support/regions.py Wed Nov 11 13:04:07 2015 -0500 @@ -0,0 +1,166 @@ +class RangeMatch(object): + ''' + Simple genomic ranges. You can define chrom:start-end ranges, then ask if a + particular genomic coordinate maps to any of those ranges. This is less- + efficient than an R-Tree, but easier to code. + ''' + def __init__(self, name): + self.ranges = {} + self.name = name + + def add_range(self, chrom, strand, start, end): + if not chrom in self.ranges: + self.ranges[chrom] = {} + + bin = start / 100000 + if not bin in self.ranges[chrom]: + self.ranges[chrom][bin] = [] + self.ranges[chrom][bin].insert(0, (start, end, strand)) + + if (end / 100000) != bin: + for bin in xrange(bin + 1, (end / 100000) + 1): + if not bin in self.ranges[chrom]: + self.ranges[chrom][bin] = [] + self.ranges[chrom][bin].insert(0, (start, end, strand)) + + def get_tag(self, chrom, strand, pos, ignore_strand=False): + ''' + returns (region, is_reverse_orientation) + ''' + if not chrom in self.ranges: + return None, False + bin = pos / 100000 + if not bin in self.ranges[chrom]: + return None, False + for start, end, r_strand in self.ranges[chrom][bin]: + if pos >= start and pos <= end: + if ignore_strand or strand == r_strand: + return self.name, False + return self.name, True + return None, False + + +class RegionTagger(object): + def __init__(self, gtf, valid_chroms=None, only_first_fragment=True): + self.regions = [] + self.counts = {} + self.only_first_fragment = only_first_fragment + + coding = RangeMatch('coding') + exons = RangeMatch('other-exon') + utr_5 = RangeMatch('utr-5') + utr_3 = RangeMatch('utr-3') + introns = RangeMatch('intron') + promoters = RangeMatch('promoter') + + for gene in gtf.genes: + if valid_chroms and not gene.chrom in valid_chroms: + continue + if gene.strand == '+': + promoters.add_range(gene.chrom, gene.strand, gene.start - 2000, gene.start) + else: + promoters.add_range(gene.chrom, gene.strand, gene.end, gene.end + 2000) + + for transcript in gene.transcripts: + if transcript.has_cds: + for start, end in transcript.cds: + coding.add_range(gene.chrom, gene.strand, start, end) + + # TODO: Fix this so that it iterates over exons in the 5'/3' UTRS + for s, e in transcript.utr_5: + utr_5.add_range(gene.chrom, gene.strand, s, e) + for s, e in transcript.utr_3: + utr_3.add_range(gene.chrom, gene.strand, s, e) + + last_end = None + for start, end in transcript.exons: + if last_end: + introns.add_range(gene.chrom, gene.strand, last_end, start) + exons.add_range(gene.chrom, gene.strand, start, end) + last_end = end + + + self.regions.append(coding) + self.regions.append(utr_5) + self.regions.append(utr_3) + self.regions.append(exons) + self.regions.append(introns) + self.regions.append(promoters) + + self.counts['coding'] = 0 + self.counts['coding-rev'] = 0 + self.counts['other-exon'] = 0 + self.counts['utr-5'] = 0 + self.counts['utr-3'] = 0 + self.counts['utr-5-rev'] = 0 + self.counts['utr-3-rev'] = 0 + self.counts['intron'] = 0 + self.counts['promoter'] = 0 + self.counts['other-exon-rev'] = 0 + self.counts['intron-rev'] = 0 + self.counts['promoter-rev'] = 0 + self.counts['junction'] = 0 + self.counts['intergenic'] = 0 + self.counts['mitochondrial'] = 0 + + def add_read(self, read, chrom): + if read.is_unmapped: + return + + if self.only_first_fragment and read.is_paired and not read.is_read1: + return + + tag = None + is_rev = False + + strand = '-' if read.is_reverse else '+' + + if chrom == 'chrM': + tag = 'mitochondrial' + + if not tag: + for op, length in read.cigar: + if op == 3: + tag = 'junction' + break + + if not tag: + for region in self.regions: + tag, is_rev = region.get_tag(chrom, strand, read.pos) + if tag: + break + + if not tag: + tag = 'intergenic' + + if tag: + if is_rev: + self.counts['%s-rev' % tag] += 1 + else: + self.counts[tag] += 1 + + return tag + + def tag_region(self, chrom, start, end, strand): + tag = None + is_rev = False + + if chrom == 'chrM' or chrom == 'M': + tag = 'mitochondrial' + + if not tag: + for region in self.regions: + tag, is_rev = region.get_tag(chrom, strand, start) + if is_rev: + tag = '%s-rev' % tag + + if start != end: + endtag, is_rev = region.get_tag(chrom, strand, end) + if is_rev: + endtag = '%s-rev' % endtag + + if tag and endtag and endtag != tag: + tag = '%s/%s' % (tag, endtag) + + if not tag: + tag = 'intergenic' \ No newline at end of file