Mercurial > repos > triasteran > ribogalaxy_umi_processing
view UMI_riboseq_processing/UMI.py @ 7:be394fb47250 draft
Uploaded
author | triasteran |
---|---|
date | Tue, 21 Jun 2022 09:50:49 +0000 |
parents | 1ce4b52212c4 |
children | 701804f5ad4b |
line wrap: on
line source
import gzip from mimetypes import guess_type from functools import partial from sys import argv, exit import itertools from itertools import zip_longest def grouper(iterable, n, fillvalue=None): args = [iter(iterable)] * n return zip_longest(*args, fillvalue=fillvalue) chunk_size=4 def copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output): # find wheather its plain or gzipped fastq encoding = guess_type(pathToFastaFile)[1] # uses file extension _open = partial(gzip.open, mode='rt') if encoding == 'gzip' else open # output file will be in gz format output = gzip.open(output,"wt") # open and parse with _open(pathToFastaFile) as f: for lines in grouper(f, chunk_size, ""): #lines = record.format('fastq').split('\n') # list of each record: id, seq, '+', quality header = lines[0] seq = lines[1] sep = lines[2] qual = lines[3] # check if header is OK if (header.startswith('@')): trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN split_header = header.split(" ") print (split_header) new_header = split_header[0]+"_"+UMI+" "+split_header[1] if qual[-1:] == "\n": new_qual = qual[2:-6]+"\n" else: new_qual = qual[2:-6] output.write(new_header) output.write(trimmed_seq) output.write(sep) output.write(new_qual) output.close() def main(): if len(argv) != 3: exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") # Get paths pathToFastaFile = argv[1] output = argv[2] copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output) if __name__ == "__main__": main()