Mercurial > repos > iuc > extract_genomic_dna
diff extract_genomic_dna.py @ 11:80414c33a59a draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/extract_genomic_dna commit 6db2d98b513e4980788fcba49d809c91e5750296
author | iuc |
---|---|
date | Thu, 21 Nov 2024 07:20:29 +0000 |
parents | e400dcbc60d0 |
children |
line wrap: on
line diff
--- a/extract_genomic_dna.py Fri Dec 06 15:24:00 2019 -0500 +++ b/extract_genomic_dna.py Thu Nov 21 07:20:29 2024 +0000 @@ -11,24 +11,47 @@ import extract_genomic_dna_utils as egdu # noqa: I100,I202 parser = argparse.ArgumentParser() -parser.add_argument('--input_format', dest='input_format', help="Input dataset format") -parser.add_argument('--input', dest='input', help="Input dataset") -parser.add_argument('--genome', dest='genome', help="Input dataset genome build") -parser.add_argument('--interpret_features', dest='interpret_features', default=None, help="Interpret features if input format is gff") -parser.add_argument('--columns', dest='columns', help="Columns to use in input file") -parser.add_argument('--reference_genome_source', dest='reference_genome_source', help="Source of reference genome file") -parser.add_argument('--reference_genome', dest='reference_genome', help="Reference genome file") -parser.add_argument('--output_format', dest='output_format', help="Output format") -parser.add_argument('--fasta_header_type', dest='fasta_header_type', default=None, help="Fasta header format") -parser.add_argument('--fasta_header_delimiter', dest='fasta_header_delimiter', default=None, help="Fasta header field delimiter") -parser.add_argument('--output', dest='output', help="Output dataset") +parser.add_argument("--input_format", dest="input_format", help="Input dataset format") +parser.add_argument("--input", dest="input", help="Input dataset") +parser.add_argument("--genome", dest="genome", help="Input dataset genome build") +parser.add_argument( + "--interpret_features", + dest="interpret_features", + default=None, + help="Interpret features if input format is gff", +) +parser.add_argument("--columns", dest="columns", help="Columns to use in input file") +parser.add_argument( + "--reference_genome_source", + dest="reference_genome_source", + help="Source of reference genome file", +) +parser.add_argument( + "--reference_genome", dest="reference_genome", help="Reference genome file" +) +parser.add_argument("--output_format", dest="output_format", help="Output format") +parser.add_argument( + "--fasta_header_type", + dest="fasta_header_type", + default=None, + help="Fasta header format", +) +parser.add_argument( + "--fasta_header_delimiter", + dest="fasta_header_delimiter", + default=None, + help="Fasta header field delimiter", +) +parser.add_argument("--output", dest="output", help="Output dataset") args = parser.parse_args() -input_is_gff = args.input_format == 'gff' +input_is_gff = args.input_format == "gff" interpret_features = input_is_gff and args.interpret_features == "yes" -if len(args.columns.split(',')) == 5: +if len(args.columns.split(",")) == 5: # Bed file. - chrom_col, start_col, end_col, strand_col, name_col = egdu.parse_cols_arg(args.columns) + chrom_col, start_col, end_col, strand_col, name_col = egdu.parse_cols_arg( + args.columns + ) else: # Gff file. chrom_col, start_col, end_col, strand_col = egdu.parse_cols_arg(args.columns) @@ -47,13 +70,13 @@ first_invalid_line = 0 invalid_lines = [] warnings = [] -warning = '' +warning = "" twobitfile = None line_count = 1 file_iterator = open(args.input) if interpret_features: file_iterator = egdu.GFFReaderWrapper(file_iterator, fix_strand=False) -out = open(args.output, 'wt') +out = open(args.output, "wt") for feature in file_iterator: # Ignore comments, headers. @@ -70,9 +93,9 @@ strand = feature.strand else: # Processing lines, either interval or GFF format. - line = feature.rstrip('\r\n') + line = feature.rstrip("\r\n") if line and not line.startswith("#"): - fields = line.split('\t') + fields = line.split("\t") try: chrom = fields[chrom_col] start = int(fields[start_col]) @@ -99,9 +122,9 @@ first_invalid_line = line_count skipped_lines += len(invalid_lines) continue - if strand not in ['+', '-']: - strand = '+' - sequence = '' + if strand not in ["+", "-"]: + strand = "+" + sequence = "" else: continue # Open sequence file and get sequence for feature/interval. @@ -109,11 +132,16 @@ if chrom in nibs: nib = nibs[chrom] else: - nibs[chrom] = nib = bx.seq.nib.NibFile(open("%s/%s.nib" % (seq_path, chrom))) + nibs[chrom] = nib = bx.seq.nib.NibFile( + open("%s/%s.nib" % (seq_path, chrom)) + ) try: sequence = nib.get(start, end - start) - except Exception as e: - warning = "Unable to fetch the sequence from '%d' to '%d' for build '%s'. " % (start, end - start, args.genome) + except Exception: + warning = ( + "Unable to fetch the sequence from '%d' to '%d' for build '%s'. " + % (start, end - start, args.genome) + ) warnings.append(warning) if not invalid_lines: invalid_lines = egdu.get_lines(feature) @@ -121,18 +149,23 @@ skipped_lines += len(invalid_lines) continue elif os.path.isfile(seq_path): - if not(twobitfile): + if not (twobitfile): twobitfile = bx.seq.twobit.TwoBitFile(open(seq_path)) try: if interpret_features: # Create sequence from intervals within a feature. - sequence = '' + sequence = "" for interval in feature.intervals: - sequence += twobitfile[interval.chrom][interval.start:interval.end] + sequence += twobitfile[interval.chrom][ + interval.start: interval.end + ] else: sequence = twobitfile[chrom][start:end] except Exception: - warning = "Unable to fetch the sequence from '%d' to '%d' for chrom '%s'. " % (start, end - start, chrom) + warning = ( + "Unable to fetch the sequence from '%d' to '%d' for chrom '%s'. " + % (start, end - start, chrom) + ) warnings.append(warning) if not invalid_lines: invalid_lines = egdu.get_lines(feature) @@ -140,15 +173,21 @@ skipped_lines += len(invalid_lines) continue else: - warning = "Chromosome by name '%s' was not found for build '%s'. " % (chrom, args.genome) + warning = "Chromosome by name '%s' was not found for build '%s'. " % ( + chrom, + args.genome, + ) warnings.append(warning) if not invalid_lines: invalid_lines = egdu.get_lines(feature) first_invalid_line = line_count skipped_lines += len(invalid_lines) continue - if sequence == '': - warning = "Chrom: '%s', start: '%d', end: '%d' is either invalid or not present in build '%s'. " % (chrom, start, end, args.genome) + if sequence == "": + warning = ( + "Chrom: '%s', start: '%d', end: '%d' is either invalid or not present in build '%s'. " + % (chrom, start, end, args.genome) + ) warnings.append(warning) if not invalid_lines: invalid_lines = egdu.get_lines(feature) @@ -161,15 +200,18 @@ if input_is_gff: start, end = egdu.convert_bed_coords_to_gff([start, end]) if args.fasta_header_type == "bedtools_getfasta_default": - out.write(">%s\n" % egdu.get_bedtools_getfasta_default_header(str(chrom), - str(start), - str(end), - strand, - includes_strand_col)) + out.write( + ">%s\n" + % egdu.get_bedtools_getfasta_default_header( + str(chrom), str(start), str(end), strand, includes_strand_col + ) + ) else: # args.fasta_header_type == "char_delimited": fields = [args.genome, str(chrom), str(start), str(end), strand] - field_delimiter = egdu.get_fasta_header_delimiter(args.fasta_header_delimiter) + field_delimiter = egdu.get_fasta_header_delimiter( + args.fasta_header_delimiter + ) meta_data = field_delimiter.join(fields) if name.strip(): out.write(">%s %s\n" % (meta_data, name)) @@ -184,20 +226,24 @@ else: # output_format == "interval". if interpret_features: - meta_data = "\t".join([feature.chrom, - "galaxy_extract_genomic_dna", - "interval", - str(feature.start), - str(feature.end), - feature.score, - feature.strand, - ".", - egdu.gff_attributes_to_str(feature.attributes, "GTF")]) + meta_data = "\t".join( + [ + feature.chrom, + "galaxy_extract_genomic_dna", + "interval", + str(feature.start), + str(feature.end), + feature.score, + feature.strand, + ".", + egdu.gff_attributes_to_str(feature.attributes, "GTF"), + ] + ) else: # Here fields was set up around line 73. meta_data = "\t".join(fields) if input_is_gff: - format_str = "%s seq \"%s\";\n" + format_str = '%s seq "%s";\n' else: format_str = "%s\t%s\n" out.write(format_str % (meta_data, str(sequence))) @@ -214,7 +260,10 @@ print(warn_msg) if skipped_lines: # Error message includes up to the first 10 skipped lines. - print('Skipped %d invalid lines, 1st is #%d, "%s"' % (skipped_lines, first_invalid_line, '\n'.join(invalid_lines[:10]))) + print( + 'Skipped %d invalid lines, 1st is #%d, "%s"' + % (skipped_lines, first_invalid_line, "\n".join(invalid_lines[:10])) + ) if args.reference_genome_source == "history": os.remove(seq_path)