Mercurial > repos > triasteran > ribogalaxy_umi_processing
view UMI_riboseq_processing/UMI.py @ 8:701804f5ad4b draft
Uploaded
author | triasteran |
---|---|
date | Tue, 21 Jun 2022 13:22:08 +0000 |
parents | be394fb47250 |
children | 31438c26afec |
line wrap: on
line source
"""Move the UMI of each read of a FASTQ file into the read header.

Each read is expected to carry a 7 nt UMI split around the insert: the
first 2 nt (5' end) and the last 5 nt (3' end).  For every record whose
header starts with ``@`` the script

* appends ``_<UMI>`` to the first whitespace-delimited token of the header,
* removes the 2 leading and 5 trailing characters from the sequence line,
* removes the same positions from the quality line.

Usage: ``UMI.py <input fastq or fastq.gz> <output fastq>``
"""
import gzip
import os
from mimetypes import guess_type
from functools import partial
from sys import argv, exit
import itertools
from itertools import zip_longest
import subprocess
from subprocess import call


def grouper(iterable, n, fillvalue=None):
    """Yield successive ``n``-tuples from ``iterable``.

    The last tuple is padded with ``fillvalue`` when the number of items is
    not a multiple of ``n``.  Works lazily, so ``iterable`` may be an open
    file handle.
    """
    # n references to the *same* iterator: zip_longest pulls n consecutive
    # items for each tuple it builds.
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


def is_gz_file(filepath):
    """Return True when ``filepath`` starts with the gzip magic number."""
    with open(filepath, 'rb') as test_f:
        return test_f.read(2) == b'\x1f\x8b'


def lines_parse(f, output_path):
    """Write the UMI-processed version of FASTQ lines ``f`` to ``output_path``.

    Args:
        f: iterable of text lines (a list of lines or an open text file),
            read four at a time as header / sequence / separator / quality.
        output_path: path of the FASTQ file to create (overwritten).

    Records whose header line does not start with ``@`` are skipped.
    """
    # The context manager guarantees the output is flushed and closed even
    # if a malformed record raises part-way through.
    with open(output_path, "w") as output:
        for header, seq, sep, qual in grouper(f, 4, "\n"):
            # check if header is OK
            if not header.startswith('@'):
                continue

            # Strip line terminators up front so the slices below count
            # bases only; this keeps LF, CRLF and a missing final newline
            # from shifting the trim positions.
            header = header.rstrip("\r\n")
            seq = seq.rstrip()
            sep = sep.rstrip("\r\n")
            qual = qual.rstrip()

            # 7 nt in total: 5' NN and last 3' NNNNN
            umi = seq[0:2] + seq[-5:]

            # Split on the first space only: the remainder of the header is
            # kept verbatim, and a header without any space is accepted.
            read_id, space, description = header.partition(" ")

            output.write(read_id + "_" + umi + space + description + "\n")
            output.write(seq[2:-5] + "\n")  # footprint + barcode
            output.write(sep + "\n")
            # Same slice as the sequence so both lines keep equal length.
            output.write(qual[2:-5] + "\n")


def _open_fastq(path):
    """Open a plain or gzip-compressed FASTQ file for reading as text."""
    if is_gz_file(path):
        return gzip.open(path, 'rt', encoding="utf-8")
    return open(path, 'r')


def UMI_processing(pathToFastaFile, output_path):
    """Run UMI extraction on a (possibly gzipped) FASTQ file.

    Args:
        pathToFastaFile: input FASTQ path; gzip compression is detected
            from the file content, not from the extension.
        output_path: path of the uncompressed FASTQ file to write.
    """
    # Streaming would truncate the input before it is read if both paths
    # name the same file, so only in that case read everything first.
    in_place = (os.path.exists(output_path)
                and os.path.samefile(pathToFastaFile, output_path))
    with _open_fastq(pathToFastaFile) as handle:
        # Otherwise pass the handle itself: records are processed lazily and
        # memory use stays constant regardless of the FASTQ size.
        records = handle.readlines() if in_place else handle
        lines_parse(records, output_path)


def main():
    """Command-line entry point: ``UMI.py <input> <output>``."""
    if len(argv) != 3:
        exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file")

    # Get paths
    pathToFastaFile = argv[1]
    output = argv[2]
    UMI_processing(pathToFastaFile, output)


if __name__ == "__main__":
    main()