comparison spamr_vet_tools_v2/quast_get_fasta.py @ 2:d7b099fbb003 draft default tip

Corrected file names and updated tool wrappers for consistency.
author maciek
date Tue, 25 Mar 2025 13:35:00 +0000
parents
children
comparison
equal deleted inserted replaced
1:e57a908b9d3d 2:d7b099fbb003
1 import json
2 import csv
3 import sys
4 import os
5
6 def extract_software_data(json_data, software_name):
7 """
8 Extract QUAST data from JSON and create a CSV with assembly metrics.
9 For "quast", include specific columns and calculate a Filter_N50 based on "N50".
10 """
11 # Ensure json_data is a dictionary
12 if isinstance(json_data, list):
13 json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == software_name), None)
14
15 if not isinstance(json_data, dict):
16 print(f"Invalid JSON format for {software_name} extraction.")
17 return
18
19 results = json_data.get("results", [])
20 extracted_data = []
21 headers = [
22 "Assembly",
23 "contigs_(>=_0_bp)",
24 "contigs_(>=_1000_bp)",
25 "Total_length_(>=_0_bp)",
26 "Total_length_(>=_1000_bp)",
27 "contigs",
28 "Largest_contig",
29 "Total_length",
30 "GC",
31 "N50",
32 "Filter_N50",
33 "N90",
34 "auN",
35 "L50",
36 "L90",
37 "total_reads",
38 "left",
39 "right",
40 "Mapped",
41 "Properly_paired",
42 "Avg._coverage_depth",
43 "Coverage_>=_1x",
44 "N's_per_100_kbp"
45 ]
46 output_csv_file = f"{software_name}_output.csv"
47
48 for entry in results:
49 if "content" in entry and isinstance(entry["content"], list):
50 for content_item in entry["content"]:
51 n50 = content_item.get("N50", "")
52 try:
53 n50_value = float(n50) if n50 else 0
54 filter_n50 = "pass" if n50_value > 20000 else "fail"
55 except ValueError:
56 filter_n50 = "fail" # If the value is non-numeric, consider it as "fail"
57
58 extracted_data.append({
59 "Assembly": content_item.get("Assembly", ""),
60 "contigs_(>=_0_bp)": content_item.get("contigs_(>=_0_bp)", ""),
61 "contigs_(>=_1000_bp)": content_item.get("contigs_(>=_1000_bp)", ""),
62 "Total_length_(>=_0_bp)": content_item.get("Total_length_(>=_0_bp)", ""),
63 "Total_length_(>=_1000_bp)": content_item.get("Total_length_(>=_1000_bp)", ""),
64 "contigs": content_item.get("contigs", ""),
65 "Largest_contig": content_item.get("Largest_contig", ""),
66 "Total_length": content_item.get("Total_length", ""),
67 "GC": content_item.get("GC", ""),
68 "N50": content_item.get("N50", ""),
69 "Filter_N50": filter_n50,
70 "N90": content_item.get("N90", ""),
71 "auN": content_item.get("auN", ""),
72 "L50": content_item.get("L50", ""),
73 "L90": content_item.get("L90", ""),
74 "total_reads": content_item.get("total_reads", ""),
75 "left": content_item.get("left", ""),
76 "right": content_item.get("right", ""),
77 "Mapped": content_item.get("Mapped", ""),
78 "Properly_paired": content_item.get("Properly_paired", ""),
79 "Avg._coverage_depth": content_item.get("Avg._coverage_depth", ""),
80 "Coverage_>=_1x": content_item.get("Coverage_>=_1x", ""),
81 "N's_per_100_kbp": content_item.get("N's_per_100_kbp", "")
82 })
83
84 with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
85 writer = csv.DictWriter(f, fieldnames=headers)
86 writer.writeheader()
87 writer.writerows(extracted_data)
88
89 print(f"CSV file successfully generated: {output_csv_file}")
90
91 def extract_contigs_to_fasta(json_data):
92 """
93 Extract contigs information from "shovill" and save it as a FASTA file.
94 """
95 if isinstance(json_data, list):
96 json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == "shovill"), None)
97
98 if not isinstance(json_data, dict):
99 print("Invalid JSON format for shovill extraction.")
100 return
101
102 results = json_data.get("results", [])
103 output_fasta_file = "shovill_contigs.fasta"
104
105 with open(output_fasta_file, "w", encoding="utf-8") as f:
106 for entry in results:
107 if "content" in entry and isinstance(entry["content"], list):
108 for content_item in entry["content"]:
109 name = content_item.get("name", "unknown")
110 length = content_item.get("length", "unknown")
111 coverage = content_item.get("coverage", "unknown")
112 sequence = content_item.get("sequence", "")
113
114 header = f">{name}_{length}_{coverage}"
115 f.write(f"{header}\n{sequence}\n")
116
117 print(f"FASTA file successfully generated: {output_fasta_file}")
118
119 if __name__ == "__main__":
120 if len(sys.argv) != 2:
121 print("Usage: python script.py input.json")
122 sys.exit(1)
123
124 input_json_file = sys.argv[1]
125
126 try:
127 with open(input_json_file, "r", encoding="utf-8") as file:
128 json_data = json.load(file)
129 extract_software_data(json_data, "quast")
130 extract_contigs_to_fasta(json_data)
131 sys.exit(0)
132 except Exception as e:
133 print(f"Error processing file: {e}")
134 sys.exit(1)