Mercurial > repos > ahosny > cnvsim
diff cnvsim/genome_simulator.py @ 5:63955244b2fa draft
Uploaded cnvsim library package
author | ahosny |
---|---|
date | Sat, 06 Aug 2016 15:27:30 -0400 |
parents | |
children | eca72016b5b3 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cnvsim/genome_simulator.py Sat Aug 06 15:27:30 2016 -0400 @@ -0,0 +1,236 @@ +__author__ = 'Abdelrahman Hosny' + +import random +import os +import subprocess +import datetime +import shutil + +from . import fileio + + +def _log(message): + print '[CNV SIM {:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + "] " + message + + +def getScriptPath(): + return os.path.dirname(os.path.realpath(__file__)) + + +def _generateCNVMask(mask_length, p_amplify, p_delete, min_variation, max_variation): + ''' + This function generates random Copy Number Variations mask list + :param exons: list of regions. + :param p_amplify: percentage of amplifications introduced + :param p_delete: percentage of deletions introduced + :return: a list of the same length as the exons list. each list item + indicates the variation to be added to the exon in the same position. + Positive number: number of amplification + Zero: no change + -1: delete this exon + ''' + + number_of_amplifications = int(p_amplify * mask_length) + number_of_deletions = int(p_delete * mask_length) + cnv_mask = [0] * mask_length + + # generate CNV mask (a list of amplifications and deletions to be applied to the exons) + while number_of_amplifications > 0: + choice = random.randrange(0, mask_length) + while cnv_mask[choice] != 0: + choice = random.randrange(0, mask_length) + cnv_mask[choice] = random.randrange(min_variation, max_variation) # random amplifications in the range [min_variation, max_variation) + number_of_amplifications -= 1 + random.shuffle(cnv_mask) + while number_of_deletions > 0: + choice = random.randrange(0, mask_length) + while cnv_mask[choice] != 0: + choice = random.randrange(0, mask_length) + cnv_mask[choice] = -1*random.randrange(min_variation, max_variation) # random deletions in the range [min_variation, max_variation) + number_of_deletions -= 1 + random.shuffle(cnv_mask) + + return cnv_mask + + +def _generateCNVList(chromosome_length, number_of_regions, p_amplify, p_delete, min_variation, max_variation): + ''' + + :param chromosome_length: + :param number_of_regions: + :param p_amplify: + :param p_delete: + :param min_variation: + :param max_variation: + :return: + ''' + region_length = chromosome_length / number_of_regions + start = 0 + end = region_length + cnv_list = [] + mask = _generateCNVMask(number_of_regions, p_amplify, p_delete, min_variation, max_variation) + for i in range(number_of_regions): + # jump forward start + jump_start = start + int(random.randrange(int(0.40*region_length), int(0.45*region_length))) + + # jump backward end + jump_end = end - int(random.randrange(int(0.40*region_length), int(0.45*region_length))) + + cnv_list.append([jump_start, jump_end, mask[i]]) + start = end + end += region_length + + return cnv_list + + +def _simulateCNV(genome, cnv_list, read_length): + ''' + Simulate the control genome and the CNV genome + :param genome: original genome sequence + :param cnv_list: a list of region variations (chromosome, start, end, variation) + :return: control_genome, cnv_genome + ''' + control_genome = [] + cnv_genome = [] + + for cnv in cnv_list: + start = cnv[1] + end = cnv[2] + variation = cnv[3] + sequence = genome[start: end] + + if variation > 0: + # amplification + amplification = genome[start-read_length: start] + sequence * variation + genome[end:end+read_length] + cnv_genome.append(amplification) + elif variation < 0: + # deletion + deletion = genome[start-read_length: start] + sequence * variation + genome[end:end+read_length] + control_genome.append(deletion) + + return ''.join(control_genome), ''.join(cnv_genome) + + +def _callART(genome_file, output_file, read_length, fold_coverage=1): + ''' + Calls Wessim to generate artificial reads for the targets on the reference genome + :param genome_file: reference genome file in FASTA format + :param output_file: output file name + :param read_length: the read length + :param fold_coverage: fold coverage for the reads + :return: None + ''' + os.chdir(os.path.join(getScriptPath(), "ART")) + subprocess.call(["./art_illumina", \ + "-sam", \ + "-i", genome_file, \ + "-p", \ + "-m", "200", \ + "-s", "10", \ + "-l", str(read_length), \ + "-f", str(fold_coverage), \ + "-o", output_file], stderr=None) + os.chdir("..") + + +def simulate_genome_cnv(simulation_parameters, cnv_list_parameters=None): + ''' + Simulate copy number variations on the passed reference genome based on the given simulation parameters + :param simulation_parameters: a dictionary containing parameters for simulation + :param cnv_list_parameters: a dictionary containing parameters for CNV List creation + :return: None + ''' + _log('simulation type: whole genome') + + # create a temporary directory for intermediate files + if not os.path.exists(simulation_parameters['tmp_dir']): + os.makedirs(simulation_parameters['tmp_dir']) + + # create output directory for final results + if not os.path.exists(simulation_parameters['output_dir']): + os.makedirs(simulation_parameters['output_dir']) + + # copy genome to the tmp folder + shutil.copyfile(simulation_parameters['genome_file'], os.path.join(simulation_parameters['tmp_dir'], "reference.fa")) + genome_file = os.path.join(simulation_parameters['tmp_dir'], "reference.fa") + + # initialize variables for temporary files + control_genome_file = os.path.join(simulation_parameters['tmp_dir'], "ControlGenome.fa") + cnv_genome_file = os.path.join(simulation_parameters['tmp_dir'], "CNVGenome.fa") + base_reads_file = os.path.join(simulation_parameters['tmp_dir'], "base") + control_reads_file = os.path.join(simulation_parameters['tmp_dir'], "control") + cnv_reads_file = os.path.join(simulation_parameters['tmp_dir'], "cnv") + + _log("loading genome file ..") + header, genome = fileio.readGenome(genome_file) + _log("successfully loaded a genome of length " + `len(genome)`) + + if simulation_parameters['cnv_list_file'] is None: + # CNV list file + cnv_list_file = os.path.join(simulation_parameters['output_dir'], "CNVList.bed") + + _log("generating CNV list ..") + cnv_list = _generateCNVList(len(genome), cnv_list_parameters['regions_count'], \ + cnv_list_parameters['amplifications'], cnv_list_parameters['deletions'], \ + cnv_list_parameters['minimum_variations'], \ + cnv_list_parameters['maximum_variations']) + cnv_list = map(lambda l: [header.replace('>', '')] + l, cnv_list) + + with open(cnv_list_file, 'w') as f: + for i, cnv_region in enumerate(cnv_list): + line = cnv_region[0] + '\t' \ + + `cnv_region[1]` + '\t' \ + + `cnv_region[2]` + '\t' \ + + `cnv_region[3]` + '\n' + f.write(line) + _log("randomly generated CNV list saved to " + cnv_list_file) + + else: + _log("loading CNV list ..") + with open(simulation_parameters['cnv_list_file'], "r") as f: + cnv_list = [] + lines = f.readlines() + for line in lines: + chromosome, region_start, region_end, variation = line.strip().split("\t") + cnv_list.append((chromosome, int(region_start), int(region_end), int(variation))) + + _log("successfully loaded CNV list that contains " + `len(lines)` + " regions ..") + + # call ART to generate reads from the genome file + _log("generating reads for the genome ..") + _log("delegating job to ART ...") + _callART(genome_file, base_reads_file, simulation_parameters['read_length']) + + _log("simulating copy number variations (amplifications/deletions)") + control_genome, cnv_genome = _simulateCNV(genome, cnv_list, simulation_parameters['read_length']) + + _log("saving to the control genome file ..") + + with open(control_genome_file, 'w') as fw: + fw.write(header + "\n") + n = 50 + l = [control_genome[i:i + n] for i in range(0, len(control_genome), n)] + for line in l: + fw.write(line + "\n") + + _log("delegating job to ART ...") + _callART(control_genome_file, control_reads_file, simulation_parameters['read_length']) + + _log("saving to the CNV genome file ..") + with open(cnv_genome_file, 'w') as fw: + fw.write(header + "\n") + n = 50 + l = [cnv_genome[i:i + n] for i in range(0, len(cnv_genome), n)] + for line in l: + fw.write(line + "\n") + + _log("delegating job to ART ...") + _callART(cnv_genome_file, cnv_reads_file, simulation_parameters['read_length']) + + _log("merging results ..") + fileio.mergeARTReads(simulation_parameters['tmp_dir'], simulation_parameters['output_dir']) + + _log("cleaning temporary files ..") + fileio.clean(simulation_parameters['tmp_dir']) + + _log("simulation completed. find results in " + simulation_parameters['output_dir']) \ No newline at end of file