Mercurial > repos > artbio > small_rna_maps
diff small_rna_maps.py @ 2:507383cce5a8 draft
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/small_rna_maps commit edbb53cb13b52bf8e71c562fa8acc2c3be2fb270
author | artbio |
---|---|
date | Mon, 14 Aug 2017 05:52:34 -0400 |
parents | 40972a8dfab9 |
children | ed8b0142538d |
line wrap: on
line diff
--- a/small_rna_maps.py Mon Jul 24 12:31:04 2017 -0400 +++ b/small_rna_maps.py Mon Aug 14 05:52:34 2017 -0400 @@ -8,32 +8,28 @@ def Parser(): the_parser = argparse.ArgumentParser() - the_parser.add_argument('--input', dest='input', required=True, - nargs='+', help='input BAM files') - the_parser.add_argument('--sample_name', dest='sample_name', - required=True, nargs='+', help='sample name') - the_parser.add_argument('--output', action='store', - type=str, help='output tabular file') - the_parser.add_argument('-S', '--sizes', action='store', - help='use to output read sizes dataframe') + the_parser.add_argument('--inputs', dest='inputs', required=True, + nargs='+', help='list of input BAM files') + the_parser.add_argument('--sample_names', dest='sample_names', + required=True, nargs='+', + help='list of sample names') + the_parser.add_argument('--outputs', nargs='+', action='store', + help='list of two output paths (only two)') + the_parser.add_argument('-M', '--plot_methods', nargs='+', action='store', + help='list of 2 plot methods (only two) among:\ + Counts, Max, Mean, Median, Coverage and Size') args = the_parser.parse_args() return args class Map: - def __init__(self, bam_file, sample, computeSize=False): + def __init__(self, bam_file, sample): self.sample_name = sample self.bam_object = pysam.AlignmentFile(bam_file, 'rb') self.chromosomes = dict(zip(self.bam_object.references, self.bam_object.lengths)) self.map_dict = self.create_map(self.bam_object) - self.max = self.compute_max(self.map_dict) - self.mean = self.compute_mean(self.map_dict) - self.median = self.compute_median(self.map_dict) - self.coverage = self.compute_coverage(self.map_dict) - if computeSize: - self.size = self.compute_size(self.map_dict) def create_map(self, bam_object): ''' @@ -59,20 +55,33 @@ 'F')].append(read.query_alignment_length) return map_dictionary - def compute_max(self, map_dictionary): + def compute_map(self, map_dictionary, out): + ''' + takes a map_dictionary as input and writes + a readmap_dictionary {(chromosome,read_position,polarity): + number_of_reads} + in an open file handler out ''' - takes a map_dictionary as input and returns + readmap_dictionary = dict() + for key in map_dictionary: + readmap_dictionary[key] = len(map_dictionary[key]) + self.write_table(readmap_dictionary, out) + + def compute_max(self, map_dictionary, out): + ''' + takes a map_dictionary as input and writes a max_dictionary {(chromosome,read_position,polarity): max_of_number_of_read_at_any_position} + Not clear this function is still required ''' merge_keylist = [(i[0], 0) for i in map_dictionary.keys()] max_dictionary = dict(merge_keylist) for key in map_dictionary: if len(map_dictionary[key]) > max_dictionary[key[0]]: max_dictionary[key[0]] = len(map_dictionary[key]) - return max_dictionary + self.write_table(max_dictionary, out) - def compute_mean(self, map_dictionary): + def compute_mean(self, map_dictionary, out): ''' takes a map_dictionary as input and returns a mean_dictionary {(chromosome,read_position,polarity): @@ -85,9 +94,9 @@ else: mean_dictionary[key] = round(numpy.mean(map_dictionary[key]), 1) - return mean_dictionary + self.write_table(mean_dictionary, out) - def compute_median(self, map_dictionary): + def compute_median(self, map_dictionary, out): ''' takes a map_dictionary as input and returns a mean_dictionary {(chromosome,read_position,polarity): @@ -99,9 +108,9 @@ median_dictionary[key] = 0 else: median_dictionary[key] = numpy.median(map_dictionary[key]) - return median_dictionary + self.write_table(median_dictionary, out) - def compute_coverage(self, map_dictionary, quality=10): + def compute_coverage(self, map_dictionary, out, quality=10): ''' takes a map_dictionary as input and returns a coverage_dictionary {(chromosome,read_position,polarity): @@ -120,10 +129,9 @@ """ Add the 4 coverage values """ coverage = [sum(x) for x in zip(*coverage)] coverage_dictionary[key] = coverage[0] - # coverage_dictionary[(key[0], key[1], 'R')] = coverage - return coverage_dictionary + self.write_table(coverage_dictionary, out) - def compute_size(self, map_dictionary): + def compute_size(self, map_dictionary, out): ''' Takes a map_dictionary and returns a dictionary of sizes: {chrom: {polarity: {size: nbre of reads}}} @@ -137,65 +145,66 @@ for key in map_dictionary: for size in map_dictionary[key]: size_dictionary[key[0]][key[2]][size] += 1 - return size_dictionary + self.write_size_table(size_dictionary, out) - def write_size_table(self, out): + def write_table(self, mapdict, out): ''' - Dataset, Chromosome, Polarity, Size, Nbr_reads + Generic writer + Dataset, Chromosome, Chrom_length, Coordinate, Polarity, + <some mapped value> out is an *open* file handler ''' - for chrom in sorted(self.size): - sizes = self.size[chrom]['F'].keys() - sizes.extend(self.size[chrom]['R'].keys()) - for polarity in sorted(self.size[chrom]): + for key in sorted(mapdict): + line = [self.sample_name, key[0], self.chromosomes[key[0]], + key[1], key[2], mapdict[key]] + line = [str(i) for i in line] + out.write('\t'.join(line) + '\n') + + def write_size_table(self, sizedic, out): + ''' + Generic writer of summary values + Dataset, Chromosome, Chrom_length, <some category>, <some value> + out is an *open* file handler + ''' + for chrom in sorted(sizedic): + sizes = sizedic[chrom]['F'].keys() + sizes.extend(sizedic[chrom]['R'].keys()) + for polarity in sorted(sizedic[chrom]): for size in range(min(sizes), max(sizes)+1): try: line = [self.sample_name, chrom, polarity, size, - self.size[chrom][polarity][size]] + sizedic[chrom][polarity][size]] except KeyError: line = [self.sample_name, chrom, polarity, size, 0] line = [str(i) for i in line] out.write('\t'.join(line) + '\n') - def write_table(self, out): - ''' - Dataset, Chromosome, Chrom_length, Coordinate, Nbr_reads - Polarity, Max, Mean, Median, Coverage - out is an *open* file handler - ''' - for key in sorted(self.map_dict): - line = [self.sample_name, key[0], self.chromosomes[key[0]], - key[1], len(self.map_dict[key]), key[2], self.max[key[0]], - self.mean[key], self.median[key], self.coverage[key]] - line = [str(i) for i in line] - out.write('\t'.join(line) + '\n') - -def main(inputs, samples, file_out, size_file_out=''): - F = open(file_out, 'w') - header = ["Dataset", "Chromosome", "Chrom_length", "Coordinate", - "Nbr_reads", "Polarity", "Max", "Mean", "Median", "Coverage"] - F.write('\t'.join(header) + '\n') - if size_file_out: - Fs = open(size_file_out, 'w') - header = ["Dataset", "Chromosome", "Polarity", "Size", "Nbr_reads"] - Fs.write('\t'.join(header) + '\n') - for file, sample in zip(inputs, samples): - mapobj = Map(file, sample, computeSize=True) - mapobj.write_table(F) - mapobj.write_size_table(Fs) - Fs.close() - else: - for file, sample in zip(inputs, samples): - mapobj = Map(file, sample, computeSize=False) - mapobj.write_table(F) +def main(inputs, samples, methods, outputs): + for method, output in zip(methods, outputs): + F = open(output, 'w') + if method == 'Size': + header = ["Dataset", "Chromosome", "Polarity", method, "Count"] + else: + header = ["Dataset", "Chromosome", "Chrom_length", "Coordinate", + "Polarity", method] + F.write('\t'.join(header) + '\n') + for input, sample in zip(inputs, samples): + mapobj = Map(input, sample) + token = {"Counts": mapobj.compute_map, + "Max": mapobj.compute_max, + "Mean": mapobj.compute_mean, + "Median": mapobj.compute_median, + "Coverage": mapobj.compute_coverage, + "Size": mapobj.compute_size} + token[method](mapobj.map_dict, F) F.close() if __name__ == "__main__": args = Parser() # if identical sample names - if len(set(args.sample_name)) != len(args.sample_name): - args.sample_name = [name + '_' + str(i) for - i, name in enumerate(args.sample_name)] - main(args.input, args.sample_name, args.output, args.sizes) + if len(set(args.sample_names)) != len(args.sample_names): + args.sample_names = [name + '_' + str(i) for + i, name in enumerate(args.sample_names)] + main(args.inputs, args.sample_names, args.plot_methods, args.outputs)