# HG changeset patch
# User maciek
# Date 1742909700 0
# Node ID d7b099fbb003ad9b03b4ed8901620dceda75c15f
# Parent e57a908b9d3daecd31be07f03a304e215fbd845d
Corrected file names and updated tool wrappers for consistency.
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/1_qualitty_script_fastp_bracken_v2.py
--- a/spamr_vet_tools_v2/1_qualitty_script_fastp_bracken_v2.py Tue Feb 25 14:14:39 2025 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,134 +0,0 @@
-import json
-import csv
-import sys
-
-def extract_software_data(json_data, software_name):
-    """
-    Extract data for a specific software entry in the JSON input.
-
-    For "bracken", add a "contamination" column whose value is "pass"
-    if fraction_total_reads > 0.6, otherwise "fail".
-
-    For "fastp", include only the columns listed in fastp_columns.
-    """
-    # Ensure json_data is a dictionary
-    if isinstance(json_data, list):
-        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == software_name), None)
-
-    if not isinstance(json_data, dict):
-        print(f"Invalid JSON format for {software_name} extraction.")
-        return
-
-    results = json_data.get("results", [])
-    extracted_data = []
-    headers = []  # Use list to collect headers to maintain order
-    output_csv_file = f"{software_name}_output.csv"
-
-    # Define specific columns for "fastp"
-    fastp_columns = [
-        "summary_sequencing",
-        "summary_before_filtering_total_reads",
-        "summary_before_filtering_total_bases",
-        "summary_before_filtering_q20_bases",
-        "summary_before_filtering_q30_bases",
-        "summary_before_filtering_q20_rate",
-        "summary_before_filtering_q30_rate",
-        "summary_before_filtering_read1_mean_length",
-        "summary_before_filtering_read2_mean_length",
-        "summary_before_filtering_gc_content",
-        "summary_after_filtering_total_reads",
-        "summary_after_filtering_total_bases",
-        "summary_after_filtering_q20_bases",
-        "summary_after_filtering_q30_bases",
-        "summary_after_filtering_q20_rate",
-        "summary_after_filtering_q30_rate",
-        "summary_after_filtering_read1_mean_length",
-        "summary_after_filtering_read2_mean_length",
-        "summary_after_filtering_gc_content",
-        "filtering_result_passed_filter_reads",
-        "filtering_result_low_quality_reads",
-        "filtering_result_too_many_N_reads",
-        "filtering_result_too_short_reads",
-        "filtering_result_too_long_reads",
-        "duplication_rate",
-        "insert_size_peak",
-    ]
-
-    for entry in results:
-        if "content" in entry and isinstance(entry["content"], list):
-            for content_item in entry["content"]:
-                row_data = {}
-                if software_name == "fastp":
-                    for key, value in content_item.items():
-                        if isinstance(value, dict):
-                            for sub_key, sub_value in value.items():
-                                if isinstance(sub_value, dict):
-                                    # Flatten two levels of nesting, e.g. summary_before_filtering_total_reads
-                                    for sub_sub_key, sub_sub_value in sub_value.items():
-                                        column_name = f"{key}_{sub_key}_{sub_sub_key}"
-                                        if column_name in fastp_columns:
-                                            row_data[column_name] = sub_sub_value
-                                            if column_name not in headers:
-                                                headers.append(column_name)
-                                else:
-                                    column_name = f"{key}_{sub_key}"
-                                    if column_name in fastp_columns:
-                                        row_data[column_name] = sub_value
-                                        if column_name not in headers:
-                                            headers.append(column_name)
-                        else:
-                            if key in fastp_columns:
-                                row_data[key] = value
-                                if key not in headers:
-                                    headers.append(key)
-                elif software_name == "bracken":
-                    for key, value in content_item.items():
-                        if isinstance(value, dict):
-                            for sub_key, sub_value in value.items():
-                                column_name = f"{key}_{sub_key}"
-                                row_data[column_name] = sub_value
-                                if column_name not in headers:
-                                    headers.append(column_name)
-                        else:
-                            row_data[key] = value
-                            if key not in headers:
-                                headers.append(key)
-
-                    # Add the contamination column ("bracken" rows only)
-                    fraction_total_reads = row_data.get("fraction_total_reads", 0)
-                    try:
-                        contamination = "pass" if float(fraction_total_reads) > 0.6 else "fail"
-                    except (TypeError, ValueError):
-                        contamination = "fail"
-                    row_data["contamination"] = contamination
-                    if "contamination" not in headers:
-                        headers.append("contamination")
-
-                extracted_data.append(row_data)
-
-    if not extracted_data:
-        print(f"No data extracted for {software_name}")
-        # Create a placeholder file to prevent a Galaxy error on a missing output
-        with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
-            f.write("No data available\n")
-        return
-
-    with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
-        writer = csv.DictWriter(f, fieldnames=headers)
-        writer.writeheader()
-        writer.writerows(extracted_data)
-
-    print(f"CSV file successfully generated: {output_csv_file}")
-
-if __name__ == "__main__":
-    if len(sys.argv) != 2:
-        print(f"Usage: python {sys.argv[0]} input.json")
-        sys.exit(1)
-
-    input_json_file = sys.argv[1]
-
-    try:
-        with open(input_json_file, "r", encoding="utf-8") as file:
-            json_data = json.load(file)
-        extract_software_data(json_data, "fastp")
-        extract_software_data(json_data, "bracken")
-        sys.exit(0)
-    except Exception as e:
-        print(f"Error processing file: {e}")
-        sys.exit(1)
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/1_qualitty_script_fastp_bracken_v2.xml
--- a/spamr_vet_tools_v2/1_qualitty_script_fastp_bracken_v2.xml Tue Feb 25 14:14:39 2025 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,43 +0,0 @@
-<tool id="quality_fastp_bracken" name="Quality FastP Bracken" version="2.0">
-    <description>Quality control using FastP and Bracken</description>
-    <requirements>
-        <requirement type="package">python</requirement>
-    </requirements>
-    <command detect_errors="exit_code"><![CDATA[
-        python '$__tool_directory__/1_qualitty_script_fastp_bracken_v2.py' '$input_json'
-    ]]></command>
-    <inputs>
-        <param name="input_json" type="data" format="json" label="Input JSON file"/>
-    </inputs>
-    <outputs>
-        <data name="fastp_output" format="csv" from_work_dir="fastp_output.csv" label="fastp metrics"/>
-        <data name="bracken_output" format="csv" from_work_dir="bracken_output.csv" label="Bracken metrics"/>
-    </outputs>
-</tool>
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/2_quast_get_fasta_v2.py
--- a/spamr_vet_tools_v2/2_quast_get_fasta_v2.py Tue Feb 25 14:14:39 2025 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,134 +0,0 @@
-import json
-import csv
-import sys
-
-def extract_software_data(json_data, software_name):
-    """
-    Extract QUAST data from the JSON input and create a CSV of assembly metrics.
-
-    For "quast", include specific columns and compute a Filter_N50 verdict from "N50".
-    """
-    # Ensure json_data is a dictionary
-    if isinstance(json_data, list):
-        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == software_name), None)
-
-    if not isinstance(json_data, dict):
-        print(f"Invalid JSON format for {software_name} extraction.")
-        return
-
-    results = json_data.get("results", [])
-    extracted_data = []
-    headers = [
-        "Assembly",
-        "contigs_(>=_0_bp)",
-        "contigs_(>=_1000_bp)",
-        "Total_length_(>=_0_bp)",
-        "Total_length_(>=_1000_bp)",
-        "contigs",
-        "Largest_contig",
-        "Total_length",
-        "GC",
-        "N50",
-        "Filter_N50",
-        "N90",
-        "auN",
-        "L50",
-        "L90",
-        "total_reads",
-        "left",
-        "right",
-        "Mapped",
-        "Properly_paired",
-        "Avg._coverage_depth",
-        "Coverage_>=_1x",
-        "N's_per_100_kbp",
-    ]
-    output_csv_file = f"{software_name}_output.csv"
-
-    for entry in results:
-        if "content" in entry and isinstance(entry["content"], list):
-            for content_item in entry["content"]:
-                n50 = content_item.get("N50", "")
-                try:
-                    n50_value = float(n50) if n50 else 0
-                    filter_n50 = "pass" if n50_value > 20000 else "fail"
-                except ValueError:
-                    filter_n50 = "fail"  # If the value is non-numeric, consider it as "fail"
-
-                extracted_data.append({
-                    "Assembly": content_item.get("Assembly", ""),
-                    "contigs_(>=_0_bp)": content_item.get("contigs_(>=_0_bp)", ""),
-                    "contigs_(>=_1000_bp)": content_item.get("contigs_(>=_1000_bp)", ""),
-                    "Total_length_(>=_0_bp)": content_item.get("Total_length_(>=_0_bp)", ""),
-                    "Total_length_(>=_1000_bp)": content_item.get("Total_length_(>=_1000_bp)", ""),
-                    "contigs": content_item.get("contigs", ""),
-                    "Largest_contig": content_item.get("Largest_contig", ""),
-                    "Total_length": content_item.get("Total_length", ""),
-                    "GC": content_item.get("GC", ""),
-                    "N50": content_item.get("N50", ""),
-                    "Filter_N50": filter_n50,
-                    "N90": content_item.get("N90", ""),
-                    "auN": content_item.get("auN", ""),
-                    "L50": content_item.get("L50", ""),
-                    "L90": content_item.get("L90", ""),
-                    "total_reads": content_item.get("total_reads", ""),
-                    "left": content_item.get("left", ""),
-                    "right": content_item.get("right", ""),
-                    "Mapped": content_item.get("Mapped", ""),
-                    "Properly_paired": content_item.get("Properly_paired", ""),
-                    "Avg._coverage_depth": content_item.get("Avg._coverage_depth", ""),
-                    "Coverage_>=_1x": content_item.get("Coverage_>=_1x", ""),
-                    "N's_per_100_kbp": content_item.get("N's_per_100_kbp", ""),
-                })
-
-    with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
-        writer = csv.DictWriter(f, fieldnames=headers)
-        writer.writeheader()
-        writer.writerows(extracted_data)
-
-    print(f"CSV file successfully generated: {output_csv_file}")
-
-def extract_contigs_to_fasta(json_data):
-    """
-    Extract contig records from "shovill" and save them as a FASTA file.
-    """
-    if isinstance(json_data, list):
-        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == "shovill"), None)
-
-    if not isinstance(json_data, dict):
-        print("Invalid JSON format for shovill extraction.")
-        return
-
-    results = json_data.get("results", [])
-    output_fasta_file = "shovill_contigs.fasta"
-
-    with open(output_fasta_file, "w", encoding="utf-8") as f:
-        for entry in results:
-            if "content" in entry and isinstance(entry["content"], list):
-                for content_item in entry["content"]:
-                    name = content_item.get("name", "unknown")
-                    length = content_item.get("length", "unknown")
-                    coverage = content_item.get("coverage", "unknown")
-                    sequence = content_item.get("sequence", "")
-
-                    header = f">{name}_{length}_{coverage}"
-                    f.write(f"{header}\n{sequence}\n")
-
-    print(f"FASTA file successfully generated: {output_fasta_file}")
-
-if __name__ == "__main__":
-    if len(sys.argv) != 2:
-        print(f"Usage: python {sys.argv[0]} input.json")
-        sys.exit(1)
-
-    input_json_file = sys.argv[1]
-
-    try:
-        with open(input_json_file, "r", encoding="utf-8") as file:
-            json_data = json.load(file)
-        extract_software_data(json_data, "quast")
-        extract_contigs_to_fasta(json_data)
-        sys.exit(0)
-    except Exception as e:
-        print(f"Error processing file: {e}")
-        sys.exit(1)
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/2_quast_get_fasta_v2.xml
--- a/spamr_vet_tools_v2/2_quast_get_fasta_v2.xml Tue Feb 25 14:14:39 2025 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,67 +0,0 @@
-<tool id="quast_get_fasta" name="QUAST metrics and FASTA export" version="2.0">
-    <description>Extracts QUAST metrics and generates FASTA files from JSON input.</description>
-    <requirements>
-        <requirement type="package">python</requirement>
-    </requirements>
-    <command detect_errors="exit_code"><![CDATA[
-        python '$__tool_directory__/2_quast_get_fasta_v2.py' '$input_json'
-    ]]></command>
-    <inputs>
-        <param name="input_json" type="data" format="json" label="Input JSON file"/>
-    </inputs>
-    <outputs>
-        <data name="quast_output" format="csv" from_work_dir="quast_output.csv" label="QUAST metrics"/>
-        <data name="shovill_contigs" format="fasta" from_work_dir="shovill_contigs.fasta" label="Shovill contigs"/>
-    </outputs>
-    <help><![CDATA[
-This tool builds on:
-
-- `QUAST <http://quast.sourceforge.net/>`_ - Quality assessment tool for genome assemblies.
-- `Shovill <https://github.com/tseemann/shovill>`_ - A tool for rapid bacterial genome assembly using SPAdes.
-
-For questions or issues, please contact the tool maintainers.
-    ]]></help>
-</tool>
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/3_MLST_AMRFINDER_STARMAR_v2.py
--- a/spamr_vet_tools_v2/3_MLST_AMRFINDER_STARMAR_v2.py Tue Feb 25 14:14:39 2025 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,73 +0,0 @@
-import json
-import csv
-import sys
-
-def generate_csv_from_json(json_data):
-    """
-    Parse the JSON and generate one CSV file per analysis_software_name
-    (Abricate, AMRfinder Plus, and STARamr).
-
-    Additionally, extract the 'mlst_file' content into its own CSV.
-    """
-    for entry in json_data:
-        analysis_software = entry.get("analysis_software_name", "unknown")
-        results = entry.get("results", [])
-
-        if results:
-            csv_file = f"{analysis_software}_output.csv"
-            extracted_data = []
-            headers = []
-
-            for result in results:
-                if result.get("name") == "mlst_file":
-                    mlst_file_path = "mlst.csv"
-                    mlst_content = result.get("content", [])
-                    mlst_headers = ["Isolate ID", "Scheme", "Sequence Type", "Locus"]
-
-                    # Write the MLST CSV file
-                    if mlst_content:
-                        with open(mlst_file_path, "w", newline="", encoding="utf-8") as f:
-                            writer = csv.DictWriter(f, fieldnames=mlst_headers)
-                            writer.writeheader()
-                            for row in mlst_content:
-                                writer.writerow({
-                                    "Isolate ID": row.get("Isolate ID", ""),
-                                    "Scheme": row.get("Scheme", ""),
-                                    "Sequence Type": row.get("Sequence Type", ""),
-                                    "Locus": "; ".join(row.get("Locus", [])),
-                                })
-
-                        print(f"MLST CSV file successfully generated: {mlst_file_path}")
-
-                if "content" in result and isinstance(result["content"], list):
-                    for content_item in result["content"]:
-                        extracted_data.append(content_item)
-                        for key in content_item.keys():
-                            if key not in headers:
-                                headers.append(key)  # Maintain the original order of the JSON keys
-
-            # Write the CSV file if there is data
-            if extracted_data:
-                with open(csv_file, "w", newline="", encoding="utf-8") as f:
-                    writer = csv.DictWriter(f, fieldnames=headers)
-                    writer.writeheader()
-                    for row in extracted_data:
-                        writer.writerow({key: row.get(key, "") for key in headers})
-
-                print(f"CSV file successfully generated: {csv_file}")
-            else:
-                print(f"No content found for {analysis_software}.")
-
-if __name__ == "__main__":
-    if len(sys.argv) != 2:
-        print(f"Usage: python {sys.argv[0]} input.json")
-        sys.exit(1)
-
-    input_json_file = sys.argv[1]
-
-    try:
-        with open(input_json_file, "r", encoding="utf-8") as file:
-            json_data = json.load(file)
-        generate_csv_from_json(json_data)
-        sys.exit(0)
-    except Exception as e:
-        print(f"Error processing file: {e}")
-        sys.exit(1)
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/3_MLST_AMRFINDER_STARMAR_v2.xml
--- a/spamr_vet_tools_v2/3_MLST_AMRFINDER_STARMAR_v2.xml Tue Feb 25 14:14:39 2025 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,70 +0,0 @@
-<tool id="mlst_amrfinder_staramr" name="MLST AMRfinder STARamr" version="2.0">
-    <description>Extracts MLST, AMRfinder Plus, and STARamr results from JSON input.</description>
-    <requirements>
-        <requirement type="package">python</requirement>
-    </requirements>
-    <command detect_errors="exit_code"><![CDATA[
-        python '$__tool_directory__/3_MLST_AMRFINDER_STARMAR_v2.py' '$input_json'
-    ]]></command>
-    <inputs>
-        <param name="input_json" type="data" format="json" label="Input JSON file"/>
-    </inputs>
-    <outputs>
-        <data name="mlst_csv" format="csv" from_work_dir="mlst.csv" label="MLST"/>
-        <collection name="amr_reports" type="list" label="AMR reports">
-            <discover_datasets pattern="(?P&lt;designation&gt;.+)_output\.csv" format="csv"/>
-        </collection>
-    </outputs>
-    <help><![CDATA[
-This tool builds on:
-
-- `MLST <https://pubmlst.org/>`_ - Multi-locus sequence typing database.
-- `AMRfinder Plus <https://github.com/ncbi/amr>`_ - Antimicrobial resistance gene detection.
-- `STARamr <https://github.com/phac-nml/staramr>`_ - Salmonella sequence typing and resistance analysis.
-
-For questions or issues, please contact the tool maintainers.
-    ]]></help>
-</tool>
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/mlst_amrfinder_staramr.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/spamr_vet_tools_v2/mlst_amrfinder_staramr.py Tue Mar 25 13:35:00 2025 +0000
@@ -0,0 +1,73 @@
+import json
+import csv
+import sys
+
+def generate_csv_from_json(json_data):
+    """
+    Parse the JSON and generate one CSV file per analysis_software_name
+    (Abricate, AMRfinder Plus, and STARamr).
+
+    Additionally, extract the 'mlst_file' content into its own CSV.
+    """
+    for entry in json_data:
+        analysis_software = entry.get("analysis_software_name", "unknown")
+        results = entry.get("results", [])
+
+        if results:
+            csv_file = f"{analysis_software}_output.csv"
+            extracted_data = []
+            headers = []
+
+            for result in results:
+                if result.get("name") == "mlst_file":
+                    mlst_file_path = "mlst.csv"
+                    mlst_content = result.get("content", [])
+                    mlst_headers = ["Isolate ID", "Scheme", "Sequence Type", "Locus"]
+
+                    # Write the MLST CSV file
+                    if mlst_content:
+                        with open(mlst_file_path, "w", newline="", encoding="utf-8") as f:
+                            writer = csv.DictWriter(f, fieldnames=mlst_headers)
+                            writer.writeheader()
+                            for row in mlst_content:
+                                writer.writerow({
+                                    "Isolate ID": row.get("Isolate ID", ""),
+                                    "Scheme": row.get("Scheme", ""),
+                                    "Sequence Type": row.get("Sequence Type", ""),
+                                    "Locus": "; ".join(row.get("Locus", [])),
+                                })
+
+                        print(f"MLST CSV file successfully generated: {mlst_file_path}")
+
+                if "content" in result and isinstance(result["content"], list):
+                    for content_item in result["content"]:
+                        extracted_data.append(content_item)
+                        for key in content_item.keys():
+                            if key not in headers:
+                                headers.append(key)  # Maintain the original order of the JSON keys
+
+            # Write the CSV file if there is data
+            if extracted_data:
+                with open(csv_file, "w", newline="", encoding="utf-8") as f:
+                    writer = csv.DictWriter(f, fieldnames=headers)
+                    writer.writeheader()
+                    for row in extracted_data:
+                        writer.writerow({key: row.get(key, "") for key in headers})
+
+                print(f"CSV file successfully generated: {csv_file}")
+            else:
+                print(f"No content found for {analysis_software}.")
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print(f"Usage: python {sys.argv[0]} input.json")
+        sys.exit(1)
+
+    input_json_file = sys.argv[1]
+
+    try:
+        with open(input_json_file, "r", encoding="utf-8") as file:
+            json_data = json.load(file)
+        generate_csv_from_json(json_data)
+        sys.exit(0)
+    except Exception as e:
+        print(f"Error processing file: {e}")
+        sys.exit(1)
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/mlst_amrfinder_staramr.xml
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/spamr_vet_tools_v2/mlst_amrfinder_staramr.xml Tue Mar 25 13:35:00 2025 +0000
@@ -0,0 +1,70 @@
+<tool id="mlst_amrfinder_staramr" name="MLST AMRfinder STARamr" version="2.0">
+    <description>Extracts MLST, AMRfinder Plus, and STARamr results from JSON input.</description>
+    <requirements>
+        <requirement type="package">python</requirement>
+    </requirements>
+    <command detect_errors="exit_code"><![CDATA[
+        python '$__tool_directory__/mlst_amrfinder_staramr.py' '$input_json'
+    ]]></command>
+    <inputs>
+        <param name="input_json" type="data" format="json" label="Input JSON file"/>
+    </inputs>
+    <outputs>
+        <data name="mlst_csv" format="csv" from_work_dir="mlst.csv" label="MLST"/>
+        <collection name="amr_reports" type="list" label="AMR reports">
+            <discover_datasets pattern="(?P&lt;designation&gt;.+)_output\.csv" format="csv"/>
+        </collection>
+    </outputs>
+    <help><![CDATA[
+This tool builds on:
+
+- `MLST <https://pubmlst.org/>`_ - Multi-locus sequence typing database.
+- `AMRfinder Plus <https://github.com/ncbi/amr>`_ - Antimicrobial resistance gene detection.
+- `STARamr <https://github.com/phac-nml/staramr>`_ - Salmonella sequence typing and resistance analysis.
+
+For questions or issues, please contact the tool maintainers.
+    ]]></help>
+</tool>
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/quality_script_fastp_bracken.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/spamr_vet_tools_v2/quality_script_fastp_bracken.py Tue Mar 25 13:35:00 2025 +0000
@@ -0,0 +1,134 @@
+import json
+import csv
+import sys
+
+def extract_software_data(json_data, software_name):
+    """
+    Extract data for a specific software entry in the JSON input.
+
+    For "bracken", add a "contamination" column whose value is "pass"
+    if fraction_total_reads > 0.6, otherwise "fail".
+
+    For "fastp", include only the columns listed in fastp_columns.
+    """
+    # Ensure json_data is a dictionary
+    if isinstance(json_data, list):
+        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == software_name), None)
+
+    if not isinstance(json_data, dict):
+        print(f"Invalid JSON format for {software_name} extraction.")
+        return
+
+    results = json_data.get("results", [])
+    extracted_data = []
+    headers = []  # Use list to collect headers to maintain order
+    output_csv_file = f"{software_name}_output.csv"
+
+    # Define specific columns for "fastp"
+    fastp_columns = [
+        "summary_sequencing",
+        "summary_before_filtering_total_reads",
+        "summary_before_filtering_total_bases",
+        "summary_before_filtering_q20_bases",
+        "summary_before_filtering_q30_bases",
+        "summary_before_filtering_q20_rate",
+        "summary_before_filtering_q30_rate",
+        "summary_before_filtering_read1_mean_length",
+        "summary_before_filtering_read2_mean_length",
+        "summary_before_filtering_gc_content",
+        "summary_after_filtering_total_reads",
+        "summary_after_filtering_total_bases",
+        "summary_after_filtering_q20_bases",
+        "summary_after_filtering_q30_bases",
+        "summary_after_filtering_q20_rate",
+        "summary_after_filtering_q30_rate",
+        "summary_after_filtering_read1_mean_length",
+        "summary_after_filtering_read2_mean_length",
+        "summary_after_filtering_gc_content",
+        "filtering_result_passed_filter_reads",
+        "filtering_result_low_quality_reads",
+        "filtering_result_too_many_N_reads",
+        "filtering_result_too_short_reads",
+        "filtering_result_too_long_reads",
+        "duplication_rate",
+        "insert_size_peak",
+    ]
+
+    for entry in results:
+        if "content" in entry and isinstance(entry["content"], list):
+            for content_item in entry["content"]:
+                row_data = {}
+                if software_name == "fastp":
+                    for key, value in content_item.items():
+                        if isinstance(value, dict):
+                            for sub_key, sub_value in value.items():
+                                if isinstance(sub_value, dict):
+                                    # Flatten two levels of nesting, e.g. summary_before_filtering_total_reads
+                                    for sub_sub_key, sub_sub_value in sub_value.items():
+                                        column_name = f"{key}_{sub_key}_{sub_sub_key}"
+                                        if column_name in fastp_columns:
+                                            row_data[column_name] = sub_sub_value
+                                            if column_name not in headers:
+                                                headers.append(column_name)
+                                else:
+                                    column_name = f"{key}_{sub_key}"
+                                    if column_name in fastp_columns:
+                                        row_data[column_name] = sub_value
+                                        if column_name not in headers:
+                                            headers.append(column_name)
+                        else:
+                            if key in fastp_columns:
+                                row_data[key] = value
+                                if key not in headers:
+                                    headers.append(key)
+                elif software_name == "bracken":
+                    for key, value in content_item.items():
+                        if isinstance(value, dict):
+                            for sub_key, sub_value in value.items():
+                                column_name = f"{key}_{sub_key}"
+                                row_data[column_name] = sub_value
+                                if column_name not in headers:
+                                    headers.append(column_name)
+                        else:
+                            row_data[key] = value
+                            if key not in headers:
+                                headers.append(key)
+
+                    # Add the contamination column ("bracken" rows only)
+                    fraction_total_reads = row_data.get("fraction_total_reads", 0)
+                    try:
+                        contamination = "pass" if float(fraction_total_reads) > 0.6 else "fail"
+                    except (TypeError, ValueError):
+                        contamination = "fail"
+                    row_data["contamination"] = contamination
+                    if "contamination" not in headers:
+                        headers.append("contamination")
+
+                extracted_data.append(row_data)
+
+    if not extracted_data:
+        print(f"No data extracted for {software_name}")
+        # Create a placeholder file to prevent a Galaxy error on a missing output
+        with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
+            f.write("No data available\n")
+        return
+
+    with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
+        writer = csv.DictWriter(f, fieldnames=headers)
+        writer.writeheader()
+        writer.writerows(extracted_data)
+
+    print(f"CSV file successfully generated: {output_csv_file}")
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print(f"Usage: python {sys.argv[0]} input.json")
+        sys.exit(1)
+
+    input_json_file = sys.argv[1]
+
+    try:
+        with open(input_json_file, "r", encoding="utf-8") as file:
+            json_data = json.load(file)
+        extract_software_data(json_data, "fastp")
+        extract_software_data(json_data, "bracken")
+        sys.exit(0)
+    except Exception as e:
+        print(f"Error processing file: {e}")
+        sys.exit(1)
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/quality_script_fastp_bracken.xml
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/spamr_vet_tools_v2/quality_script_fastp_bracken.xml Tue Mar 25 13:35:00 2025 +0000
@@ -0,0 +1,43 @@
+<tool id="quality_fastp_bracken" name="Quality FastP Bracken" version="2.0">
+    <description>Quality control using FastP and Bracken</description>
+    <requirements>
+        <requirement type="package">python</requirement>
+    </requirements>
+    <command detect_errors="exit_code"><![CDATA[
+        python '$__tool_directory__/quality_script_fastp_bracken.py' '$input_json'
+    ]]></command>
+    <inputs>
+        <param name="input_json" type="data" format="json" label="Input JSON file"/>
+    </inputs>
+    <outputs>
+        <data name="fastp_output" format="csv" from_work_dir="fastp_output.csv" label="fastp metrics"/>
+        <data name="bracken_output" format="csv" from_work_dir="bracken_output.csv" label="Bracken metrics"/>
+    </outputs>
+</tool>
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/quast_get_fasta.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/spamr_vet_tools_v2/quast_get_fasta.py Tue Mar 25 13:35:00 2025 +0000
@@ -0,0 +1,134 @@
+import json
+import csv
+import sys
+
+def extract_software_data(json_data, software_name):
+    """
+    Extract QUAST data from the JSON input and create a CSV of assembly metrics.
+
+    For "quast", include specific columns and compute a Filter_N50 verdict from "N50".
+    """
+    # Ensure json_data is a dictionary
+    if isinstance(json_data, list):
+        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == software_name), None)
+
+    if not isinstance(json_data, dict):
+        print(f"Invalid JSON format for {software_name} extraction.")
+        return
+
+    results = json_data.get("results", [])
+    extracted_data = []
+    headers = [
+        "Assembly",
+        "contigs_(>=_0_bp)",
+        "contigs_(>=_1000_bp)",
+        "Total_length_(>=_0_bp)",
+        "Total_length_(>=_1000_bp)",
+        "contigs",
+        "Largest_contig",
+        "Total_length",
+        "GC",
+        "N50",
+        "Filter_N50",
+        "N90",
+        "auN",
+        "L50",
+        "L90",
+        "total_reads",
+        "left",
+        "right",
+        "Mapped",
+        "Properly_paired",
+        "Avg._coverage_depth",
+        "Coverage_>=_1x",
+        "N's_per_100_kbp",
+    ]
+    output_csv_file = f"{software_name}_output.csv"
+
+    for entry in results:
+        if "content" in entry and isinstance(entry["content"], list):
+            for content_item in entry["content"]:
+                n50 = content_item.get("N50", "")
+                try:
+                    n50_value = float(n50) if n50 else 0
+                    filter_n50 = "pass" if n50_value > 20000 else "fail"
+                except ValueError:
+                    filter_n50 = "fail"  # If the value is non-numeric, consider it as "fail"
+
+                extracted_data.append({
+                    "Assembly": content_item.get("Assembly", ""),
+                    "contigs_(>=_0_bp)": content_item.get("contigs_(>=_0_bp)", ""),
+                    "contigs_(>=_1000_bp)": content_item.get("contigs_(>=_1000_bp)", ""),
+                    "Total_length_(>=_0_bp)": content_item.get("Total_length_(>=_0_bp)", ""),
+                    "Total_length_(>=_1000_bp)": content_item.get("Total_length_(>=_1000_bp)", ""),
+                    "contigs": content_item.get("contigs", ""),
+                    "Largest_contig": content_item.get("Largest_contig", ""),
+                    "Total_length": content_item.get("Total_length", ""),
+                    "GC": content_item.get("GC", ""),
+                    "N50": content_item.get("N50", ""),
+                    "Filter_N50": filter_n50,
+                    "N90": content_item.get("N90", ""),
+                    "auN": content_item.get("auN", ""),
+                    "L50": content_item.get("L50", ""),
+                    "L90": content_item.get("L90", ""),
+                    "total_reads": content_item.get("total_reads", ""),
+                    "left": content_item.get("left", ""),
+                    "right": content_item.get("right", ""),
+                    "Mapped": content_item.get("Mapped", ""),
+                    "Properly_paired": content_item.get("Properly_paired", ""),
+                    "Avg._coverage_depth": content_item.get("Avg._coverage_depth", ""),
+                    "Coverage_>=_1x": content_item.get("Coverage_>=_1x", ""),
+                    "N's_per_100_kbp": content_item.get("N's_per_100_kbp", ""),
+                })
+
+    with open(output_csv_file, "w", newline="", encoding="utf-8") as f:
+        writer = csv.DictWriter(f, fieldnames=headers)
+        writer.writeheader()
+        writer.writerows(extracted_data)
+
+    print(f"CSV file successfully generated: {output_csv_file}")
+
+def extract_contigs_to_fasta(json_data):
+    """
+    Extract contig records from "shovill" and save them as a FASTA file.
+    """
+    if isinstance(json_data, list):
+        json_data = next((entry for entry in json_data if "analysis_software_name" in entry and entry["analysis_software_name"] == "shovill"), None)
+
+    if not isinstance(json_data, dict):
+        print("Invalid JSON format for shovill extraction.")
+        return
+
+    results = json_data.get("results", [])
+    output_fasta_file = "shovill_contigs.fasta"
+
+    with open(output_fasta_file, "w", encoding="utf-8") as f:
+        for entry in results:
+            if "content" in entry and isinstance(entry["content"], list):
+                for content_item in entry["content"]:
+                    name = content_item.get("name", "unknown")
+                    length = content_item.get("length", "unknown")
+                    coverage = content_item.get("coverage", "unknown")
+                    sequence = content_item.get("sequence", "")
+
+                    header = f">{name}_{length}_{coverage}"
+                    f.write(f"{header}\n{sequence}\n")
+
+    print(f"FASTA file successfully generated: {output_fasta_file}")
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print(f"Usage: python {sys.argv[0]} input.json")
+        sys.exit(1)
+
+    input_json_file = sys.argv[1]
+
+    try:
+        with open(input_json_file, "r", encoding="utf-8") as file:
+            json_data = json.load(file)
+        extract_software_data(json_data, "quast")
+        extract_contigs_to_fasta(json_data)
+        sys.exit(0)
+    except Exception as e:
+        print(f"Error processing file: {e}")
+        sys.exit(1)
diff -r e57a908b9d3d -r d7b099fbb003 spamr_vet_tools_v2/quast_get_fasta.xml
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/spamr_vet_tools_v2/quast_get_fasta.xml Tue Mar 25 13:35:00 2025 +0000
@@ -0,0 +1,67 @@
+<tool id="quast_get_fasta" name="QUAST metrics and FASTA export" version="2.0">
+    <description>Extracts QUAST metrics and generates FASTA files from JSON input.</description>
+    <requirements>
+        <requirement type="package">python</requirement>
+    </requirements>
+    <command detect_errors="exit_code"><![CDATA[
+        python '$__tool_directory__/quast_get_fasta.py' '$input_json'
+    ]]></command>
+    <inputs>
+        <param name="input_json" type="data" format="json" label="Input JSON file"/>
+    </inputs>
+    <outputs>
+        <data name="quast_output" format="csv" from_work_dir="quast_output.csv" label="QUAST metrics"/>
+        <data name="shovill_contigs" format="fasta" from_work_dir="shovill_contigs.fasta" label="Shovill contigs"/>
+    </outputs>
+    <help><![CDATA[
+This tool builds on:
+
+- `QUAST <http://quast.sourceforge.net/>`_ - Quality assessment tool for genome assemblies.
+- `Shovill <https://github.com/tseemann/shovill>`_ - A tool for rapid bacterial genome assembly using SPAdes.
+
+For questions or issues, please contact the tool maintainers.
+    ]]></help>
+</tool>