Mercurial > repos > artbio > yac_clipper
view yac.py @ 7:396bc2c718a1 draft default tip
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/yac_clipper commit a30e10dfaf7ba8a1b6e38075f058b11186790c74
author | artbio |
---|---|
date | Tue, 29 Nov 2022 08:58:44 +0000 |
parents | acbf910cd2ae |
children |
line wrap: on
line source
#!/usr/bin/python3 # yac = yet another clipper # Christophe Antoniewski <drosofff@gmail.com> import argparse from itertools import islice def Parser(): the_parser = argparse.ArgumentParser() the_parser.add_argument( '--input', action="store", nargs='+', help="input fastq files") the_parser.add_argument( '--output', action="store", type=str, help="output, clipped fasta file") the_parser.add_argument( '--output_format', action="store", type=str, help="output format, fasta or fastq") the_parser.add_argument( '--adapter_to_clip', action="store", type=str, help="adapter sequence to clip") the_parser.add_argument( '--min', action="store", type=int, help="minimal size of clipped sequence to keep") the_parser.add_argument( '--max', action="store", type=int, help="maximal size of clipped sequence to keep") the_parser.add_argument('--Nmode', action="store", type=str, choices=[ "accept", "reject"], help="accept or reject Ns in clipped sequences") args = the_parser.parse_args() args.adapter_to_clip = args.adapter_to_clip.upper() return args class Clip: def __init__(self, inputfile, outputfile, output_format, adapter, minsize, maxsize, Nmode): self.inputfile = inputfile self.outputfile = outputfile self.output_format = output_format self.adapter = adapter self.minsize = int(minsize) self.maxsize = int(maxsize) self.Nmode = Nmode for line in open(inputfile): if line[0] == "@": self.inputformat = "fastq" break elif line[0] == ">": self.inputformat = "fasta" def motives(sequence): ''' return a list of motives for perfect (6nt) or imperfect (7nt with one mismatch) search on import string module ''' sequencevariants = [ sequence[0:6]] # initializes list with 6mer perfect match dicsubst = {"A": "TGCN", "T": "AGCN", "G": "TACN", "C": "GATN"} for pos in enumerate(sequence[:6]): for subst in dicsubst[pos[1]]: sequencevariants.append( sequence[:pos[0]] + subst + sequence[pos[0] + 1:7]) return sequencevariants self.adaptmotifs = motives(self.adapter) def scanadapt(self, adaptmotives=[], sequence="", qscore=""): '''scans sequence for adapter motives''' match_position = sequence.rfind(adaptmotives[0]) if qscore: if match_position != -1: return sequence[:match_position], qscore[:match_position] for motif in adaptmotives[1:]: match_position = sequence.rfind(motif) if match_position != -1: return sequence[:match_position], qscore[:match_position] return sequence, qscore else: if match_position != -1: return sequence[:match_position] for motif in adaptmotives[1:]: match_position = sequence.rfind(motif) if match_position != -1: return sequence[:match_position] return sequence def write_output(self, id, read, qscore, output): if self.output_format == "fasta": block = ">{0}\n{1}\n".format(id, read) else: block = "@HWI-{0}\n{1}\n+\n{2}\n".format(id, read, qscore) output.write(block) def fasta_in_write_output(self, id, read, output): output.write(">{0}\n{1}\n".format(id, read)) def handle_io_fastq(self): '''Open input fastq file, pass read sequence and read qscore to scanadapt function. Pass clipped read and qscore to output function.''' id = 0 output = open(self.outputfile, "a") with open(self.inputfile, "r") as input: block_gen = islice(input, 1, None, 2) for i, line in enumerate(block_gen): if i % 2: qscore = line.rstrip() else: read = line.rstrip() continue try: trimmed_read, trimmed_qscore = self.scanadapt( self.adaptmotifs, read, qscore) except ValueError: continue if self.minsize <= len(trimmed_read) <= self.maxsize: if (self.Nmode == "reject") and ("N" in trimmed_read): continue id += 1 self.write_output(id, trimmed_read, trimmed_qscore, output) output.close() def handle_io_fasta(self): '''Open input fasta file, pass header and read sequence to scanadapt function. Pass clipped read and qscore to output function.''' id = 0 output = open(self.outputfile, "a") with open(self.inputfile, "r") as input: block_gen = islice(input, 1, None, 2) for i, line in enumerate(block_gen): read = line.rstrip() trimmed_read = self.scanadapt(self.adaptmotifs, read) if self.minsize <= len(trimmed_read) <= self.maxsize: if (self.Nmode == "reject") and ("N" in trimmed_read): continue id += 1 self.fasta_in_write_output(id, trimmed_read, output) output.close() def main(*argv): instanceClip = Clip(*argv) if instanceClip.inputformat == "fasta": instanceClip.handle_io_fasta() else: instanceClip.handle_io_fastq() if __name__ == "__main__": args = Parser() for inputfile in args.input: main(inputfile, args.output, args.output_format, args.adapter_to_clip, args.min, args.max, args.Nmode)