Mercurial > repos > petr-novak > re_utils
view pairScan.py @ 25:5dba804e6884 draft
planemo upload commit 20bdf879b52796d3fb251a20807191ff02084d3c-dirty
author | petr-novak |
---|---|
date | Wed, 02 Aug 2023 12:42:08 +0000 |
parents | e320ef2d105a |
children |
line wrap: on
line source
#!/usr/bin/env python import sys import os from optparse import OptionParser import Levenshtein class Error(Exception): """Base class for exceptions in this module.""" pass def readSingleSeq(file): line = file.readline() if not line: return False # end of file if line[0] != ">": raise Error("no header on the first line") seqname = line[1:].strip() seq = "" # read sequences while True: last_pos = file.tell() line = file.readline() if not line: break if line[0] == ">": file.seek(last_pos) break seq = seq + line.strip() return {'name': seqname, 'sequence': seq} def writeSingleSeq(fileobject, seq): fileobject.write(">") fileobject.write(seq['name'] + "\n") fileobject.write(seq['sequence'] + "\n") def comparePairs(seq1, seq2, max_mismatch=3, offset=5): s1 = seq1['sequence'].lower() s2 = seq2['sequence'].lower()[::-1] m = 0 intab = "ctagn" outtab = "gatcn" trantab = str.maketrans(intab, outtab) s2 = s2.translate(trantab) s1 = "-" * offset + s1 s2 = s2 + "-" * offset n1 = len(s1) n2 = len(s2) m = 0 for i in range(1, min(n1 + 1, n2 + 1)): #remove tails is any: ss1 = s1[n1 - i:n1] ss2 = s2[0:i] added = ss1.count("-") + ss2.count("-") d = Levenshtein.hamming(ss1, ss2) - added if 100.0 * d / i <= max_mismatch: m = max(m, i - d - added) return m def split_file(filename, N, min_chunk=2): f1 = open(filename, 'r') filenames = [filename + "." + str(i) for i in range(N)] f2 = list(map(open, filenames, 'w' * N)) while True: for i in f2: for j in range(min_chunk): line = f1.readline() if not line: [i.close() for i in f2] f1.close() return filenames i.write(line) def find_overlapping_sequences(seqfile, seqfile2=None, seqfile_good="", seqfile_bad="", min_overlap=30, max_mismatch=2, offset=5): ''' return id ove overlaping pairs - only first id is returned ''' # default names - if empty if seqfile_good == "": seqfile_good = seqfile + ".pass" if seqfile_bad == "": seqfile_bad = seqfile + ".bad" minscore = min_overlap * 2 fgood = open(seqfile_good, 'w') fbad = open(seqfile_bad, 'w') f = open(seqfile, 'r') if seqfile2: f2 = open(seqfile2) else: f2 = f while True: seq1 = readSingleSeq(f) seq2 = readSingleSeq(f2) if not seq1 or not seq2: break # end of file score = comparePairs(seq1, seq2, max_mismatch, offset=offset) if score > min_overlap: writeSingleSeq(fbad, seq1) writeSingleSeq(fbad, seq2) else: writeSingleSeq(fgood, seq1) writeSingleSeq(fgood, seq2) f.close() if not f2.closed: f2.close fgood.close() fbad.close() def main(): parser = OptionParser() parser.add_option("-f", "--fasta_file", dest="seqfile", help="input sequences in fasta format") parser.add_option( "-r", "--fasta_file2", default=None, dest="seqfile2", help= "input sequences in fasta format, second file should be specified if pairs are not interlaced, all pairs must be complete!") parser.add_option("-p", "--fasta_file_pass", dest="seqfile_good", help="output file with good sequences", default='') parser.add_option("-b", "--fasta_file_bad", dest="seqfile_bad", help="output file with bad sequences", default='') parser.add_option("-o", "--minimal_overlap", dest="min_overlap", help="minimal overlap between pair ends", default='30') parser.add_option( "-m", "--max_mismatch", dest="max_mismatch", help="maximum number of mismatches in overlap per 100 nt", default='2') parser.add_option("-s", "--offset", dest="offset", help="maximum offset", default='5') options, args = parser.parse_args() find_overlapping_sequences(seqfile=options.seqfile, seqfile2=options.seqfile2, seqfile_good=options.seqfile_good, seqfile_bad=options.seqfile_bad, min_overlap=int(options.min_overlap), max_mismatch=int(options.max_mismatch), offset=int(options.offset)) if __name__ == "__main__": main()