Mercurial > repos > dereeper > ragoo
diff RaGOO/sam2delta.py @ 13:b9a3aeb162ab draft default tip
Uploaded
author | dereeper |
---|---|
date | Mon, 26 Jul 2021 18:22:37 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/RaGOO/sam2delta.py Mon Jul 26 18:22:37 2021 +0000 @@ -0,0 +1,239 @@ +#!/usr/bin/env python +import argparse +from collections import defaultdict + +""" +This utility converts a SAM file to a nucmer delta file. + +SAM files must contain an NM tag, which is default in minimap2 alignments. +""" + + +class SAMAlignment: + + def __init__(self, in_ref_header, in_query_header, in_ref_start, in_cigar, in_flag, in_seq_len, in_num_mismatches): + self.num_mismatches = in_num_mismatches + self.seq_len = in_seq_len + self.flag = int(in_flag) + self.is_rc = False + + # Check if the query was reverse complemented + if self.flag & 16: + self.is_rc = True + + self.cigar = in_cigar + self.parsed_cigar = [] + self._parse_cigar() + + if self.is_rc: + self.parsed_cigar = self.parsed_cigar[::-1] + + self.ref_header = in_ref_header + self.query_header = in_query_header + self.ref_start = int(in_ref_start) + self.ref_end = None + self.query_start = None + self.query_end = None + self.query_aln_len = 0 + self.ref_aln_len = 0 + + num_hard_clipped = 0 + for i in self.parsed_cigar: + if i[-1] == 'H': + num_hard_clipped += int(i[:-1]) + + if self.seq_len == 1: + self.seq_len = 0 + for i in self.parsed_cigar: + if i[-1] == 'M' or i[-1] == 'I' or i[-1] == 'S' or i[-1] == 'X' or i[-1] == '=': + self.seq_len += int(i[:-1]) + + self.query_len = self.seq_len + num_hard_clipped + + self._get_query_start() + self._get_query_aln_len() + self._get_ref_aln_len() + self._get_query_end() + self._get_ref_end() + + if self.is_rc: + self.query_start, self.query_end = self.query_end, self.query_start + # Lazily taking care of off-by-one errors + self.query_end += 1 + self.query_start -= 1 + + self.query_end -= 1 + self.ref_end -= 1 + + def __str__(self): + return ' '.join([self.ref_header, self.query_header]) + + def __repr__(self): + return '<' + ' '.join([self.ref_header, self.query_header]) + '>' + + def _get_query_start(self): + """ + An alignment will either start with soft clipping, hard clipping, or with one of the alignment + codes (e.g. Match/Mismatch (M) or InDel (I/D)). If hard or soft clipped, the start of the alignment is + the very next base after clipping. If the alignment has commenced, then the start of the alignment is 1. + + Notes: + At least for now, I am only specifying this for minimap2 output, which I believe only has the following + characters in cigar strings: + {'I', 'S', 'M', 'D', 'H'} + so I will only handle these cases. + """ + if self.parsed_cigar[0][-1] == 'I' or self.parsed_cigar[0][-1] == 'D' or self.parsed_cigar[0][-1] == 'M': + self.query_start = 1 + else: + # The alignment has started with either soft or hard clipping + # Need to double check if a 1 needs to be added here + self.query_start = int(self.parsed_cigar[0][:-1]) + 1 + + def _get_query_aln_len(self): + """ Just the addition of all cigar codes which "consume" the query (M, I)""" + for i in self.parsed_cigar: + if i[-1] == 'M' or i[-1] == 'I': + self.query_aln_len += int(i[:-1]) + + def _get_ref_aln_len(self): + for i in self.parsed_cigar: + if i[-1] == 'M' or i[-1] == 'D': + self.ref_aln_len += int(i[:-1]) + + def _get_query_end(self): + """ This is the length of the alignment + query start """ + self.query_end = self.query_start + self.query_aln_len + + def _get_ref_end(self): + """ This is the length of the alignment + query start """ + self.ref_end = self.ref_start + self.ref_aln_len + + def _parse_cigar(self): + cigar_chars = { + 'M', + 'I', + 'D', + 'N', + 'S', + 'H', + 'P', + '=', + 'X' + } + + this_field = '' + for char in self.cigar: + this_field += char + if char in cigar_chars: + self.parsed_cigar.append(this_field) + this_field = '' + + +def write_delta(in_alns, in_file_name): + with open(in_file_name, 'w') as f: + f.write('file1 file2\n') + f.write('NUCMER\n') + for aln in in_alns.keys(): + query_len = in_alns[aln][0].query_len + #print (query_len) + f.write('>%s %s %r %r\n' % (aln[0], aln[1], ref_chr_lens[aln[0]], query_len)) + for i in in_alns[aln]: + f.write('%r %r %r %r %r %r %r\n' % ( + i.ref_start, + i.ref_end, + i.query_start, + i.query_end, + i.num_mismatches, + i.num_mismatches, + 0 + )) + # Continue with the cigar string + offsets = [] + cigar = i.parsed_cigar + if cigar[0][-1] == 'S' or cigar[0][-1] == 'H': + cigar = cigar[1:-1] + else: + cigar = cigar[:-1] + + counter = 1 + for code in cigar: + if code[-1] == 'M': + counter += int(code[:-1]) + elif code[-1] == 'D': + offsets.append(counter) + num_I = int(code[:-1]) + for i in range(1, num_I): + offsets.append(1) + counter = 1 + elif code[-1] == 'I': + offsets.append(-1*counter) + num_I = int(code[:-1]) + for i in range(1, num_I): + offsets.append(-1) + counter = 1 + else: + raise ValueError('Unexpected CIGAR code') + offsets.append(0) + offsets = [str(i) for i in offsets] + f.write('\n'.join(offsets) + '\n') + + +parser = argparse.ArgumentParser(description='Convert a SAM file to a nucmer delta file.') +parser.add_argument("sam_file", metavar="<alns.sam>", type=str, help="SAM file to convert") + +args = parser.parse_args() +sam_file = args.sam_file + +# Make a dictionary storing all alignments between and query and a reference sequence. +# key = (reference_header, query_header) +# value = list of alignments between those sequences. only store info needed for the delta file + +alns = defaultdict(list) + +# Read through the sam file +ref_chr_lens = dict() +with open(sam_file) as f: + for line in f: + # Skip SAM headers + if line.startswith('@SQ'): + header_list = line.split('\t') + chrom = header_list[1].replace('SN:', '') + chr_len = int(header_list[2].replace('LN:', '')) + ref_chr_lens[chrom] = chr_len + continue + + if line.startswith('@'): + continue + + fields = line.split('\t') + ref_header = fields[2] + query_header = fields[0] + ref_start = fields[3] + cigar = fields[5] + flag = fields[1] + seq_len = len(fields[9]) + + if not cigar == '*': + # Get the NM flag + nm = None + for i in fields: + if i.startswith('NM:i'): + nm = int(i[5:]) + continue + + if nm is None: + raise ValueError('SAM file must include NM tag.') + + x = SAMAlignment( + ref_header, + query_header, + ref_start, + cigar, + flag, + seq_len, + nm + ) + alns[(ref_header, query_header)].append(x) + +write_delta(alns, sam_file + '.delta')