Mercurial > repos > iuc > fasta_stats
diff fasta-stats.py @ 4:0dbb995c7d35 draft default tip
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/fasta_stats/ commit 50f5cce5a8c11001e2c59600a2b99a4243b6d06f"
author | iuc |
---|---|
date | Thu, 18 Nov 2021 20:56:57 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fasta-stats.py Thu Nov 18 20:56:57 2021 +0000 @@ -0,0 +1,201 @@ +#!/usr/bin/env python + + +# python version of fasta-stats with some extra features +# written by anmol.kiran@gmail.com +# git: @codemeleon +# date: 10/11/2021 + +import argparse +import re +from os import path + +import numpy as np +from Bio import SeqIO + + +def calculate_NG50(estimated_genome, total_length, sequence_lengths): + temp = 0 + teoretical_NG50 = estimated_genome / 2.0 + NG50 = 0 + for seq in sequence_lengths: + temp += seq + if teoretical_NG50 < temp: + NG50 = seq + break + return NG50 + + +def run(fasta, stats_output, gaps_output, genome_size): + """Generates scaffold statistics.""" + if not fasta: + exit("Input file not given.") + if not path.isfile(fasta): + exit(f"{fasta} path does not exist.") + + seq_len = {} + bases_global = {"A": 0, "N": 0, "T": 0, "C": 0, "G": 0} + bases_seq = {} + seq_id_Ngaprange = {} + nstart = 0 + contigs_len = [] + gap_count = 0 + for seq_record in SeqIO.parse(fasta, "fasta"): + seq = str(seq_record.seq).upper() + # print(len(seq)) + seq_len[seq_record.id] = len(seq) + + # NOTE: Nucleotide count + bases_seq[seq_record.id] = { + "A": seq.count("A"), + "N": seq.count("N"), + "T": seq.count("T"), + "C": seq.count("C"), + "G": seq.count("G"), + } + bases_global["A"] += bases_seq[seq_record.id]["A"] + bases_global["N"] += bases_seq[seq_record.id]["N"] + bases_global["T"] += bases_seq[seq_record.id]["T"] + bases_global["C"] += bases_seq[seq_record.id]["C"] + bases_global["G"] += bases_seq[seq_record.id]["G"] + + # NOTE: Gap count and their range + range_gen = re.finditer("N+", seq) + n_range = [match.span() for match in range_gen] + for n_rng in n_range: + if n_rng[0] == 0 or n_rng[1] == seq_len[seq_record.id]: + continue + else: + gap_count += 1 + + # NOTE: Contigs, their lenths from scaffold and their N gap range + seq_id_Ngaprange[seq_record.id] = n_range + n_range_len = len(n_range) + if n_range_len > 0: + n_range = ( + [(0, 0)] + n_range + [(seq_len[seq_record.id], seq_len[seq_record.id])] + ) + for idx in range(n_range_len + 1): + nstart = n_range[idx][1] + nend = n_range[idx + 1][0] + con_len = nend - nstart + if con_len: + contigs_len.append(con_len) + else: + contigs_len.append(len(seq)) + + # NOTE: Scaffold statistics + SEQ_LEN_LIST = sorted(seq_len.values(), reverse=True) + scaffold_lens = np.array(SEQ_LEN_LIST) + scaffold_lens_sum = np.cumsum(scaffold_lens) + N50_len = scaffold_lens_sum[-1] * 0.5 + N50_idx = np.where(scaffold_lens_sum > N50_len)[0][0] + N90_len = scaffold_lens_sum[-1] * 0.9 + N90_idx = np.where(scaffold_lens_sum > N90_len)[0][0] + NG50 = calculate_NG50(genome_size, scaffold_lens_sum[-1], scaffold_lens) + + # NOTE: Contig statistics + seq_len_list = sorted(contigs_len, reverse=True) + contigs_len = np.array(seq_len_list) + contigs_len_sum = np.cumsum(contigs_len) + n50_len = contigs_len_sum[-1] * 0.5 + n50_idx = np.where(contigs_len_sum > n50_len)[0][0] + n90_len = contigs_len_sum[-1] * 0.9 + n90_idx = np.where(contigs_len_sum > n90_len)[0][0] + ng50 = calculate_NG50(genome_size, contigs_len_sum[-1], contigs_len) + + with open(stats_output, "w") as soutput: + soutput.write("{}\t{}\n".format("Scaffold L50", N50_idx + 1)) + soutput.write("{}\t{}\n".format("Scaffold N50", SEQ_LEN_LIST[N50_idx])) + soutput.write("{}\t{}\n".format("Scaffold L90", N90_idx + 1)) + soutput.write("{}\t{}\n".format("Scaffold N90", SEQ_LEN_LIST[N90_idx])) + if genome_size != 0: + soutput.write("{}\t{}\n".format("Scaffold NG50", NG50)) + soutput.write("{}\t{}\n".format("Scaffold len_max", SEQ_LEN_LIST[0])) + soutput.write("{}\t{}\n".format("Scaffold len_min", SEQ_LEN_LIST[-1])) + soutput.write( + "{}\t{}\n".format("Scaffold len_mean", int(np.mean(SEQ_LEN_LIST))) + ) + soutput.write( + "{}\t{}\n".format("Scaffold len_median", int(np.median(SEQ_LEN_LIST))) + ) + soutput.write("{}\t{}\n".format("Scaffold len_std", int(np.std(SEQ_LEN_LIST)))) + soutput.write("{}\t{}\n".format("Scaffold num_A", bases_global["A"])) + soutput.write("{}\t{}\n".format("Scaffold num_T", bases_global["T"])) + soutput.write("{}\t{}\n".format("Scaffold num_C", bases_global["C"])) + soutput.write("{}\t{}\n".format("Scaffold num_G", bases_global["G"])) + soutput.write("{}\t{}\n".format("Scaffold num_N", bases_global["N"])) + soutput.write("{}\t{}\n".format("Scaffold num_bp", scaffold_lens_sum[-1])) + soutput.write( + "{}\t{}\n".format( + "Scaffold num_bp_not_N", scaffold_lens_sum[-1] - bases_global["N"] + ) + ) + soutput.write("{}\t{}\n".format("Scaffold num_seq", len(SEQ_LEN_LIST))) + soutput.write( + "{}\t{:.2f}\n".format( + "Scaffold GC content overall", + ( + (bases_global["G"] + bases_global["C"]) + * 100.0 + / scaffold_lens_sum[-1] + ), + ) + ) + + soutput.write("{}\t{}\n".format("Contig L50", n50_idx + 1)) + soutput.write("{}\t{}\n".format("Contig N50", seq_len_list[n50_idx])) + soutput.write("{}\t{}\n".format("Contig L90", n90_idx + 1)) + soutput.write("{}\t{}\n".format("Contig N90", seq_len_list[n90_idx])) + if genome_size != 0: + soutput.write("{}\t{}\n".format("Contig NG50", ng50)) + soutput.write("{}\t{}\n".format("Contig len_max", seq_len_list[0])) + soutput.write("{}\t{}\n".format("Contig len_min", seq_len_list[-1])) + soutput.write("{}\t{}\n".format("Contig len_mean", int(np.mean(seq_len_list)))) + soutput.write( + "{}\t{}\n".format("Contig len_median", int(np.median(seq_len_list))) + ) + soutput.write("{}\t{}\n".format("Contig len_std", int(np.std(seq_len_list)))) + soutput.write("{}\t{}\n".format("Contig num_bp", contigs_len_sum[-1])) + soutput.write("{}\t{}\n".format("Contig num_seq", len(contigs_len_sum))) + soutput.write("{}\t{}\n".format("Number of gaps", gap_count)) + if gaps_output is not None: + # NOTE: generate gaps statistics file + with open(gaps_output, "w") as goutput: + for key in seq_id_Ngaprange: + for rng in seq_id_Ngaprange[key]: + goutput.write("{}\t{}\t{}\n".format(key, rng[0], rng[1])) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-f", "--fasta", required=True, help="FASTA file") + parser.add_argument( + "-z", + "--genome_size", + required=False, + type=int, + help="If provided, the NG50 statistic will be computed", + default=0, + ) + parser.add_argument( + "-s", + "--stats_output", + required=True, + help="File to store the general statistics", + ) + parser.add_argument( + "-r", + "--gaps_output", + required=False, + help="File to store the gaps statistics", + default=None, + ) + args = parser.parse_args() + + run( + args.fasta, + args.stats_output, + args.gaps_output, + args.genome_size, + )