comparison detect_circular_sequences.py @ 0:faec698e3f98 draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/main/tools/detect_circular_sequences commit 7ea9f729b44c6351c52b6295c780f496d239488e
author iuc
date Thu, 11 Dec 2025 08:53:37 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:faec698e3f98
1 #!/usr/bin/env python3
2
3 #########################################################################################
4 # This script detect circular contigs by looking for exact identical k-mer at the two
5 # ends of the sequences provided in fasta file. In order to be able to predict genes
6 # spanning the origin of circular contigs, the first 1,000 nucleotides of each circular
7 # contigs are duplicated and added at the contig's end.
8 #
9 # Inspired by Simon Roux work for Metavir2 (2014) and Corentin Hochart work in PlasSuite
10 #
11 #########################################################################################
12
13 import argparse
14 import logging
15 from pathlib import Path
16
17 from Bio import SeqIO
18
19 log_levels = {
20 0: logging.CRITICAL,
21 1: logging.ERROR,
22 2: logging.WARN,
23 3: logging.INFO,
24 4: logging.DEBUG,
25 }
26 logging.basicConfig(level=log_levels[3])
27 logger = logging.getLogger()
28
29
30 def setup_logger(verbosity: int) -> None:
31 """
32 Configure the logger based on verbosity level.
33
34 :param verbosity: verbosity level
35 """
36 logging.basicConfig(
37 format="%(asctime)s - %(levelname)s - %(message)s",
38 level=log_levels.get(verbosity, logging.INFO),
39 )
40
41
42 def find_occurrences(s, substring) -> list:
43 """
44 Find all starting positions of a substring in a string
45
46 :param s: String to be searched
47 :param substring: Substring to search in s
48 """
49 return [i for i in range(len(s)) if s.startswith(substring, i)]
50
51
52 def is_circular(sequence, length, pos) -> bool:
53 """
54 Determines if a sequence is circular by comparing segments starting at `start_pos`.
55
56 A sequence is considered circular if the `length` elements at the beginning of the sequence match the `length`
57 elements starting at `start_pos` in the sequence. This is useful for detecting repeating patterns or cycles
58 in sequences.
59
60 :param sequence: The input sequence
61 :param length: The number of elements to compare for circularity.
62 :param pos: The starting index in the sequence to begin the comparison.
63
64 :return bool: True if circular, False otherwise
65 """
66 for i in range(length):
67 if sequence[i] != sequence[pos + i]:
68 return False
69 return True
70
71
72 def check_circularity(seq_record, subseq_length=10) -> int:
73 """
74 Process a single sequence to detect circularity and return the overlap length if circular.
75
76 :param seq_record: SeqRecord object
77 :param subseq_length: Length of 3' fragment to check on the 5' end
78
79 :return: overlap length if circular, 0 otherwise
80 """
81 seq_len = len(seq_record)
82
83 if seq_len < subseq_length:
84 logging.error(f"Sequence too short ({seq_len}bp): {seq_record.id}")
85 return 0
86
87 begin = "".join(seq_record[:subseq_length])
88 end = "".join(seq_record[subseq_length:])
89 positions = [x + subseq_length for x in find_occurrences(end, begin)]
90
91 for pos in positions:
92 overlap_length = seq_len - pos
93 if is_circular(seq_record, overlap_length, pos):
94 return overlap_length
95 return 0
96
97
98 def extend_sequence(
99 seq_record,
100 overlap_length,
101 duplication_length=1000,
102 ):
103 """
104 Extends the 5' end of a sequence by duplicating a fragment from the 3' end.
105
106 This function is useful for simulating circular sequences by extending the 5' end
107 with a fragment from the 3' end, based on the specified `overlap_length` and `duplication_length`.
108
109 :param seq_record: The input sequence record to be extended.
110 :param overlap_length: The length of the overlapping segment that was previously identified as circular.
111 :param duplication_length: The length of the 3' end fragment to duplicate and add to the 5' end.
112
113 :return: The modified sequence record with the extended 5' end.
114 """
115 # Remove the overlapping segment from the 3' end
116 modified_seq = seq_record.seq[: len(seq_record.seq) - overlap_length]
117 # Duplicate the first `duplication_length` nucleotides from the original sequence
118 # and append them to the 5' end of the modified sequence
119 if len(modified_seq) < duplication_length:
120 # If the modified sequence is shorter than `duplication_length`,
121 # duplicate the entire modified sequence
122 extension = modified_seq
123 else:
124 # Otherwise, duplicate the first `duplication_length` nucleotides
125 extension = seq_record.seq[:duplication_length]
126 # Combine the modified sequence with the duplicated fragment
127 extended_seq = modified_seq + extension
128 # Update the sequence in the SeqRecord object
129 seq_record.seq = extended_seq
130 return seq_record
131
132
133 def detect_circular(
134 fasta_in,
135 fasta_out,
136 id_out,
137 subseq_length=10,
138 duplication_length=1000,
139 ):
140 """
141 Detect and process circular sequences in a FASTA file.
142
143 This function reads sequences from `fasta_in`, checks for circularity,
144 extends circular sequences, and writes the results to `fasta_out` and `id_out`.
145
146 :param fasta_in: Path to the input FASTA file.
147 :param fasta_out: Path to the output FASTA file for extended circular sequences.
148 :param id_out: Path to the output file for recording IDs of circular sequences.
149 :param subseq_length: Length of the 3' fragment to check for circularity.
150 :param duplication_length: Length of the 3' fragment to duplicate and add to the 5' end.
151 """
152 records = []
153 ids = []
154 try:
155 with fasta_in.open("r") as fasta_in_f:
156 for seq_record in SeqIO.parse(fasta_in_f, "fasta"):
157 overlap_length = check_circularity(
158 seq_record,
159 subseq_length=subseq_length,
160 )
161 if overlap_length > 0:
162 records.append(
163 extend_sequence(
164 seq_record,
165 overlap_length,
166 duplication_length,
167 )
168 )
169 ids.append(seq_record.id)
170 except Exception as e:
171 logging.error(f"Error processing {fasta_in}: {e}")
172 raise
173
174 if not records:
175 logging.warning("Warning: No circular sequences found.")
176
177 try:
178 with fasta_out.open("w") as fasta_out_f:
179 SeqIO.write(records, fasta_out_f, "fasta")
180 with id_out.open("w") as id_out_f:
181 id_out_f.write("\n".join(ids) + "\n")
182 except IOError as e:
183 logging.error(f"Error writing output files: {e}")
184 raise
185
186
187 def main():
188 """
189 Main function to detect circular contigs in a FASTA file.
190
191 This function parses command-line arguments, launches function to read the input
192 FASTA file, process each sequence to detect circular contigs, and generate the
193 output files.
194 """
195 parser = argparse.ArgumentParser(
196 description="""
197 Detect circular contigs by looking for exact identical subsequences at the two
198 ends of the sequences provided in a FASTA file and output the circular contigs
199 extended on 5' end by duplication of the first nucleotides on 3' end to be able
200 to predict genes spanning the origin of circular contigs.
201 """
202 )
203 parser.add_argument("--fasta-in", required=True, help="Input FASTA file")
204 parser.add_argument(
205 "--subseq-length",
206 type=int,
207 default=10,
208 help="Length of 3' fragment to check on the 5' end (default: 10)",
209 )
210 parser.add_argument(
211 "--duplication-length",
212 type=int,
213 default=1000,
214 help="Length of the 3' end fragment to duplicate and add on the 5' end (default: 1000)",
215 )
216 parser.add_argument(
217 "-v",
218 "--verbose",
219 type=int,
220 default=3,
221 choices=log_levels.keys(),
222 help="Verbosity level (0=CRITICAL, 1=ERROR, 2=WARN, 3=INFO, 4=DEBUG)",
223 )
224 parser.add_argument(
225 "--fasta-out",
226 required=True,
227 help="Output FASTA file with extended circular contigs",
228 )
229 parser.add_argument(
230 "--id-out", required=True, help="Output TXT file with circular sequence IDs"
231 )
232
233 args = parser.parse_args()
234 setup_logger(args.verbose)
235
236 logging.info("Starting script execution.")
237 detect_circular(
238 Path(args.fasta_in),
239 Path(args.fasta_out),
240 Path(args.id_out),
241 subseq_length=args.subseq_length,
242 duplication_length=args.duplication_length,
243 )
244 logging.info("Script execution completed.")
245
246
247 if __name__ == "__main__":
248 main()