Mercurial > repos > iuc > detect_circular_sequences
comparison detect_circular_sequences.py @ 0:faec698e3f98 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/main/tools/detect_circular_sequences commit 7ea9f729b44c6351c52b6295c780f496d239488e
| author | iuc |
|---|---|
| date | Thu, 11 Dec 2025 08:53:37 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:faec698e3f98 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 ######################################################################################### | |
| 4 # This script detect circular contigs by looking for exact identical k-mer at the two | |
| 5 # ends of the sequences provided in fasta file. In order to be able to predict genes | |
| 6 # spanning the origin of circular contigs, the first 1,000 nucleotides of each circular | |
| 7 # contigs are duplicated and added at the contig's end. | |
| 8 # | |
| 9 # Inspired by Simon Roux work for Metavir2 (2014) and Corentin Hochart work in PlasSuite | |
| 10 # | |
| 11 ######################################################################################### | |
| 12 | |
| 13 import argparse | |
| 14 import logging | |
| 15 from pathlib import Path | |
| 16 | |
| 17 from Bio import SeqIO | |
| 18 | |
| 19 log_levels = { | |
| 20 0: logging.CRITICAL, | |
| 21 1: logging.ERROR, | |
| 22 2: logging.WARN, | |
| 23 3: logging.INFO, | |
| 24 4: logging.DEBUG, | |
| 25 } | |
| 26 logging.basicConfig(level=log_levels[3]) | |
| 27 logger = logging.getLogger() | |
| 28 | |
| 29 | |
| 30 def setup_logger(verbosity: int) -> None: | |
| 31 """ | |
| 32 Configure the logger based on verbosity level. | |
| 33 | |
| 34 :param verbosity: verbosity level | |
| 35 """ | |
| 36 logging.basicConfig( | |
| 37 format="%(asctime)s - %(levelname)s - %(message)s", | |
| 38 level=log_levels.get(verbosity, logging.INFO), | |
| 39 ) | |
| 40 | |
| 41 | |
| 42 def find_occurrences(s, substring) -> list: | |
| 43 """ | |
| 44 Find all starting positions of a substring in a string | |
| 45 | |
| 46 :param s: String to be searched | |
| 47 :param substring: Substring to search in s | |
| 48 """ | |
| 49 return [i for i in range(len(s)) if s.startswith(substring, i)] | |
| 50 | |
| 51 | |
| 52 def is_circular(sequence, length, pos) -> bool: | |
| 53 """ | |
| 54 Determines if a sequence is circular by comparing segments starting at `start_pos`. | |
| 55 | |
| 56 A sequence is considered circular if the `length` elements at the beginning of the sequence match the `length` | |
| 57 elements starting at `start_pos` in the sequence. This is useful for detecting repeating patterns or cycles | |
| 58 in sequences. | |
| 59 | |
| 60 :param sequence: The input sequence | |
| 61 :param length: The number of elements to compare for circularity. | |
| 62 :param pos: The starting index in the sequence to begin the comparison. | |
| 63 | |
| 64 :return bool: True if circular, False otherwise | |
| 65 """ | |
| 66 for i in range(length): | |
| 67 if sequence[i] != sequence[pos + i]: | |
| 68 return False | |
| 69 return True | |
| 70 | |
| 71 | |
| 72 def check_circularity(seq_record, subseq_length=10) -> int: | |
| 73 """ | |
| 74 Process a single sequence to detect circularity and return the overlap length if circular. | |
| 75 | |
| 76 :param seq_record: SeqRecord object | |
| 77 :param subseq_length: Length of 3' fragment to check on the 5' end | |
| 78 | |
| 79 :return: overlap length if circular, 0 otherwise | |
| 80 """ | |
| 81 seq_len = len(seq_record) | |
| 82 | |
| 83 if seq_len < subseq_length: | |
| 84 logging.error(f"Sequence too short ({seq_len}bp): {seq_record.id}") | |
| 85 return 0 | |
| 86 | |
| 87 begin = "".join(seq_record[:subseq_length]) | |
| 88 end = "".join(seq_record[subseq_length:]) | |
| 89 positions = [x + subseq_length for x in find_occurrences(end, begin)] | |
| 90 | |
| 91 for pos in positions: | |
| 92 overlap_length = seq_len - pos | |
| 93 if is_circular(seq_record, overlap_length, pos): | |
| 94 return overlap_length | |
| 95 return 0 | |
| 96 | |
| 97 | |
| 98 def extend_sequence( | |
| 99 seq_record, | |
| 100 overlap_length, | |
| 101 duplication_length=1000, | |
| 102 ): | |
| 103 """ | |
| 104 Extends the 5' end of a sequence by duplicating a fragment from the 3' end. | |
| 105 | |
| 106 This function is useful for simulating circular sequences by extending the 5' end | |
| 107 with a fragment from the 3' end, based on the specified `overlap_length` and `duplication_length`. | |
| 108 | |
| 109 :param seq_record: The input sequence record to be extended. | |
| 110 :param overlap_length: The length of the overlapping segment that was previously identified as circular. | |
| 111 :param duplication_length: The length of the 3' end fragment to duplicate and add to the 5' end. | |
| 112 | |
| 113 :return: The modified sequence record with the extended 5' end. | |
| 114 """ | |
| 115 # Remove the overlapping segment from the 3' end | |
| 116 modified_seq = seq_record.seq[: len(seq_record.seq) - overlap_length] | |
| 117 # Duplicate the first `duplication_length` nucleotides from the original sequence | |
| 118 # and append them to the 5' end of the modified sequence | |
| 119 if len(modified_seq) < duplication_length: | |
| 120 # If the modified sequence is shorter than `duplication_length`, | |
| 121 # duplicate the entire modified sequence | |
| 122 extension = modified_seq | |
| 123 else: | |
| 124 # Otherwise, duplicate the first `duplication_length` nucleotides | |
| 125 extension = seq_record.seq[:duplication_length] | |
| 126 # Combine the modified sequence with the duplicated fragment | |
| 127 extended_seq = modified_seq + extension | |
| 128 # Update the sequence in the SeqRecord object | |
| 129 seq_record.seq = extended_seq | |
| 130 return seq_record | |
| 131 | |
| 132 | |
| 133 def detect_circular( | |
| 134 fasta_in, | |
| 135 fasta_out, | |
| 136 id_out, | |
| 137 subseq_length=10, | |
| 138 duplication_length=1000, | |
| 139 ): | |
| 140 """ | |
| 141 Detect and process circular sequences in a FASTA file. | |
| 142 | |
| 143 This function reads sequences from `fasta_in`, checks for circularity, | |
| 144 extends circular sequences, and writes the results to `fasta_out` and `id_out`. | |
| 145 | |
| 146 :param fasta_in: Path to the input FASTA file. | |
| 147 :param fasta_out: Path to the output FASTA file for extended circular sequences. | |
| 148 :param id_out: Path to the output file for recording IDs of circular sequences. | |
| 149 :param subseq_length: Length of the 3' fragment to check for circularity. | |
| 150 :param duplication_length: Length of the 3' fragment to duplicate and add to the 5' end. | |
| 151 """ | |
| 152 records = [] | |
| 153 ids = [] | |
| 154 try: | |
| 155 with fasta_in.open("r") as fasta_in_f: | |
| 156 for seq_record in SeqIO.parse(fasta_in_f, "fasta"): | |
| 157 overlap_length = check_circularity( | |
| 158 seq_record, | |
| 159 subseq_length=subseq_length, | |
| 160 ) | |
| 161 if overlap_length > 0: | |
| 162 records.append( | |
| 163 extend_sequence( | |
| 164 seq_record, | |
| 165 overlap_length, | |
| 166 duplication_length, | |
| 167 ) | |
| 168 ) | |
| 169 ids.append(seq_record.id) | |
| 170 except Exception as e: | |
| 171 logging.error(f"Error processing {fasta_in}: {e}") | |
| 172 raise | |
| 173 | |
| 174 if not records: | |
| 175 logging.warning("Warning: No circular sequences found.") | |
| 176 | |
| 177 try: | |
| 178 with fasta_out.open("w") as fasta_out_f: | |
| 179 SeqIO.write(records, fasta_out_f, "fasta") | |
| 180 with id_out.open("w") as id_out_f: | |
| 181 id_out_f.write("\n".join(ids) + "\n") | |
| 182 except IOError as e: | |
| 183 logging.error(f"Error writing output files: {e}") | |
| 184 raise | |
| 185 | |
| 186 | |
| 187 def main(): | |
| 188 """ | |
| 189 Main function to detect circular contigs in a FASTA file. | |
| 190 | |
| 191 This function parses command-line arguments, launches function to read the input | |
| 192 FASTA file, process each sequence to detect circular contigs, and generate the | |
| 193 output files. | |
| 194 """ | |
| 195 parser = argparse.ArgumentParser( | |
| 196 description=""" | |
| 197 Detect circular contigs by looking for exact identical subsequences at the two | |
| 198 ends of the sequences provided in a FASTA file and output the circular contigs | |
| 199 extended on 5' end by duplication of the first nucleotides on 3' end to be able | |
| 200 to predict genes spanning the origin of circular contigs. | |
| 201 """ | |
| 202 ) | |
| 203 parser.add_argument("--fasta-in", required=True, help="Input FASTA file") | |
| 204 parser.add_argument( | |
| 205 "--subseq-length", | |
| 206 type=int, | |
| 207 default=10, | |
| 208 help="Length of 3' fragment to check on the 5' end (default: 10)", | |
| 209 ) | |
| 210 parser.add_argument( | |
| 211 "--duplication-length", | |
| 212 type=int, | |
| 213 default=1000, | |
| 214 help="Length of the 3' end fragment to duplicate and add on the 5' end (default: 1000)", | |
| 215 ) | |
| 216 parser.add_argument( | |
| 217 "-v", | |
| 218 "--verbose", | |
| 219 type=int, | |
| 220 default=3, | |
| 221 choices=log_levels.keys(), | |
| 222 help="Verbosity level (0=CRITICAL, 1=ERROR, 2=WARN, 3=INFO, 4=DEBUG)", | |
| 223 ) | |
| 224 parser.add_argument( | |
| 225 "--fasta-out", | |
| 226 required=True, | |
| 227 help="Output FASTA file with extended circular contigs", | |
| 228 ) | |
| 229 parser.add_argument( | |
| 230 "--id-out", required=True, help="Output TXT file with circular sequence IDs" | |
| 231 ) | |
| 232 | |
| 233 args = parser.parse_args() | |
| 234 setup_logger(args.verbose) | |
| 235 | |
| 236 logging.info("Starting script execution.") | |
| 237 detect_circular( | |
| 238 Path(args.fasta_in), | |
| 239 Path(args.fasta_out), | |
| 240 Path(args.id_out), | |
| 241 subseq_length=args.subseq_length, | |
| 242 duplication_length=args.duplication_length, | |
| 243 ) | |
| 244 logging.info("Script execution completed.") | |
| 245 | |
| 246 | |
| 247 if __name__ == "__main__": | |
| 248 main() |
