diff ensemblref.py @ 0:9f4ea174ce3d draft

planemo upload for repository https://github.com/jj-umn/galaxytools/tree/master/ensembl_variant_report commit e6aa05bbbee3cc7d98f16354fc41c674f439ff1b-dirty
author jjohnson
date Thu, 14 Jun 2018 17:51:39 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ensemblref.py	Thu Jun 14 17:51:39 2018 -0400
@@ -0,0 +1,185 @@
+"""
+with gtf and twobit
+get_bed(transcript_id)  
+"""
+from gtf_to_genes import gene, gene_utilities
+from twobitreader import TwoBitFile
+from Bio.Seq import reverse_complement, translate
+import logging
+logger = logging.getLogger("test")
+
+class EnsemblRef(object):
+
+    def __init__(self,gtf_file,twobitfile,read_now=True):
+        self.gtf_file = gtf_file
+        self.twobitfile = twobitfile
+        self.twobit = TwoBitFile(self.twobitfile)
+        self.gene_dict = None
+        self.transcript_idx = None
+        self.name_idx = None
+        if read_now:
+            self.get_transcript_idx()
+        
+
+    def get_gene_dict(self):
+        if self.gene_dict is None:
+            gene_structures = gene.t_parse_gtf('test')
+            self.gene_dict = gene_structures.get_genes(self.gtf_file,logger=logger)
+        return self.gene_dict
+
+
+    def get_transcript_idx(self):
+        if self.transcript_idx is None:
+            self.transcript_idx = gene_utilities.index_transcripts(self.get_gene_dict(),by_prot_id=False)
+        return self.transcript_idx
+
+
+    def get_name_idx(self):
+        if self.name_idx is None:
+            self.name_idx = dict()
+            for i,t in self.get_transcript_idx().items():
+                for name in t.gene.names:
+                   self.name_idx[name] = t.gene
+                for name in t.names:
+                   self.name_idx[name] = t
+                if t.prot_id:
+                   self.name_idx[t.prot_id] = t
+        return self.name_idx
+
+
+    def get_gtf_transcript(self,name):
+        idx = self.get_transcript_idx()
+        if name in idx:
+            return idx[name]
+        else:
+            nidx = self.get_name_idx()
+            if name in nidx:
+                return nidx[name]
+        return None
+
+
+    def transcript_is_coding(self,transcript_id):
+        tx = self.get_transcript_idx()[transcript_id]
+        return len(tx.start_codons) > 0
+
+
+    def get_transcript_start_codon(self,transcript_id):
+        tx = self.get_transcript_idx()[transcript_id]
+        return tx.start_codons[0] if len(tx.start_codons) > 0 else None
+
+
+    def get_bed_line(self,transcript_id,score=0,itemRgb='0,0,0',coding=False):
+        tx = self.get_transcript_idx()[transcript_id]
+        chrom = tx.gene.contig
+        chromStart = tx.coding_beg if coding else tx.beg 
+        chromEnd = tx.coding_end if coding else tx.end
+        name = transcript_id
+        strand = '+' if tx.gene.strand else '-'
+        thickStart = tx.coding_beg if tx.coding_beg else chromStart
+        thickEnd = tx.coding_end if tx.coding_end else chromEnd
+        exons = tx.get_coding_exons() if coding else tx.get_exons()
+        blockCount = len(exons)
+        if tx.gene.strand:
+            strand = '+'
+            blockSizes = [abs(e-s) for s,e in exons]
+            blockStarts = [s - chromStart for s,e in exons]
+        else:
+            strand = '-'
+            blockSizes = [abs(e-s) for s,e in reversed(exons)]
+            blockStarts = [s - chromStart for s,e in reversed(exons)]
+        blockSizes = ','.join([str(x) for x in blockSizes])
+        blockStarts = ','.join([str(x) for x in blockStarts])
+        return '%s\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % (chrom,chromStart,chromEnd,name,score,strand,thickStart,thickEnd,itemRgb,blockCount,blockSizes,blockStarts)
+
+
+    def transcripts_in_range(self,chrom,startpos,endpos,strand=None):
+        spos = min(startpos,endpos) if endpos else startpos
+        epos = max(startpos,endpos) if endpos else startpos
+        transcripts = []
+        for i,t in self.get_transcript_idx().items():
+            if t.gene.contig == chrom and t.beg <= epos and spos <= t.end:
+                if strand and t.gene.strand != strand:
+                    continue
+                transcripts.append(t)
+        return transcripts
+
+
+    def genes_in_range(self,chrom,startpos,endpos,strand=None,gene_types=None):
+        spos = min(startpos,endpos) if endpos else startpos
+        epos = max(startpos,endpos) if endpos else startpos
+        gene_dict = self.get_gene_dict()
+        gtypes = set(gene_types)  & set(gene_dict.keys()) if gene_types else set(gene_dict.keys())
+        genes = []
+        for gt in gtypes:
+            for gene in gene_dict[gt]:
+                if gene.contig == chrom and gene.beg <= epos and spos <= gene.end:
+                    if strand and gene.strand != strand:
+                        continue
+                    genes.append(gene)
+        return genes
+
+
+    def get_sequence(self,chrom,start,end):
+        if self.twobit:
+            if chrom in self.twobit:
+                return self.twobit[chrom][start:end]
+            contig = chrom[3:] if chrom.startswith('chr') else 'chr%s' % chrom
+            if contig in self.twobit:
+                return self.twobit[contig][start:end]
+        return None
+
+
+    def sequence_sizes(self):
+       return self.twobit.sequence_sizes()
+
+
+    def get_transcript_seq(self,transcript_id,coding=False):
+        tx = self.get_transcript_idx()[transcript_id]
+        chrom = tx.gene.contig
+        exonbnds = tx.get_coding_exons() if coding else tx.get_exons()
+        if tx.gene.strand:
+            seqs = [self.get_sequence(chrom,s,e) for s,e in exonbnds]
+        else:
+            seqs = [reverse_complement(self.get_sequence(chrom,s,e)) for s,e in exonbnds]
+        return ''.join(seqs)
+
+
+    def get_cdna(self,transcript_id):
+        return self.get_transcript_seq(transcript_id,coding=False)
+
+
+    def get_cds(self,transcript_id):
+        return self.get_transcript_seq(transcript_id,coding=True)
+
+
+    def genome_to_transcript_pos(self,transcript_id,genome_pos,coding=False):
+        tx = self.get_transcript_idx()[transcript_id]
+        if not tx.beg <= genome_pos < tx.end:
+            return None
+        exonbnds = tx.get_coding_exons() if coding else tx.get_exons()
+        cdna_pos = 0
+        if tx.gene.strand:
+            for s,e in exonbnds:
+                if s <= genome_pos < e:
+                    cdna_pos += genome_pos - s
+                    break
+                else:
+                    cdna_pos += e - s
+        else:
+            for s,e in exonbnds:
+                if s <= genome_pos < e:
+                    cdna_pos += e - genome_pos - 1
+                    break
+                else:
+                    cdna_pos += e - s
+        return cdna_pos
+
+
+    def genome_to_cdna_pos(self,transcript_id,genome_pos):
+        return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=False)
+
+
+    def genome_to_cds_pos(self,transcript_id,genome_pos):
+        return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=True)
+
+