Mercurial > repos > maciek > spamr_vet_tools
diff spamr_vet_tools_v2/mlst_amrfinder_staramr.py @ 2:d7b099fbb003 draft default tip
Corrected file names and updated tool wrappers for consistency.
author | maciek |
---|---|
date | Tue, 25 Mar 2025 13:35:00 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/spamr_vet_tools_v2/mlst_amrfinder_staramr.py Tue Mar 25 13:35:00 2025 +0000 @@ -0,0 +1,73 @@ +import json +import csv +import sys + +def generate_csv_from_json(json_data): + """ + Parse the JSON and generate CSV files based on the analysis_software_name Abricate, AMRfinder plus and STARamr. + Additionally, extract and process the 'mlst_file' content into its own CSV. + """ + for entry in json_data: + analysis_software = entry.get("analysis_software_name", "unknown") + results = entry.get("results", []) + + if results: + csv_file = f"{analysis_software}_output.csv" + extracted_data = [] + headers = [] + + for result in results: + if result.get("name") == "mlst_file": + mlst_file_path = "mlst.csv" + mlst_content = result.get("content", []) + mlst_headers = ["Isolate ID", "Scheme", "Sequence Type", "Locus"] + + # Write the MLST CSV file + if mlst_content: + with open(mlst_file_path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=mlst_headers) + writer.writeheader() + for row in mlst_content: + writer.writerow({ + "Isolate ID": row.get("Isolate ID", ""), + "Scheme": row.get("Scheme", ""), + "Sequence Type": row.get("Sequence Type", ""), + "Locus": "; ".join(row.get("Locus", [])) + }) + + print(f"MLST CSV file successfully generated: {mlst_file_path}") + + if "content" in result and isinstance(result["content"], list): + for content_item in result["content"]: + extracted_data.append(content_item) + for key in content_item.keys(): + if key not in headers: + headers.append(key) # Maintain the original order of the JSON keys + + # Write the CSV file if there is data + if extracted_data: + with open(csv_file, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=headers) + writer.writeheader() + for row in extracted_data: + writer.writerow({key: row.get(key, "") for key in headers}) + + print(f"CSV file successfully generated: {csv_file}") + else: + print(f"No content found for {analysis_software}.") + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python script.py input.json") + sys.exit(1) + + input_json_file = sys.argv[1] + + try: + with open(input_json_file, "r", encoding="utf-8") as file: + json_data = json.load(file) + generate_csv_from_json(json_data) + sys.exit(0) + except Exception as e: + print(f"Error processing file: {e}") + sys.exit(1)