Mercurial > repos > yutaka-saito > bsfcall
view bin/bsfcall.py @ 2:f274c166e738 default tip
remove comments in bsfcall_wrapper.xml
author | yutaka-saito |
---|---|
date | Sun, 19 Apr 2015 23:02:04 +0900 |
parents | 06f8460885ff |
children |
line wrap: on
line source
#!/usr/bin/env python """ Bisulfighter::bsf-call Bisulfighter (http://epigenome.cbrc.jp/bisulfighter) by National Institute of Advanced Industrial Science and Technology (AIST) is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License. http://creativecommons.org/licenses/by-nc-sa/3.0/ """ __version__= "1.3" import sys import os import glob import threading import subprocess import Queue from datetime import datetime import hashlib from string import maketrans import gzip import bz2 import zipfile import logging from time import sleep from shutil import copy import re # import pysam class BsfCallBase(object): """ base class for BsfCall, LastExecutor, McDetector. define functions that is used in each sub classes. """ def splitFilePath(self, filePath): """ split file path to directory path, file name (with file extension), file name (without file extension) and file extension. if extension of filePath is '.gz', '.gz' extension is ignored. """ dir_name, file_name = os.path.split(filePath) base_name, ext = os.path.splitext(file_name) prog = None if (ext == '.gz' or ext == '.gzip' or ext == '.bz2' or ext == '.bzip2' or ext == '.zip'): prog = ext[1:] base_name, ext = os.path.splitext(base_name) if len(ext) > 1: ext = ext[1:] return (dir_name, file_name, base_name, ext, prog) def readNameByReadFile(self, readFilePath): """ get read name by read file path. """ dir_name, file_name, read_name, ext, prog = self.splitFilePath(readFilePath) return read_name def secondReadFilePathByFirstReadFilePath(self, readFile, secondReadType = None): """ get second read file path by first read file path. if first read file path is '/path/to/read_file_1.fastq' second read file path is '/path/to/read_file_2.fastq' if secondReadType is specified, the extension of second read file is its value. """ fpath = "" dir_name, file_name, basename, ext, prog = self.splitFilePath(readFile) if secondReadType: ext = ".%s" % secondReadType if prog is not None: fpath = "%s/%s2%s.%s" % (dir_name, basename[0:-1], ext, prog) else: fpath = "%s/%s2%s" % (dir_name, basename[0:-1], ext) return fpath def pairedEndReadNumbers(self): return (1, 2) def clearGap(self, seq): return seq.replace("-", "") def complementStartPosition(self, genomeLen, subseqStart, subseqLen): return genomeLen - subseqStart - subseqLen def complementSeq(self, seq): return seq.translate(maketrans("ATGCatgc", "TACGtacg"))[::-1] def mcContextType(self, genomeSeq, cBasePos, strand='+'): """ get mC context type (CG, CHG, CHH) by genome sequence and C base position. if no mC context found, return None. """ try: if strand == '+': if genomeSeq[cBasePos + 1] == "G": return "CG" else: if genomeSeq[cBasePos + 2] == "G": return "CHG" else: return "CHH" elif strand == '-': if genomeSeq[cBasePos - 1] == "C": return "CG" else: if genomeSeq[cBasePos - 2] == "C": return "CHG" else: return "CHH" return None except IndexError: return None def chrSort(self, a, b): return cmp(a, b) def strands(self): return ("+", "-") def mcContextTypes(self): return ("CG", "CHG", "CHH") def bzip2File(self, filePath, wait = True, log = False): """ bzip2 file. If wait argument is False, without waiting for bzip2 process to be completed, this function returns immediately. """ if log: logging.info("bzip2 start: %s" % filePath) dirpath, fname = os.path.split(filePath) cmd = "bzip2 %s" % fname p = subprocess.Popen(cmd, shell = True, cwd = dirpath) if wait: p.wait() if log: logging.info("bzip2 done: %s" % filePath) def isGzipFile(self, filePath): return filePath[-3:] == ".gz" or filePath[-5:] == ".gzip" def isBzip2File(self, filePath): return filePath[-4:] == ".bz2" or filePath[-6:] == ".bzip2" def isZipFile(self, filePath): return filePath[-4:] == ".zip" def isMafFile(self, filePath): f = open(filePath, "rb") data = f.read(1) f.close() if re.match("[\x20-\x7E]", data): f = open(filePath, "r") first_line = f.readline() if first_line[0:5] != "track" and first_line[0] != "#" and first_line[0:2] != "a ": f.close() return False if first_line[0:2] == "a ": cnt = 2 else: cnt = 1 while True: line = f.readline() if line == "": break if line[0] == "#" or line.strip() == "": continue if cnt == 1 and line[0:2] != "a ": f.close() return False if cnt == 2 and line[0:2] != "s ": f.close() return False if cnt == 3: f.close() if line[0:2] == "s ": return True else: return False cnt += 1 f.close() return False else: return False def isBamFile(self, filePath): bgzf_magic = b"\x1f\x8b\x08\x04" f = open(filePath, "rb") data = f.read(4) f.close() return data == bgzf_magic def isSamFile(self, filePath): f = open(filePath, "rb") data = f.read(1) if data == "@": tag = f.read(2) if tag == "HD" or tag == "SQ" or tag == "RG" or tag == "CO": f.close() return True f.seek(0) data = f.read(1) f.close() if re.match("[\x20-\x7E]", data): f = open(filePath, "r") line = f.readline() f.close() return len(line.split("\t")) > 10 else: return False def scriptDir(self): return os.path.dirname(os.path.abspath(sys.argv[0])) def chrnoFromFastaDescription(self, description): """ get chromosome number by fasta description line. """ return description.strip()[1:].strip() def chrsByRefGenome(self, refGenome): """ get all chromosome numbers that is included in the reference genome. """ chrs = [] for line in open(refGenome, "r"): line = line.strip() if line[0] == ">": chrs.append(self.chrnoFromFastaDescription(line)) return chrs def readRefGenome(self, refGenome, refGenomeBuf, refGenomeChr): """ read the reference genome fasta file. """ logging.info("BsfCallBase::readRefGenome: %s" % refGenome) chr = None buf = [] fin = open(refGenome, 'r') for line in fin: if line[0] == '>': chr = self.chrnoFromFastaDescription(line) logging.info("BsfCallBase::readRefGenome: chr=%s" % chr) if len(buf) > 0: refGenomeBuf[refGenomeChr[-1]]=''.join(buf) del buf[:] refGenomeChr.append(chr) elif chr != None: buf.append(line.strip().upper()) else: logging.fatal("BsfCallBase::readRefGenome: the specified reference genome file \"%s\" is malformed." % refGenome) fin.close() refGenomeBuf[refGenomeChr[-1]]=''.join(buf) logging.info("BsfCallBase::readRefGenome: done.") return def lastalOpts(self, lastOpt): """ get options for lastal by bsf-call --last option """ return " ".join(lastOpt.split(",")) def mergeOpts(self): return "" def filterOpts(self, mismapProb, scoreThres, isPairedEnd): """ get filtering option. this option is specified to last-map-probs or last-pair-probs. """ option = "" if isPairedEnd: option = "-m%s" % str(mismapProb) else: option = "-s%d -m%s" % (scoreThres, str(mismapProb)) return option def isPairedEnd(self, readAttr): return self.pairedEndReadNumbers()[1] in readAttr def jobIdByQsubOutput(self, qsubOutput): """ get job id submitted by qsub command and its output. """ fields = qsubOutput.strip().split() return fields[2] def waitForSubmitJobs(self, jobIds, checkInterval = 10): """ wait for all jobs that have been submitted with qsub command to finish. """ error_jobs = [] while True: all_done = True qstat = os.popen("qstat") for line in qstat: fields = line.strip().split() if fields[0] in jobIds: if fields[4].find('E') > 0: if fields[0] not in error_jobs: logging.fatal("Error has occurred: Job ID=%s" % fields[0]) error_jobs.append(fields[0]) else: all_done = False break qstat.close() if all_done: break else: sleep(checkInterval) return def logJobSubmit(self, msg, jobId, cmd = None): logging.info("Submit job: %s --> Job ID = %s" % (msg, jobId)) if cmd: logging.info(cmd) def bamMapq2Mismap(self, mapq): return pow(0.1, (float(mapq) / 10)) def getAllMappingResultFiles(self, resultDirs): mapping_result_files = [] for result_dir in resultDirs: for root, dirs, files in os.walk(result_dir): for filename in files: logging.info("McDetector::getAllMappingResultFiles: %s" % filename) mapping_result_file = os.path.join(root, filename) mapping_result_files.append(mapping_result_file) return mapping_result_files class BsfCall(BsfCallBase): """ class to execute bsf-call process. """ def __init__(self, refGenome, readFilePaths, cmdOpts): """ constructor of BsfCall """ self.refGenome = refGenome self.readFilePaths = readFilePaths self.reads = [] self.opts = cmdOpts self.dataDir = None self.genomeDir = None self.mcContextDir = None self.readInFh1 = None self.readInFh2 = None self.numReads = {1: 0, 2: 0} self.mappingResultDirs = [] self.mappingResultFiles = [] self.setDataDir() self.setLogger() logging.info("bsf-call start.") self.logOption() # self.numReadsPerFile = self.sizeForSplitRead(self.opts["split_read_size"]) if self.opts["mapping_dir"]: self.opts["only_mcdetection"] = True else: self.opts["only_mcdetection"] = False def execute(self): """ execute bsf-call process. """ try: if self.opts["mapping_dir"]: # Only mc detection self.mappingResultDirs = self.opts["mapping_dir"].split(",") self.mappingResultFiles = self.getAllMappingResultFiles(self.mappingResultDirs) self.opts["mapping_result_files"] = self.mappingResultFiles else: self.makeIndexFile() self.prepareForReads() self.mappingResultDirs = self.processMapping() logging.debug("BsfCall:execute: mapping result directories are: %s" % ','.join(self.mappingResultDirs)) self.processMcDetection(self.mappingResultDirs, self.opts["local_dir"]) logging.info("bsf-call done.") except: logging.exception("Exception has occurred.") def processMapping(self): """ run read mapping and filtering process. """ logging.info("Mapping and filtering process start.") result_dirs = [] for read_attr in self.reads: self.runLast(read_attr) result_dirs.append(read_attr["results_dir"]) logging.info("Mapping and filtering process done.") return result_dirs def processMcDetection(self, resultDirs, localDir = None): """ run mC detection process. """ logging.info("mC detection process start.") mc_detector = McDetector(self.refGenome, resultDirs, self.mcContextDir, self.opts) mc_detector.execute(self.opts["output"], self.opts["num_threads"]) logging.info("mC detection process done.") def setDataDir(self): """ create directries to store the files of the bsf-call process. """ if self.opts["work_dir"]: self.dataDir = self.opts["work_dir"] else: self.dataDir = self.autoWorkDir() self.mcContextDir = "%s/mc_contexts" % self.dataDir if not os.path.exists(self.dataDir): os.makedirs(self.dataDir) if not os.path.exists(self.mcContextDir): os.makedirs(self.mcContextDir) def setLogger(self): """ create logger to store the logs of the bsf-call process. """ log_level = logging.INFO # log_level = logging.DEBUG log_file = "%s/bsf-call.log" % self.dataDir file_logger = logging.FileHandler(filename=log_file) file_logger.setLevel(log_level) file_logger.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s')) logging.getLogger().addHandler(file_logger) logging.getLogger().setLevel(log_level) def logOption(self): """ output bsf-call option values and arguments to the log. """ if self.opts["mapping_dir"]: logging.info("Mapping result directory is specified. Only mC detection is executed.") logging.info(" Mapping result directory: %s" % self.opts["mapping_dir"]) logging.info(" Reference genome: %s" % self.refGenome) # logging.info(" Read BAM file: %s" % ("Yes" if self.opts["read_bam"] else "No")) # logging.info(" Read SAM file: %s" % ("Yes" if self.opts["read_sam"] else "No")) else: logging.info("Reference genome: %s" % self.refGenome) logging.info("Read files: %s" % self.readFilePaths) logging.info("Working directory: %s" % self.dataDir) logging.info("Options:") logging.info(" Threshold of the alignment score at filtering: %d" % self.opts["aln_score_thres"]) # logging.info(" Paired-end direction: %s" % self.opts["pe_direction"]) # logging.info(" Options for LAST: %s" % self.opts["last_opts"]) logging.info(" Threshold of read coverate: %d" % self.opts["coverage"]) logging.info(" Threshold of mC ratio: %s" % str(self.opts["lower_bound"])) logging.info(" Threshold of the mismap probability at filtering: %s" % str(self.opts["aln_mismap_prob_thres"])) logging.info(" Working directory: %s" % self.dataDir) logging.info(" Local directory: %s" % self.opts["local_dir"]) logging.info(" Output file: %s" % (self.opts["output"] if self.opts["output"] else "(stdout)")) # logging.info(" Use cluster: %s" % ("Yes" if self.opts["use_cluster"] else "No")) # logging.info(" Queue name: %s" % self.opts["queue_list"]) logging.info(" Number of threads: %d" % self.opts["num_threads"]) # logging.info(" Split read size: %s" % self.opts["split_read_size"]) def prepareForReads(self): """ create directories to store split reads and result files. """ for read_no, read_path in enumerate(self.readFilePaths): readNo = read_no + 1 logging.info("Preparations for a read file start: %d: %s" % (readNo, read_path)) data_dir = "%s/%d" % (self.dataDir, readNo) read = {"base_dir": data_dir, "path": read_path, "reads_dir": data_dir + "/reads", "results_dir": data_dir + "/results"} if not os.path.exists(data_dir): os.makedirs(data_dir) os.makedirs(read["reads_dir"]) os.makedirs(read["results_dir"]) pe_no = self.pairedEndReadNumbers()[0] for readpath in read_path.split(","): dir_name, file_name, base_name, ext, prog = self.splitFilePath(readpath) file_type = self.checkReadFileType(readpath) read[pe_no] = {"name": base_name, "fname": file_name, "type": file_type, "path": readpath} pe_no += 1 is_paired_end = self.isPairedEnd(read) logging.info("Paired-end: %s" % is_paired_end) logging.info(" Forward: %s" % read[1]["path"]) if is_paired_end: logging.info(" Reverse: %s" % read[2]["path"]) logging.info("Preparations for a read file done") self.reads.append(read) return def runLast(self, readAttr): """ run LAST programs to map reads and filtering. """ is_paired_end = self.isPairedEnd(readAttr) filter_option = self.filterOpts(self.opts["aln_mismap_prob_thres"], self.opts["aln_score_thres"], is_paired_end) if is_paired_end: logging.info('BsfCall::runLast: PairedEnd') last_exec = LastExecutorPairedEnd(self.refGenome, self.dataDir, readAttr["reads_dir"], readAttr["results_dir"], self.opts["num_threads"]) else: logging.info('BsfCall::runLast: Not PairedEnd') last_exec = LastExecutorSingle(self.refGenome, self.dataDir, readAttr["reads_dir"], readAttr["results_dir"]) # last_exec.execute(readAttr, self.opts["num_threads"], self.lastalOpts(self.opts["last_opts"]), self.mergeOpts(), filter_option) # last_exec.execute(readAttr, 1, self.lastalOpts(self.opts["last_opts"]), self.mergeOpts(), filter_option) last_exec.execute(readAttr, 1, "", self.mergeOpts(), filter_option) def makeIndexFile(self): """ create index file of reference genome. """ directions = [] if not os.path.exists("%s.f.prj" % self.refGenome): directions.append("f") if not os.path.exists("%s.r.prj" % self.refGenome): directions.append("r") if len(directions) > 0: logging.info("Make index file start.") last_executor = LastExecutor(self.refGenome, self.dataDir) last_executor.lastdb(directions, self.opts["num_threads"] > 1) logging.info("Make index file done.") def checkReadFileType(self, readFilePath): """ get read file type. """ name, ext = os.path.splitext(readFilePath) if ext == ".gz": name, ext = os.path.splitext(name) if len(ext) > 1: ext = ext[1:] file_type = None if ext == "sra" or "fastq" or "fasta": file_type = ext elif ext == "fa": file_type = "fasta" else: f = open(readFilePath, "r") first_char = f.read(1) if first_char == "@": file_type = "fastq" elif first_char == ">": file_type = "fasta" else: file_type = "sra" f.close() return file_type def splitedReadFilePath(self, outputDir, start, end, readDirection, ext): """ get splitted read file path. """ return "%s/%010d-%010d_%d.%s" % (outputDir, start, end, readDirection, ext) def fastqDumpedFilePath(self, outputDir, readName, readDirection = None): """ get output file path for fastq-dump command. """ path = "%s/%s" % (outputDir, readName) if readDirection: path += "_%d" % readDirection return path + ".fastq" def autoWorkDir(self): """ get working directory path determined automatically. """ now = datetime.now() s = ",".join(self.readFilePaths) + self.refGenome for key, value in self.opts.items(): s += ("%s:%s" % (key, value)) h = hashlib.md5(s).hexdigest() return "%s-%06d-%s" % (now.strftime("%Y%m%d-%H%M%S"), now.microsecond, h[0:16]) class BsfCallCluster(BsfCall): """ class to execute bsf-call process on pc cluster. """ def __init__(self, refGenome, readFilePaths, cmdOpts): """ constructor of BsfCallCluster """ BsfCall.__init__(self, refGenome, readFilePaths, cmdOpts) self.lastExecutor = LastExecutorCluster(self.refGenome, self.opts) self.mappingJobIds = [] def processMapping(self): """ if bsf-call is executed on cluster, mapping and filtering job is submitted when read file is splitted. therefore, in this function, only wait for all jobs to finish. """ logging.info("Waiting for all jobs to finish.") self.waitForSubmitJobs(self.mappingJobIds) logging.info("Mapping and filtering process done.") return self.lastExecutor.resultDirs def processMcDetection(self, resultDirs, localDir = None): """ for each chromosome number, submit mC detection process job to the cluster. after all jobs have been finished, output mC detection result. """ logging.info("mC detection process start.") chrs = self.chrsByRefGenome(self.refGenome) job_ids = [] for chr_no in chrs: job_id = self.submitMcDetectionJob(resultDirs, chr_no) job_ids.append(job_id) sleep(1) logging.info("Submitted jobs: %s" % ",".join(job_ids)) self.waitForSubmitJobs(job_ids) mc_detector = McDetector(self.refGenome, resultDirs, self.mcContextDir, self.opts) mc_detector.chrs = chrs mc_detector.output(self.opts["output"]) logging.info("mC detection process done.") def submitMcDetectionJob(self, resultDirs, chrNo): """ submit mC detection process job to the cluster. """ argv = self.qsubRemoteCommandArgv(resultDirs, chrNo) cmd = self.qsubCommand(chrNo, " ".join(map((lambda s: '"' + s + '"'), argv))) qsub = os.popen(cmd) out = qsub.read() qsub.close() job_id = self.jobIdByQsubOutput(out) self.logJobSubmit("mC detection job: %s: Mapping result directories: %s" % (chrNo, resultDirs), job_id) return job_id def qsubRemoteCommandArgv(self, resultDirs, chrNo): """ get arguments for mC detection program (mc-detector.py). """ argv = [] argv.append(self.refGenome) argv.append(",".join(resultDirs)) argv.append(self.mcContextDir) argv.append(chrNo) argv.append(str(self.opts["only_mcdetection"])) argv.append(str(self.opts["lower_bound"])) argv.append(str(self.opts["coverage"])) argv.append(str(self.opts["aln_mismap_prob_thres"])) # if self.opts["read_bam"]: # argv.append("BAM") # elif self.opts["read_sam"]: # argv.append("SAM") # else: # argv.append("MAF") argv.append("MAF") if self.opts["local_dir"]: argv.append(self.opts["local_dir"]) return argv def qsubCommand(self, chrNo, cmdArgs): """ get qsub command to submit mC detection job. """ remote_cmd = os.path.join(self.scriptDir(), "mc-detector.py") out_file = os.path.join(self.mcContextDir, "mc-detector-%s.out" % chrNo) err_file = os.path.join(self.mcContextDir, "mc-detector-%s.err" % chrNo) if self.opts["queue_list"]: cmd = "qsub -o %s -e %s -q %s -pe openmpi %d -cwd -b y %s %s" % (out_file, err_file, self.opts["queue_list"], self.opts['num_threads'], remote_cmd, cmdArgs) else: cmd = "qsub -o %s -e %s -pe openmpi %d -cwd -b y %s %s" % (out_file, err_file, self.opts['num_threads'], remote_cmd, cmdArgs) return cmd def afterProcessSplitRead(self, readFile, readAttr = None): """ this function is called after output splitted one read file. if bsf-call is executed on pc cluster, mapping and filtering job is submitted after output one splitted read file. """ if self.isPairedEnd(readAttr): fpath, ext = os.path.splitext(readFile) if fpath[-1:] == "2": read_file = "%s1.%s" % (fpath[0:-1], readAttr[1]["type"]) job_id = self.lastExecutor.executeOneRead(readFile, readAttr) self.mappingJobIds.append(job_id) else: job_id = self.lastExecutor.executeOneRead(readFile, readAttr) self.mappingJobIds.append(job_id) class LastExecutor(BsfCallBase): """ class to run LAST programs to map read and filtering. """ def __init__(self, refGenome, baseDir = ".", readsDir = None, resultsDir = None, numThreads = 1): """ constructor of LastExecutor """ self.refGenome = refGenome self.baseDir = baseDir self.readsDir = readsDir self.resultsDir = resultsDir self.queue = Queue.Queue() self.lock = threading.Lock() self.numThreads = numThreads def execute(self, readAttr, numThreads = 1, lastalOpts = "", mergeOpts = "", filterOpts = ""): """ enqueue all splited read files to the queue. create and start threads to execute read mapping and filtering process. wait for all threads to finish. """ self.enqueue(readAttr) if self.queue.qsize()==0: logging.fatal("LastExecutor::execute: Error: queue size=%d" % self.queue.qsize()) sys.exit(1) logging.info("Queued %d files." % self.queue.qsize()) threads = [] for i in range(numThreads): t = threading.Thread(target=self.worker, args=(readAttr, lastalOpts, mergeOpts, filterOpts)) t.daemon = True threads.append(t) t.start() i = 1 for thread in threads: logging.info("Waiting for %d th thread." % i) thread.join() logging.info("Joined %d th thread." % i) i+=1 def worker(self, readAttr, lastalOpts, mergeOpts, filterOpts): """ thread worker to execute LAST. dequeue read file path from the queue and execute read mapping filtering process. """ if self.queue.empty(): logging.info("LastExecutor::worker: Queue is empty.") while not self.queue.empty(): fpath = self.queue.get_nowait() self.runLast(fpath, readAttr, lastalOpts, mergeOpts, filterOpts) return def runLast(self, readFile, readAttr, lastalOpts, mergeOpts, filterOpts, rmInFiles = True): """ execute LAST programs to map read and filtering. """ cmd = self.batchCmd(readFile, readAttr, lastalOpts, mergeOpts, filterOpts, rmInFiles) logging.info("LastExecutor::runLast: command=%s" % cmd) p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) # out, error = p.communicate() p.wait() error_msg = p.communicate()[1] if len(error_msg) > 0: logging.fatal(error_msg) def enqueue(self, readAttr): """ enqueue all splitted read files to the queue. """ # logging.info("%s/*_1.%s.bz2" % (self.readsDir, readAttr[1]["type"])) # for read_file in glob.glob("%s/*_1.%s" % (self.readsDir, readAttr[1]["type"])): # self.queue.put(read_file) self.queue.put(readAttr[1]["path"]) def lastdb(self, directions, parallel = False): """ execute lastdb command to create index file of reference genome. """ cmds = [] for direction in directions: cmds.append(self.lastdbCmd(direction)) if parallel: processes = [] for cmd in cmds: logging.info(cmd) p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) out = p.stdout processes.append({"p": p, "out": out}) for process in processes: process["p"].wait() out_data = process["out"].read() if len(out_data) > 0: logging.info(out_data) else: for cmd in cmds: logging.info(cmd) p = subprocess.Popen(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) out = p.stdout p.wait() out_data = out.read() if len(out_data) > 0: logging.info(out_data) def lastdbCmd(self, direction): """ get lastdb command to create index file of reference genome. """ # return "lastdb -w2 -u bisulfite_%s.seed %s.%s %s" % (direction, self.refGenome, direction, self.refGenome) return "lastdb -uBIS%s %s.%s %s" % (direction.upper(), self.refGenome, direction, self.refGenome) def lastalBsCmd(self, readFile, opts = ""): """ get lastal command to map read. """ # s_opt = self.lastalSopt(direction) read_name = self.readNameByReadFile(readFile) return "TMPDIR=%s last-bisulfite.sh %s.%s %s.%s %s > %s" % (self.readsDir, self.refGenome, 'f', self.refGenome, 'r', readFile, self.mappingResultFilePath(read_name)) def lastalBsPairCmd(self, readFile1, readFile2, opts = ""): """ get lastal command to map read. """ # s_opt = self.lastalSopt(direction) read_name = self.readNameByReadFile(readFile) return "PARALLEL=-j%d TMPDIR=%s last-bisulfite-pair.sh %s.%s %s.%s %s %s > %s" % (self.numThreads, self.readsDir, self.refGenome, 'f', self.refGenome, 'r', readFile1, readFile2, self.mappingResultFilePath(read_name)) # def mergeCmd(self, forwardFile, reverseFile, outputFile, opts = "", rmInFiles = True): def mergeCmd(self, inputFile, outputFile, opts = "", rmInFiles = True): """ get command to merge lastal output. """ cmd = "last-merge-batches %s > %s" % (inputFile, outputFile) if rmInFiles: cmd += "; rm %s" % inputFile return cmd # def mappingAndMergeCmd(self, readFile, lastalOpts = "", mergeOpts = "", rmInFiles = True): def mappingPairCmd(self, readFile1, readFile2, lastalOpts = "", mergeOpts = "", rmInFiles = True): """ get read mapping and filtering command. """ read_name = self.readNameByReadFile(readFile) n, ext = os.path.splitext(readFile) if ext == ".gz": n, ext = os.path.splitext(n) lastal_qopt = self.lastalQopt(ext[1:]) lastal_opt = "%s %s" % (lastalOpts, lastal_qopt) mapping_file = self.mappingResultFilePath(read_name) return self.lastalBsPairCmd(readFile1, readFile2, lastal_opt) # def mappingResultFilePath(self, readName, direction): def mappingResultFilePath(self, readName): """ get read mapping result file path. """ return "%s/%s.maf" % (self.resultsDir, readName) def mergeResultFilePath(self, readName): """ get merge result file path. """ return "%s/%s.merge.maf" % (self.resultsDir, readName) def filterResultFilePath(self, readName): """ get filtering result file path. """ return "%s/%s.maf" % (self.resultsDir, readName) def lastalSopt(self, direction): """ get -s option for lastal. """ opt = "" if direction == "f": opt = "-s1" elif direction == "r": opt = "-s0" return opt def lastalQopt(self, fileType): """ get -Q option for lastal. """ opt = "" if fileType == "fasta": opt = "-Q0" elif fileType == "fastq": opt = "-Q1" return opt class LastExecutorCluster(LastExecutor): """ class to run LAST programs on pc cluster. """ def __init__(self, refGenome, bsfCallOpts): """ constructor of LastExecutorCluster """ self.refGenome = refGenome self.opts = bsfCallOpts self.resultDirs = [] def executeOneRead(self, readFile, readAttr): """ execute read mapping and filtering process with specified read. on pc cluster, submit mapping and filtering job and return job id. """ if readAttr["results_dir"] not in self.resultDirs: self.resultDirs.append(readAttr["results_dir"]) lastal_opts = self.lastalOpts(self.opts["last_opts"]) merge_opts = self.mergeOpts() filter_opts = self.filterOpts(self.opts["aln_mismap_prob_thres"], self.opts["aln_score_thres"], self.isPairedEnd(readAttr)) job_id = self.submitJob(readFile, readAttr, lastal_opts, merge_opts, filter_opts, self.opts["queue_list"]) return job_id def submitJob(self, readFile, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", queueName = None): """ submit read mapping and filtering process job. """ job_id = None read_name = self.readNameByReadFile(readFile)[0:-2] out_file = self.qsubStdoutFilePath(readAttr["base_dir"], read_name) err_file = self.qsubStderrFilePath(readAttr["base_dir"], read_name) remote_cmd = self.remoteCommand(readAttr) remote_cmd_args = " ".join(map((lambda s: '"' + s + '"'), self.remoteCommandArgv(read_name, readAttr, lastalOpts, filterOpts))) if queueName: cmd = "qsub -o %s -e %s -q %s -cwd %s %s" % (out_file, err_file, queueName, remote_cmd, remote_cmd_args) else: cmd = "qsub -o %s -e %s -cwd %s %s" % (out_file, err_file, remote_cmd, remote_cmd_args) qsub = os.popen(cmd) out = qsub.read() qsub.close() job_id = self.jobIdByQsubOutput(out) dir_path, file_name, base_name, ext, prog = self.splitFilePath(readFile) self.logJobSubmit("Mapping and filtering: read: %s/%s" % (dir_path, base_name[0:-2]), job_id) return job_id def remoteCommand(self, readAttr): """ get read mapping and filtering command path to submit by qsub command. """ if self.isPairedEnd(readAttr): return os.path.join(self.scriptDir(), "mapping-p.sh") else: return os.path.join(self.scriptDir(), "mapping-s.sh") def remoteCommandArgv(self, readName, readAttr, lastalOpts, filterOpts): """ get read mapping and filtering command arguments. """ argv = [] argv.append(readAttr["reads_dir"]) argv.append(readAttr["results_dir"]) argv.append(self.refGenome) argv.append(filterOpts) argv.append(readName) argv.append(readAttr[1]["type"]) argv.append("%s %s" % (lastalOpts, self.lastalQopt(readAttr[1]["type"]))) if self.isPairedEnd(readAttr): argv.append(readName) argv.append(readAttr[2]["type"]) argv.append("%s %s" % (lastalOpts, self.lastalQopt(readAttr[2]["type"]))) return argv def qsubStdoutFilePath(self, dirPath, readName): """ get qsub command standard output file path. """ return "%s/mapping_%s.out" % (dirPath, readName) def qsubStderrFilePath(self, dirPath, readName): """ get qsub command standard error file path. """ return "%s/mapping_%s.err" % (dirPath, readName) class LastExecutorSingle(LastExecutor): """ class to run LAST programs to map single read and filtering. """ def __init__(self, refGenome, baseDir, readsDir, resultsDir): """ constructor of LastExecutorSingle """ LastExecutor.__init__(self, refGenome, baseDir, readsDir, resultsDir) def batchCmd(self, readFile1, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", rmInFiles = True): """ get batch command to map read and filtering. """ read_name = self.readNameByReadFile(readFile1) out_file = self.filterResultFilePath(read_name[0:-2]) return "TMPDIR=%s last-bisulfite.sh %s.f %s.r %s > %s/%s.maf" % (self.readsDir, self.refGenome, self.refGenome, readFile1, self.resultsDir, read_name) # cmds = [] # cmds.append(self.mappingAndMergeCmd(readFile, lastalOpts, mergeOpts, rmInFiles)) # cmds.append(self.mappingPairCmd(readFile1, readFile2, lastalOpts, mergeOpts, rmInFiles)) # cmds.append(self.filterCmd(self.mergeResultFilePath(read_name), out_file, filterOpts, rmInFiles)) # cmds.append(self.filterCmd(self.mappingResultFilePath(read_name), out_file, filterOpts, rmInFiles)) # cmds.append("bzip2 %s" % out_file) # return "; ".join(cmds) def filterCmd(self, inputFile, outputFile, opts = "", rmInFile = True): """ get filter command. """ cmd = "last-map-probs %s %s > %s" % (opts, inputFile, outputFile) if rmInFile: cmd += "; rm %s" % inputFile return cmd class LastExecutorPairedEnd(LastExecutor): """ class to run LAST programs to map paired-end read and filtering. """ def __init__(self, refGenome, baseDir, readsDir, resultsDir, numThreads): """ constructor of LastExecutorPairedEnd """ LastExecutor.__init__(self, refGenome, baseDir, readsDir, resultsDir) def batchCmd(self, readFile1, readAttr, lastalOpts = "", mergeOpts = "", filterOpts = "", rmInFiles = True): """ get batch command to map read and filtering. """ # readFile2 = self.secondReadFilePathByFirstReadFilePath(readFile1, readAttr[2]["type"]) # read_name1 = self.readNameByReadFile(readFile1) # read_name2 = self.readNameByReadFile(read_files[1]) # merge_result_file = "%s %s" % (self.mergeResultFilePath(read_name1), self.mergeResultFilePath(read_name2)) # mapping_result_file = "%s %s" % (self.mappingResultFilePath(read_name1), self.mappingResultFilePath(read_name2)) # out_file = self.filterResultFilePath(read_name1[0:-2]) # return "TMPDIR=%s last-bisulfite-paired.sh %s.f %s.r %s %s > %s" % (self.baseDir, self.refGenome, self.refGenome, readFile1, readFile2, out_file) read_name1 = self.readNameByReadFile(readAttr[1]["path"]) read_name2 = self.readNameByReadFile(readAttr[2]["path"]) return "PARALLEL=-j%d TMPDIR=%s last-bisulfite-paired.sh %s.f %s.r %s %s > %s/%s,%s.maf" % (self.numThreads, self.readsDir, self.refGenome, self.refGenome, readAttr[1]["path"], readAttr[2]["path"], self.resultsDir, read_name1, read_name2) def filterCmd(self, inputFile, outputFile, opts = "", rmInFile = True): """ get filter command. """ cmd = "last-pair-probs %s %s > %s" % (opts, inputFile, outputFile) if rmInFile: cmd += "; rm %s" % inputFile return cmd class McDetector(BsfCallBase): """ class to execute mC detection process. """ def __init__(self, refGenome, resultDirs, mcContextDir, options): """ constructor of McDetector """ self.refGenome = refGenome self.refGenomeBuf = {} self.refGenomeChr = [] self.mappingResultDirs = resultDirs self.mcContextDir = mcContextDir self.lowerBound = options["lower_bound"] self.coverageThreshold = options["coverage"] self.onlyMcDetection = options["only_mcdetection"] self.opts = options self.mappingResultFiles = [] if self.onlyMcDetection: self.mismapThreshold = options["aln_mismap_prob_thres"] # self.readBam = options["read_bam"] # self.readSam = options["read_sam"] self.readBam = False self.readSam = False if "mapping_result_files" in options: self.mappingResultFiles = options["mapping_result_files"] else: self.mappingResultFiles = self.getAllMappingResultFiles(resultDirs) else: self.mismapThreshold = 1e-10 self.readBam = False self.readSam = False self.mappingResultFiles = self.getAllMappingResultFiles(resultDirs) if len(self.mappingResultFiles)==0: logging.fatal("McDetector::__init__: error: no mapping result file found.") sys.exit(1) if options["local_dir"]: self.localDir = options["local_dir"] else: self.localDir = mcContextDir self.readRefGenome(self.refGenome, self.refGenomeBuf, self.refGenomeChr) logging.debug("McDetector::__init__: mappingResultDirs=%s" % ','.join(self.mappingResultDirs)) def execute(self, outputFile, numWorkers = 1): """ execute mC detection process and output result. """ self.process() self.output(outputFile) def process(self): """ execute mC detection process. """ logging.info("mC detection process start") if len(self.mappingResultFiles)==0: logging.fatal("McDetector::process: error: no mapping result found.") sys.exit(1) for mapping_result_file in self.mappingResultFiles: logging.info("Parsing mapping result file: %s" % mapping_result_file) if self.onlyMcDetection: if not (self.readBam or self.readSam): if self.isGzipFile(mapping_result_file) or self.isMafFile(mapping_result_file): self.processMafFile(mapping_result_file) else: # BAM or SAM if self.readBam and self.isBamFile(mapping_result_file): self.processSamFile(mapping_result_file) elif self.readSam and self.isSamFile(mapping_result_file): self.processSamFile(mapping_result_file) else: self.processMafFile(mapping_result_file) logging.info("Parsing mapping result file done") if self.mcContextDir != self.localDir: copy(self.mcContextLocalFilePath(self.targetChr), self.mcContextDir) def output(self, outputFile): """ output mC detection result. """ logging.info("McDetector::output: outputFile=%s" % outputFile) popen_args = ['sort', '-k1'] list = glob.glob("%s/*.bsf" % self.localDir) if len(list)==0: logging.fatal("McDetect::output: no *._bsf_ files found in %s" % self.localDir) sys.exit(1) for bsf_file in list: if os.path.getsize(bsf_file) > 0: popen_args.append(bsf_file) logging.info("McDetector::output: added bsf_file=\"%s\"" % bsf_file) logging.debug("McDetector::output: popen_args=%s" % ' '.join(popen_args)) fout = open(outputFile, 'w') if len(popen_args) > 2: pipe = subprocess.Popen(popen_args, stdout=subprocess.PIPE) block = [] for line in pipe.stdout: (chr, ctx_pos, strand, mc_ctx, base, conf) = line.strip().split("\t") ctx_pos = int(ctx_pos) conf = float(conf) if len(block)==0 or block[-1][0]==chr: block.append([chr, ctx_pos, strand, mc_ctx, base, conf]) else: self.outputOneChrBlock(fout, block) del block[:] block.append([chr, ctx_pos, strand, mc_ctx, base, conf]) if len(block) > 0: self.outputOneChrBlock(fout, block) pipe.stdout.close() else: logging.info("McDetector::output: no result files found.") fout.close() def outputOneChrBlock(self, fout, block): """ output mC detection result for one chromosome. """ chr = block[0][0] logging.info("McDetector::outputOneChrBlock: chr=%s" % chr) mc_contexts = {} for b in sorted(block, key=lambda block: block[1]): try: ctx_pos, strand, mc_ctx, base, conf = b[1:] if not ctx_pos in mc_contexts: mc_contexts[ctx_pos] = {} if not strand in mc_contexts[ctx_pos]: mc_contexts[ctx_pos][strand] = {} if not mc_ctx in mc_contexts[ctx_pos][strand]: mc_contexts[ctx_pos][strand][mc_ctx] = [] mc_contexts[ctx_pos][strand][mc_ctx].append([base, conf]) except ValueError, e: logging.warning("McDetect::outputOneChrBlock: value error: %s: %s -> %s" % (fpath, line.strip(), e.args[0])) num_entry = 0 if len(mc_contexts.keys()) > 0: for pos in sorted(mc_contexts.keys()): for strand in mc_contexts[pos].keys(): for mc_ctx in mc_contexts[pos][strand].keys(): coverage, mc_ratio = self.calcCoverage(mc_contexts[pos][strand][mc_ctx], strand) if coverage >= self.coverageThreshold and mc_ratio >= self.lowerBound: fout.write("%s\t%d\t%s\t%s\t%g\t%d\n" % (chr, pos, strand, mc_ctx, mc_ratio, coverage)) num_entry += 1 else: logging.info("McDetect::outputOneChrBlock: rejected: chr=%s pos=%d strand=%s coverage=%g mc_ratio=%g" % (chr, pos, strand, coverage, mc_ratio)) # self.bzip2File(fpath, False) logging.info("McDetector::outputOneChrBlock: number of entries %s" % num_entry) def processMafFile(self, resultFile): """ read mapping result file, and output mC context data file. """ file_name = self.splitFilePath(resultFile)[1] outputFile = "%s/%s.bsf" % (self.localDir, file_name) try: fin = open(resultFile, 'r') fout = open(outputFile, 'w') block = [] for line in fin: if line[0] == '#' or line[0] == 'p' or line[0] == "\n": continue if line[0] == 'a' or line[0] == 's' or line[0] == 'q': block.append(line.strip()) if len(block)==4: if block[0][0]=='a' and block[1][0]=='s' and block[2][0]=='s' and block[3][0]=='q': mismap_prob = float(block[0].split('=')[2]) if mismap_prob <= self.mismapThreshold: b1 = block[1].split() chr = b1[1] ref_seq = b1[-1].upper() ref_start = int(b1[2]) # 0-based position read_seq = block[2].split()[-1] read_len = len(read_seq) read_qual = block[3].split()[-1] strand = self.findStrandFromAlignment(chr, ref_start, read_seq, read_len) # strand = block[2].split()[4] if strand == '+' or strand == '-': lines = self.extractMcContextsByOneRead(chr, strand, mismap_prob, ref_seq, ref_start, read_seq, read_qual, read_len) for line in lines: fout.write(line) logging.debug("processMafFile: a maf block(%s) is successfully captured." % strand) elif strand == '+-': for st in ('+', '-'): lines = self.extractMcContextsByOneRead(chr, st, mismap_prob, ref_seq, ref_start, read_seq, read_qual, read_len) for line in lines: fout.write(line) logging.debug("processMafFile: a maf block(%s) is successfully captured." % strand) else: logging.debug("processMafFile: a maf block does not show strand-specific info.") else: logging.info("processMafFile: alignment \"%s\" has greater mismap prob. than the threshold." % block[0]) del block[:] else: logging.fatal("processMafFile: error: unexpected malformed maf block is found.") logging.fatal("block 1: \"%s\"\n" % block[0]) logging.fatal("block 2: \"%s\"\n" % block[1]) logging.fatal("block 3: \"%s\"\n" % block[2]) logging.fatal("block 4: \"%s\"\n" % block[3]) sys.exit(1) fin.close() fout.close() if len(block) > 0: logging.fatal("McDetect::processMafFile: error: possible malformed MAF file.") for b in block: logging.fatal(b) sys.exit(1) except IOError: logging.fatal("McDetect::processMafFile: error: unable to read a MAF file \"%s\"" % resultFile) sys.exit(1) return def processSamFile(self, samFile): """ read mapping BAM/SAM file, and output mC context data file. """ logging.info("Process BAM/SAM file start: %s" % samFile) samfile = None try: if self.readBam: samfile = pysam.Samfile(samFile, "rb") else: samfile = pysam.Samfile(samFile, "r") counter = 1 for aln in samfile.fetch(until_eof = True): samaln = SamAlnParser(samfile, aln) samaln.setRefGenome(self.targetChr, self.targetSeqD, self.targetSeqLen) samaln.parse() chr = samaln.referenceName() if (chr is None) or (chr != self.targetChr): continue if aln.mapq: mismap = self.bamMapq2Mismap(aln.mapq) if mismap - self.mismapThreshold > 0: continue read_seq = samaln.alnReadSeq.replace("t", "C") self.extractMcContextsByOneRead(chr, samaln.strand, 0, samaln.alnRefSeq.upper(), samaln.alnRefStart, self.targetSeqLen, read_seq, read_qual) counter += 1 if counter > 500000: self.outputMcContextData() counter = 1 samfile.close() self.outputMcContextData() except Exception, e: logging.fatal(samFile) logging.fatal(" %s" % str(type(e))) logging.fatal(" %s" % str(e)) if samfile: samfile.close() def extractMcContextsByOneRead(self, chr, strand, mismapProb, refSeq, refStart, readSeq, readQual, readLen): """ extract mC context by one read. """ lines = [] if strand == '+': real_pos = refStart for i in range(readLen): if readSeq[i] == '-': continue # if refSeq[i] == 'C' and (readSeq[i] == 'C' or readSeq[i] == 'T'): if refSeq[i] == 'C': baseConf = self.qualityCharToErrorProb(readQual[i]) ctx_type = self.mcContextType(self.refGenomeBuf[chr], real_pos, strand) line = "%s\t%d\t%s\t%s\t%s\t%g\n" % (chr, real_pos, strand, ctx_type, readSeq[i], (1-mismapProb) * (1-baseConf)) # logging.debug("extractMcContextsByOneRead: line=%s" % line) lines.append(line) real_pos += 1 if strand == '-': real_pos = refStart for i in range(readLen): if readSeq[i] == '-': continue # if refSeq[i] == 'G' and (readSeq[i] == 'G' or readSeq[i] == 'A'): if refSeq[i] == 'G': baseConf = self.qualityCharToErrorProb(readQual[i]) ctx_type = self.mcContextType(self.refGenomeBuf[chr], real_pos, strand) read_base = self.complementaryBase(readSeq[i]) line = "%s\t%d\t%s\t%s\t%s\t%g\n" % (chr, real_pos, strand, ctx_type, readSeq[i], (1-mismapProb) * (1-baseConf)) # logging.debug("extractMcContextsByOneRead: line=%s" % line) lines.append(line) real_pos += 1 return lines def complementaryBase(self, base): """ compute a complemetary base. """ if base == 'A': return 'T' if base == 'C': return 'G' if base == 'G': return 'C' if base == 'T': return 'A' if base == 'N': return 'N' if base == 'a': return 't' if base == 'c': return 'g' if base == 'g': return 'c' if base == 't': return 'a' if base == 'n': return 'n' sys.exit(1) def findStrandFromAlignment(self, chr, ref_start, read_seq, read_len): plus_sup = 0 minus_sup = 0 other_mismatch = 0 base_size = 0 ref_seq = self.refGenomeBuf[chr] for i in range(read_len): j = ref_start + i if ref_seq[j]=='C' and read_seq[i]=='T': plus_sup += 1 base_size += 1 elif ref_seq[j]=='G' and read_seq[i]=='A': minus_sup += 1 base_size += 1 elif (ref_seq[j]!='-' and read_seq[i]!='-') and ref_seq[j]!=read_seq[i]: other_mismatch += 1 base_size += 1 if base_size==0: return '.' mismatch_rate = float(other_mismatch)/float(base_size) # if plus_sup > minus_sup and plus_sup > other_mismatch: # if plus_sup > minus_sup and mismatch_rate < 0.05: if plus_sup > minus_sup: return '+' # if minus_sup > plus_sup and minus_sup > other_mismatch: # if minus_sup > plus_sup and mismatch_rate < 0.05: if minus_sup > plus_sup: return '-' if plus_sup == minus_sup: return '+-' return '.' def __extractMcContextsByOneRead(self, chr, strand, mismapProb, refSeq, refStart, refSeqLen, readSeq, readQual): """ extract mC context by one read. """ logging.debug("extractMcContextsByOneRead(%s, %s, %g, %s, %d, %d, %s, %s)" % (chr, strand, mismapProb, refSeq, refStart, refSeqLen, readSeq, readQual)) nogap_refseq = self.clearGap(refSeq) bases = list(refSeq) last_pos = len(nogap_refseq) - 1 pos = -1 while True: try: pos = bases.index("C", pos + 1) num_gaps = refSeq.count("-", 0, pos) real_pos = pos - num_gaps ctx_type = self.mcContextType(nogap_refseq, real_pos) ctx_pos = refStart + real_pos if ctx_type == None: if strand == "+": ctx_type = self.mcContextType(self.targetSeqD, ctx_pos) elif strand == "-": ctx_type = self.mcContextType(self.targetSeqC, ctx_pos) if ctx_type == None: continue if strand == "-": ctx_pos = refSeqLen - ctx_pos - 1 line = "%d\t%s\t%s\t%s\n" % (ctx_pos, strand, ctx_type, readSeq[pos]) base_name = self.mcContextFileBaseName(ctx_pos) if base_name not in self.mcDetectData: self.mcDetectData[base_name] = [] self.mcDetectData[base_name].append(line) except IndexError: logging.debug("extractMcContextsByOneRead#IndexError: %s %d %s %s %s %d" % (chr, ctx_pos, strand, ctx_type, readSeq, pos)) except ValueError: break def qualityCharToErrorProb(self, qualityChar): """ convert FASTQ (Sanger) quality character to error probability. """ return 10**((ord('!')-ord(qualityChar))*0.1) def outputMcContextData(self): """ output mC context data to file. """ logging.info("Output mC detection data start.") for base_name in self.mcDetectData.keys(): fpath = self.mcContextFilePathByName(base_name) logging.debug("%s: %d" % (fpath, len(self.mcDetectData[base_name]))) fo = open(fpath, "a") fo.write("".join(self.mcDetectData[base_name])) fo.close() self.mcDetectData.clear() logging.info("Output mC detection data done.") def mcContextHash(self): h = {} for strand in self.strands(): h[strand] = {} for mc_ctx in self.mcContextTypes(): h[strand][mc_ctx] = {} return h def isC(self, seq): return seq[0:1].upper() == "C" def isT(self, seq): return seq[0:1].upper() == "T" def calcCoverage(self, seqAry, strand): """ count the number of C and T (or G and A), calculate mC ratio. """ num_c = 0.0 num_t = 0.0 num_all = 0 (C, T) = ('C', 'T') if strand == '-': (C, T) = ('G', 'A') for i in seqAry: num_all += 1 if i[0] == C: num_c += i[1] elif i[0] == T: num_t += i[1] num_ct = num_c + num_t if num_all == 0 or num_ct == 0: return (0, 0) # return (num_all, num_c/num_all) return (num_all, num_c/num_ct) def mafMismapValue(self, aLine): """ get mismap value by maf "a" line. """ mismap_fields = filter((lambda s: s.startswith('mismap=')), aLine.split()) if len(mismap_fields) > 0: return float(mismap_fields[0][7:]) else: return None def mcContextFilePath(self, pos): """ get mC context data file path with specified position. """ return self.mcContextFilePathByName(self.mcContextFileBaseName(pos)) def mcContextFilePathByName(self, name): """ get mC context data file path with specified name. """ return "%s/%s/%s.tsv" % (self.localDir, self.targetChr, name) def mcContextFileBaseName(self, pos): """ get mC context data file name with specified chromosome number. """ return "%010d" % (int(pos) / 100000) def mcContextLocalFilePath(self, chrNo): """ get local mC context data file path with specified chromosome number. """ return "%s/%s.tsv" % (self.localDir, chrNo) def mcContextGlobalFilePath(self, chrNo): """ get global mC context data file path with specified chromosome number. """ return "%s/%s.tsv" % (self.mcContextDir, chrNo) class SamAlnParser(BsfCallBase): def __init__(self, samfile, aln): self.samfile = samfile self.aln = aln self.refName = None self.refSeq = None self.refSeqLen = None self.strand = None self.alnRefStart = None self.alnRefSeq = None self.alnRefSeqLen = None self.alnReadSeq = None def setRefGenome(self, name, sequence, length = None): if length is None: length = len(sequence) self.refName = name self.refSeq = sequence self.refSeqLen = length def referenceName(self): if self.aln.tid >= 0: return self.samfile.getrname(self.aln.tid) else: return None def getStrand(self): if self.aln.is_reverse: return "-" else: return "+" def parseCigar(self, cigar): return [{"op": v[-1:], "num": int(v[:-1])} for v in re.findall('\d+[A-Z=]', cigar)] def alignmentSequences(self, refSeqPos, readSeq, cigars): refs = [] rest_refseq = 0 reads = [] read_pos = 0 for cigar in cigars: if cigar["op"] == "M": refs.append(self.refSeq[refSeqPos:refSeqPos+cigar["num"]]) refSeqPos += cigar["num"] reads.append(readSeq[read_pos:read_pos+cigar["num"]]) read_pos += cigar["num"] elif cigar["op"] == "I": refs.append("-" * cigar["num"]) reads.append(readSeq[read_pos:read_pos+cigar["num"]]) read_pos += cigar["num"] elif cigar["op"] == "P": refs.append("-" * cigar["num"]) reads.append("-" * cigar["num"]) elif cigar["op"] == "D" or cigar["op"] == "N": rest_refseq += cigar["num"] reads.append("-" * cigar["num"]) elif cigar["op"] == "S": read_pos += cigar["num"] if rest_refseq > 0: refs.append(self.refSeq[refSeqPos:refSeqPos+rest_refseq]) return {"reference": "".join(refs), "read": "".join(reads)} def parse(self): self.strand = self.getStrand() self.alnRefStart = self.aln.pos read_seq = self.aln.seq cigars = self.parseCigar(self.aln.cigarstring) alignment = self.alignmentSequences(self.alnRefStart, read_seq, cigars) if self.strand == "+": self.alnRefSeq = alignment["reference"] self.alnReadSeq = alignment["read"] elif self.strand == "-": nogap_refseq = self.clearGap(alignment["reference"]) self.alnRefStart = self.complementStartPosition(self.refSeqLen, self.alnRefStart, len(nogap_refseq)) self.alnRefSeq = self.complementSeq(alignment["reference"]) self.alnReadSeq = self.complementSeq(alignment["read"])