diff cpt_convert_xmfa/xmfa.py @ 0:06d8e28d0bd7 draft default tip

Uploaded
author cpt
date Fri, 10 Jun 2022 08:49:43 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cpt_convert_xmfa/xmfa.py	Fri Jun 10 08:49:43 2022 +0000
@@ -0,0 +1,144 @@
+# This file is licensed seperately of the rest of the codebase. This is due to
+# BioPython's failure to merge https://github.com/biopython/biopython/pull/544
+# in a timely fashion. Please use this file however you see fit!
+#
+#
+# Copyright (c) 2015-2017 Center for Phage Technology. All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY CENTER FOR PHAGE TECHNOLOGY "AS IS" AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL CENTER FOR PHAGE TECHNOLOGY OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from Bio import SeqIO
+import tempfile
+import sys
+
+
+def parse_xmfa(xmfa):
+    """Simple XMFA parser until https://github.com/biopython/biopython/pull/544
+    """
+    current_lcb = []
+    current_seq = {}
+    for line in xmfa.readlines():
+        if line.startswith("#"):
+            continue
+
+        if line.strip() == "=":
+            if "id" in current_seq:
+                current_lcb.append(current_seq)
+                current_seq = {}
+            yield current_lcb
+            current_lcb = []
+        else:
+            line = line.strip()
+            if line.startswith(">"):
+                if "id" in current_seq:
+                    current_lcb.append(current_seq)
+                    current_seq = {}
+                data = line.strip().split()
+                # 0 1           2 3      4 5
+                # > 1:5986-6406 + CbK.fa # CbK_gp011
+                id, loc = data[1].split(":")
+                start, end = loc.split("-")
+                current_seq = {
+                    "rid": "_".join(data[1:]),
+                    "id": id,
+                    "start": int(start),
+                    "end": int(end),
+                    "strand": 1 if data[2] == "+" else -1,
+                    "file": data[3],
+                    "seq": "",
+                    "comment": "",
+                }
+                if len(data) > 5:
+                    current_seq["comment"] = " ".join(data[5:])
+            else:
+                current_seq["seq"] += line.strip()
+
+
+HEADER_TPL = "> {id}:{start}-{end} {strand} {file} # {comment}\n"
+
+
+def split_by_n(seq, n):
+    """A generator to divide a sequence into chunks of n units."""
+    # http://stackoverflow.com/questions/9475241/split-python-string-every-nth-character
+    while seq:
+        yield seq[:n]
+        seq = seq[n:]
+
+
+def to_xmfa(lcbs, handle=sys.stdout):
+    handle.write("#FormatVersion Mauve1\n")
+    for lcb in lcbs:
+        for aln in lcb:
+            handle.write(
+                HEADER_TPL.format(
+                    id=aln["id"],
+                    start=aln["start"],
+                    end=aln["end"],
+                    strand="+" if aln["strand"] > 0 else "-",
+                    file=aln["file"],
+                    comment=aln["comment"],
+                )
+            )
+
+            for line in split_by_n(aln["seq"], 80):
+                handle.write(line + "\n")
+        handle.write("=\n")
+
+
+def percent_identity(a, b):
+    """Calculate % identity, ignoring gaps in the host sequence
+    """
+    match = 0
+    mismatch = 0
+    for char_a, char_b in zip(list(a), list(b)):
+        if char_a == "-":
+            continue
+        if char_a == char_b:
+            match += 1
+        else:
+            mismatch += 1
+
+    if match + mismatch == 0:
+        return 0.0
+    return 100 * float(match) / (match + mismatch)
+
+
+def id_tn_dict(sequences, tmpfile=False):
+    """Figure out sequence IDs
+    """
+    label_convert = {}
+    correct_chrom = None
+    if not isinstance(sequences, list):
+        sequences = [sequences]
+
+    i = 0
+    for sequence_file in sequences:
+        for record in SeqIO.parse(sequence_file, "fasta"):
+            if correct_chrom is None:
+                correct_chrom = record.id
+
+            i += 1
+            key = str(i)
+            label_convert[key] = {"record_id": record.id, "len": len(record.seq)}
+
+            if tmpfile:
+                label_convert[key] = tempfile.NamedTemporaryFile(delete=False)
+
+    return label_convert