diff spamr_vet_tools_v2/quast_get_fasta.py @ 2:d7b099fbb003 draft default tip
Corrected file names and updated tool wrappers for consistency.
author:   maciek
date:     Tue, 25 Mar 2025 13:35:00 +0000
parents:
children:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/spamr_vet_tools_v2/quast_get_fasta.py	Tue Mar 25 13:35:00 2025 +0000
@@ -0,0 +1,134 @@
+import json
+import csv
+import sys
+import os
+
+def extract_software_data(json_data, software_name):
+    """
+    Extract QUAST data from JSON and create a CSV with assembly metrics.
+    For "quast", include specific columns and calculate a Filter_N50 based on "N50".
+    """
+    # Ensure json_data is a dictionary
+    if isinstance(json_data, list):
+        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == software_name), None)
+
+    if not isinstance(json_data, dict):
+        print(f"Invalid JSON format for {software_name} extraction.")
+        return
+
+    results = json_data.get("results", [])
+    extracted_data = []
+    headers = [
+        "Assembly",
+        "contigs_(>=_0_bp)",
+        "contigs_(>=_1000_bp)",
+        "Total_length_(>=_0_bp)",
+        "Total_length_(>=_1000_bp)",
+        "contigs",
+        "Largest_contig",
+        "Total_length",
+        "GC",
+        "N50",
+        "Filter_N50",
+        "N90",
+        "auN",
+        "L50",
+        "L90",
+        "total_reads",
+        "left",
+        "right",
+        "Mapped",
+        "Properly_paired",
+        "Avg._coverage_depth",
+        "Coverage_>=_1x",
+        "N's_per_100_kbp"
+    ]
+    output_csv_file = f"{software_name}_output.csv"
+
+    for entry in results:
+        if "content" in entry and isinstance(entry["content"], list):
+            for content_item in entry["content"]:
+                n50 = content_item.get("N50", "")
+                try:
+                    n50_value = float(n50) if n50 else 0
+                    filter_n50 = "pass" if n50_value > 20000 else "fail"
+                except ValueError:
+                    filter_n50 = "fail"  # If the value is non-numeric, consider it as "fail"
+
+                extracted_data.append({
+                    "Assembly": content_item.get("Assembly", ""),
+                    "contigs_(>=_0_bp)": content_item.get("contigs_(>=_0_bp)", ""),
+                    "contigs_(>=_1000_bp)": content_item.get("contigs_(>=_1000_bp)", ""),
+                    "Total_length_(>=_0_bp)": content_item.get("Total_length_(>=_0_bp)", ""),
+                    "Total_length_(>=_1000_bp)": content_item.get("Total_length_(>=_1000_bp)", ""),
+                    "contigs": content_item.get("contigs", ""),
+                    "Largest_contig": content_item.get("Largest_contig", ""),
+                    "Total_length": content_item.get("Total_length", ""),
+                    "GC": content_item.get("GC", ""),
+                    "N50": content_item.get("N50", ""),
+                    "Filter_N50": filter_n50,
+                    "N90": content_item.get("N90", ""),
+                    "auN": content_item.get("auN", ""),
+                    "L50": content_item.get("L50", ""),
+                    "L90": content_item.get("L90", ""),
+                    "total_reads": content_item.get("total_reads", ""),
+                    "left": content_item.get("left", ""),
+                    "right": content_item.get("right", ""),
+                    "Mapped": content_item.get("Mapped", ""),
+                    "Properly_paired": content_item.get("Properly_paired", ""),
+                    "Avg._coverage_depth": content_item.get("Avg._coverage_depth", ""),
+                    "Coverage_>=_1x": content_item.get("Coverage_>=_1x", ""),
+                    "N's_per_100_kbp": content_item.get("N's_per_100_kbp", "")
+                })
+
+    with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
+        writer = csv.DictWriter(f, fieldnames=headers)
+        writer.writeheader()
+        writer.writerows(extracted_data)
+
+    print(f"CSV file successfully generated: {output_csv_file}")
+
+def extract_contigs_to_fasta(json_data):
+    """
+    Extract contigs information from "shovill" and save it as a FASTA file.
+    """
+    if isinstance(json_data, list):
+        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == "shovill"), None)
+
+    if not isinstance(json_data, dict):
+        print("Invalid JSON format for shovill extraction.")
+        return
+
+    results = json_data.get("results", [])
+    output_fasta_file = "shovill_contigs.fasta"
+
+    with open(output_fasta_file, "w", encoding="utf-8") as f:
+        for entry in results:
+            if "content" in entry and isinstance(entry["content"], list):
+                for content_item in entry["content"]:
+                    name = content_item.get("name", "unknown")
+                    length = content_item.get("length", "unknown")
+                    coverage = content_item.get("coverage", "unknown")
+                    sequence = content_item.get("sequence", "")
+
+                    header = f">{name}_{length}_{coverage}"
+                    f.write(f"{header}\n{sequence}\n")
+
+    print(f"FASTA file successfully generated: {output_fasta_file}")
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python script.py input.json")
+        sys.exit(1)
+
+    input_json_file = sys.argv[1]
+
+    try:
+        with open(input_json_file, "r", encoding="utf-8") as file:
+            json_data = json.load(file)
+            extract_software_data(json_data, "quast")
+            extract_contigs_to_fasta(json_data)
+        sys.exit(0)
+    except Exception as e:
+        print(f"Error processing file: {e}")
+        sys.exit(1)