Mercurial > repos > iuc > fasta_stats
view fasta-stats.py @ 4:0dbb995c7d35 draft default tip
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/fasta_stats/ commit 50f5cce5a8c11001e2c59600a2b99a4243b6d06f"
author | iuc |
---|---|
date | Thu, 18 Nov 2021 20:56:57 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python # python version of fasta-stats with some extra features # written by anmol.kiran@gmail.com # git: @codemeleon # date: 10/11/2021 import argparse import re from os import path import numpy as np from Bio import SeqIO def calculate_NG50(estimated_genome, total_length, sequence_lengths): temp = 0 teoretical_NG50 = estimated_genome / 2.0 NG50 = 0 for seq in sequence_lengths: temp += seq if teoretical_NG50 < temp: NG50 = seq break return NG50 def run(fasta, stats_output, gaps_output, genome_size): """Generates scaffold statistics.""" if not fasta: exit("Input file not given.") if not path.isfile(fasta): exit(f"{fasta} path does not exist.") seq_len = {} bases_global = {"A": 0, "N": 0, "T": 0, "C": 0, "G": 0} bases_seq = {} seq_id_Ngaprange = {} nstart = 0 contigs_len = [] gap_count = 0 for seq_record in SeqIO.parse(fasta, "fasta"): seq = str(seq_record.seq).upper() # print(len(seq)) seq_len[seq_record.id] = len(seq) # NOTE: Nucleotide count bases_seq[seq_record.id] = { "A": seq.count("A"), "N": seq.count("N"), "T": seq.count("T"), "C": seq.count("C"), "G": seq.count("G"), } bases_global["A"] += bases_seq[seq_record.id]["A"] bases_global["N"] += bases_seq[seq_record.id]["N"] bases_global["T"] += bases_seq[seq_record.id]["T"] bases_global["C"] += bases_seq[seq_record.id]["C"] bases_global["G"] += bases_seq[seq_record.id]["G"] # NOTE: Gap count and their range range_gen = re.finditer("N+", seq) n_range = [match.span() for match in range_gen] for n_rng in n_range: if n_rng[0] == 0 or n_rng[1] == seq_len[seq_record.id]: continue else: gap_count += 1 # NOTE: Contigs, their lenths from scaffold and their N gap range seq_id_Ngaprange[seq_record.id] = n_range n_range_len = len(n_range) if n_range_len > 0: n_range = ( [(0, 0)] + n_range + [(seq_len[seq_record.id], seq_len[seq_record.id])] ) for idx in range(n_range_len + 1): nstart = n_range[idx][1] nend = n_range[idx + 1][0] con_len = nend - nstart if con_len: contigs_len.append(con_len) else: contigs_len.append(len(seq)) # NOTE: Scaffold statistics SEQ_LEN_LIST = sorted(seq_len.values(), reverse=True) scaffold_lens = np.array(SEQ_LEN_LIST) scaffold_lens_sum = np.cumsum(scaffold_lens) N50_len = scaffold_lens_sum[-1] * 0.5 N50_idx = np.where(scaffold_lens_sum > N50_len)[0][0] N90_len = scaffold_lens_sum[-1] * 0.9 N90_idx = np.where(scaffold_lens_sum > N90_len)[0][0] NG50 = calculate_NG50(genome_size, scaffold_lens_sum[-1], scaffold_lens) # NOTE: Contig statistics seq_len_list = sorted(contigs_len, reverse=True) contigs_len = np.array(seq_len_list) contigs_len_sum = np.cumsum(contigs_len) n50_len = contigs_len_sum[-1] * 0.5 n50_idx = np.where(contigs_len_sum > n50_len)[0][0] n90_len = contigs_len_sum[-1] * 0.9 n90_idx = np.where(contigs_len_sum > n90_len)[0][0] ng50 = calculate_NG50(genome_size, contigs_len_sum[-1], contigs_len) with open(stats_output, "w") as soutput: soutput.write("{}\t{}\n".format("Scaffold L50", N50_idx + 1)) soutput.write("{}\t{}\n".format("Scaffold N50", SEQ_LEN_LIST[N50_idx])) soutput.write("{}\t{}\n".format("Scaffold L90", N90_idx + 1)) soutput.write("{}\t{}\n".format("Scaffold N90", SEQ_LEN_LIST[N90_idx])) if genome_size != 0: soutput.write("{}\t{}\n".format("Scaffold NG50", NG50)) soutput.write("{}\t{}\n".format("Scaffold len_max", SEQ_LEN_LIST[0])) soutput.write("{}\t{}\n".format("Scaffold len_min", SEQ_LEN_LIST[-1])) soutput.write( "{}\t{}\n".format("Scaffold len_mean", int(np.mean(SEQ_LEN_LIST))) ) soutput.write( "{}\t{}\n".format("Scaffold len_median", int(np.median(SEQ_LEN_LIST))) ) soutput.write("{}\t{}\n".format("Scaffold len_std", int(np.std(SEQ_LEN_LIST)))) soutput.write("{}\t{}\n".format("Scaffold num_A", bases_global["A"])) soutput.write("{}\t{}\n".format("Scaffold num_T", bases_global["T"])) soutput.write("{}\t{}\n".format("Scaffold num_C", bases_global["C"])) soutput.write("{}\t{}\n".format("Scaffold num_G", bases_global["G"])) soutput.write("{}\t{}\n".format("Scaffold num_N", bases_global["N"])) soutput.write("{}\t{}\n".format("Scaffold num_bp", scaffold_lens_sum[-1])) soutput.write( "{}\t{}\n".format( "Scaffold num_bp_not_N", scaffold_lens_sum[-1] - bases_global["N"] ) ) soutput.write("{}\t{}\n".format("Scaffold num_seq", len(SEQ_LEN_LIST))) soutput.write( "{}\t{:.2f}\n".format( "Scaffold GC content overall", ( (bases_global["G"] + bases_global["C"]) * 100.0 / scaffold_lens_sum[-1] ), ) ) soutput.write("{}\t{}\n".format("Contig L50", n50_idx + 1)) soutput.write("{}\t{}\n".format("Contig N50", seq_len_list[n50_idx])) soutput.write("{}\t{}\n".format("Contig L90", n90_idx + 1)) soutput.write("{}\t{}\n".format("Contig N90", seq_len_list[n90_idx])) if genome_size != 0: soutput.write("{}\t{}\n".format("Contig NG50", ng50)) soutput.write("{}\t{}\n".format("Contig len_max", seq_len_list[0])) soutput.write("{}\t{}\n".format("Contig len_min", seq_len_list[-1])) soutput.write("{}\t{}\n".format("Contig len_mean", int(np.mean(seq_len_list)))) soutput.write( "{}\t{}\n".format("Contig len_median", int(np.median(seq_len_list))) ) soutput.write("{}\t{}\n".format("Contig len_std", int(np.std(seq_len_list)))) soutput.write("{}\t{}\n".format("Contig num_bp", contigs_len_sum[-1])) soutput.write("{}\t{}\n".format("Contig num_seq", len(contigs_len_sum))) soutput.write("{}\t{}\n".format("Number of gaps", gap_count)) if gaps_output is not None: # NOTE: generate gaps statistics file with open(gaps_output, "w") as goutput: for key in seq_id_Ngaprange: for rng in seq_id_Ngaprange[key]: goutput.write("{}\t{}\t{}\n".format(key, rng[0], rng[1])) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-f", "--fasta", required=True, help="FASTA file") parser.add_argument( "-z", "--genome_size", required=False, type=int, help="If provided, the NG50 statistic will be computed", default=0, ) parser.add_argument( "-s", "--stats_output", required=True, help="File to store the general statistics", ) parser.add_argument( "-r", "--gaps_output", required=False, help="File to store the gaps statistics", default=None, ) args = parser.parse_args() run( args.fasta, args.stats_output, args.gaps_output, args.genome_size, )