Mercurial > repos > jjohnson > ensembl_variant_report
view ensemblref.py @ 4:7fc91849ab21 draft default tip
"planemo upload for repository https://github.com/jj-umn/galaxytools/tree/master/ensembl_variant_report commit 4078a495c5569e9055b4eba33004fe609ee42aff"
author | jjohnson |
---|---|
date | Sat, 25 Jan 2020 15:22:46 -0500 |
parents | 9f4ea174ce3d |
children |
line wrap: on
line source
""" with gtf and twobit get_bed(transcript_id) """ from gtf_to_genes import gene, gene_utilities from twobitreader import TwoBitFile from Bio.Seq import reverse_complement, translate import logging logger = logging.getLogger("test") class EnsemblRef(object): def __init__(self,gtf_file,twobitfile,read_now=True): self.gtf_file = gtf_file self.twobitfile = twobitfile self.twobit = TwoBitFile(self.twobitfile) self.gene_dict = None self.transcript_idx = None self.name_idx = None if read_now: self.get_transcript_idx() def get_gene_dict(self): if self.gene_dict is None: gene_structures = gene.t_parse_gtf('test') self.gene_dict = gene_structures.get_genes(self.gtf_file,logger=logger) return self.gene_dict def get_transcript_idx(self): if self.transcript_idx is None: self.transcript_idx = gene_utilities.index_transcripts(self.get_gene_dict(),by_prot_id=False) return self.transcript_idx def get_name_idx(self): if self.name_idx is None: self.name_idx = dict() for i,t in self.get_transcript_idx().items(): for name in t.gene.names: self.name_idx[name] = t.gene for name in t.names: self.name_idx[name] = t if t.prot_id: self.name_idx[t.prot_id] = t return self.name_idx def get_gtf_transcript(self,name): idx = self.get_transcript_idx() if name in idx: return idx[name] else: nidx = self.get_name_idx() if name in nidx: return nidx[name] return None def transcript_is_coding(self,transcript_id): tx = self.get_transcript_idx()[transcript_id] return len(tx.start_codons) > 0 def get_transcript_start_codon(self,transcript_id): tx = self.get_transcript_idx()[transcript_id] return tx.start_codons[0] if len(tx.start_codons) > 0 else None def get_bed_line(self,transcript_id,score=0,itemRgb='0,0,0',coding=False): tx = self.get_transcript_idx()[transcript_id] chrom = tx.gene.contig chromStart = tx.coding_beg if coding else tx.beg chromEnd = tx.coding_end if coding else tx.end name = transcript_id strand = '+' if tx.gene.strand else '-' thickStart = tx.coding_beg if tx.coding_beg else chromStart thickEnd = tx.coding_end if tx.coding_end else chromEnd exons = tx.get_coding_exons() if coding else tx.get_exons() blockCount = len(exons) if tx.gene.strand: strand = '+' blockSizes = [abs(e-s) for s,e in exons] blockStarts = [s - chromStart for s,e in exons] else: strand = '-' blockSizes = [abs(e-s) for s,e in reversed(exons)] blockStarts = [s - chromStart for s,e in reversed(exons)] blockSizes = ','.join([str(x) for x in blockSizes]) blockStarts = ','.join([str(x) for x in blockStarts]) return '%s\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % (chrom,chromStart,chromEnd,name,score,strand,thickStart,thickEnd,itemRgb,blockCount,blockSizes,blockStarts) def transcripts_in_range(self,chrom,startpos,endpos,strand=None): spos = min(startpos,endpos) if endpos else startpos epos = max(startpos,endpos) if endpos else startpos transcripts = [] for i,t in self.get_transcript_idx().items(): if t.gene.contig == chrom and t.beg <= epos and spos <= t.end: if strand and t.gene.strand != strand: continue transcripts.append(t) return transcripts def genes_in_range(self,chrom,startpos,endpos,strand=None,gene_types=None): spos = min(startpos,endpos) if endpos else startpos epos = max(startpos,endpos) if endpos else startpos gene_dict = self.get_gene_dict() gtypes = set(gene_types) & set(gene_dict.keys()) if gene_types else set(gene_dict.keys()) genes = [] for gt in gtypes: for gene in gene_dict[gt]: if gene.contig == chrom and gene.beg <= epos and spos <= gene.end: if strand and gene.strand != strand: continue genes.append(gene) return genes def get_sequence(self,chrom,start,end): if self.twobit: if chrom in self.twobit: return self.twobit[chrom][start:end] contig = chrom[3:] if chrom.startswith('chr') else 'chr%s' % chrom if contig in self.twobit: return self.twobit[contig][start:end] return None def sequence_sizes(self): return self.twobit.sequence_sizes() def get_transcript_seq(self,transcript_id,coding=False): tx = self.get_transcript_idx()[transcript_id] chrom = tx.gene.contig exonbnds = tx.get_coding_exons() if coding else tx.get_exons() if tx.gene.strand: seqs = [self.get_sequence(chrom,s,e) for s,e in exonbnds] else: seqs = [reverse_complement(self.get_sequence(chrom,s,e)) for s,e in exonbnds] return ''.join(seqs) def get_cdna(self,transcript_id): return self.get_transcript_seq(transcript_id,coding=False) def get_cds(self,transcript_id): return self.get_transcript_seq(transcript_id,coding=True) def genome_to_transcript_pos(self,transcript_id,genome_pos,coding=False): tx = self.get_transcript_idx()[transcript_id] if not tx.beg <= genome_pos < tx.end: return None exonbnds = tx.get_coding_exons() if coding else tx.get_exons() cdna_pos = 0 if tx.gene.strand: for s,e in exonbnds: if s <= genome_pos < e: cdna_pos += genome_pos - s break else: cdna_pos += e - s else: for s,e in exonbnds: if s <= genome_pos < e: cdna_pos += e - genome_pos - 1 break else: cdna_pos += e - s return cdna_pos def genome_to_cdna_pos(self,transcript_id,genome_pos): return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=False) def genome_to_cds_pos(self,transcript_id,genome_pos): return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=True)