comparison xmfa_process.py @ 1:bfe0d989004d draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:54:26 +0000
parents
children
comparison
equal deleted inserted replaced
0:b4d7351df1ff 1:bfe0d989004d
1 #!/usr/bin/env python
2 from Bio import SeqIO
3 import argparse
4 import json
5 import os
6 from CPT_GFFParser import gffParse, gffWrite
7
8
9 def parse_xmfa(xmfa):
10 """Simple XMFA parser until https://github.com/biopython/biopython/pull/544"""
11 current_lcb = []
12 current_seq = {}
13 for line in xmfa.readlines():
14 if line.startswith("#"):
15 continue
16
17 if line.strip() == "=":
18 if "id" in current_seq:
19 current_lcb.append(current_seq)
20 current_seq = {}
21 yield current_lcb
22 current_lcb = []
23 else:
24 line = line.strip()
25 if line.startswith(">"):
26 if "id" in current_seq:
27 current_lcb.append(current_seq)
28 current_seq = {}
29 data = line.strip().split()
30 # 0 1 2 3 4 5
31 # > 1:5986-6406 + CbK.fa # CbK_gp011
32 id, loc = data[1].split(":")
33 start, end = loc.split("-")
34 current_seq = {
35 "rid": "_".join(data[1:]),
36 "id": id,
37 "start": int(start),
38 "end": int(end),
39 "strand": 1 if data[2] == "+" else -1,
40 "seq": "",
41 "comment": "",
42 }
43 if len(data) > 5:
44 current_seq["comment"] = " ".join(data[5:])
45 # else:
46 # current_seq['seq'] += line.strip()
47
48
49 def percent_identity(a, b):
50 """Calculate % identity, ignoring gaps in the host sequence"""
51 match = 0
52 mismatch = 0
53 for char_a, char_b in zip(list(a), list(b)):
54 if char_a == "-":
55 continue
56 if char_a == char_b:
57 match += 1
58 else:
59 mismatch += 1
60
61 if match + mismatch == 0:
62 return 0.0
63 return 100 * float(match) / (match + mismatch)
64
65
66 def get_fasta_ids(sequences):
67 """Returns a list of fasta records in the order they appear"""
68 ids = []
69 for seq in SeqIO.parse(sequences, "fasta"):
70 ids.append(seq.id)
71 return ids
72
73
74 if __name__ == "__main__":
75 parser = argparse.ArgumentParser(description="parse xmfa file")
76 parser.add_argument("gff3", type=argparse.FileType("r"), help="Multi-GFF3 File")
77 parser.add_argument("fasta", type=argparse.FileType("r"), help="Multi-FA file")
78 parser.add_argument("xmfa", type=argparse.FileType("r"), help="XMFA File")
79 parser.add_argument("output_dir", type=str, help="output directory")
80 args = parser.parse_args()
81
82 fasta_list = get_fasta_ids(args.fasta)
83 lcbs = parse_xmfa(args.xmfa)
84
85 if not os.path.exists(args.output_dir):
86 os.makedirs(args.output_dir)
87
88 output = {"fasta": [], "gff3": [], "xmfa": None}
89
90 processed_xmfa = os.path.join(args.output_dir, "regions.json")
91 with open(processed_xmfa, "w") as handle:
92 json.dump([lcb for lcb in lcbs if len(lcb) > 1], handle, sort_keys=True)
93
94 output["xmfa"] = processed_xmfa
95
96 # Have to seek because we already access args.fasta once in id_tn_dict
97 args.fasta.seek(0)
98 # Load up sequence(s) for GFF3 data
99 seq_dict = SeqIO.to_dict(SeqIO.parse(args.fasta, "fasta"))
100 # Parse GFF3 records
101 gffs = gffParse(args.gff3, base_dict=seq_dict)
102 for record in sorted(gffs, key=lambda rec: fasta_list.index(rec.id)):
103 gff_output = os.path.join(args.output_dir, record.id + ".gff")
104 with open(gff_output, "w") as handle:
105 gffWrite([record], handle)
106 output["gff3"].append(gff_output)
107
108 fa_output = os.path.join(args.output_dir, record.id + ".txt")
109 with open(fa_output, "w") as handle:
110 handle.write(str(record.seq))
111 output["fasta"].append(
112 {"path": fa_output, "length": len(record.seq), "name": record.id}
113 )
114
115 print(json.dumps(output, sort_keys=True))