Mercurial > repos > triasteran > ribogalaxy_umi_processing
comparison UMI_riboseq_processing/UMI_test.py @ 5:e370df93715d draft
Uploaded
author | triasteran |
---|---|
date | Tue, 21 Jun 2022 09:22:37 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4:a580e700aac3 | 5:e370df93715d |
---|---|
1 import gzip | |
2 from mimetypes import guess_type | |
3 from functools import partial | |
4 from sys import argv, exit | |
5 import itertools | |
6 from itertools import zip_longest | |
7 | |
8 | |
def grouper(iterable, n, fillvalue=None):
    """Collect items from *iterable* into consecutive tuples of length *n*.

    All *n* positions of each tuple are fed from one shared iterator, so the
    input is consumed in order, *n* items at a time.  When the input runs out
    part-way through a tuple, the remaining positions hold *fillvalue*.
    """
    shared = iter(iterable)
    slots = (shared for _ in range(n))
    return zip_longest(*slots, fillvalue=fillvalue)
12 | |
13 | |
chunk_size = 4  # lines per FASTQ record: header, sequence, separator, quality

_UMI_5P_LEN = 2  # UMI bases taken from the 5' end of each read
_UMI_3P_LEN = 5  # UMI bases taken from the 3' end of each read


def _is_gzipped(path):
    """Return True if *path* is gzip-compressed.

    The file extension is consulted first; when it does not say gzip, the
    first two bytes are compared with the gzip magic number so that
    compressed files without a ``.gz`` suffix are still recognised.
    """
    if guess_type(path)[1] == "gzip":
        return True
    with open(path, "rb") as handle:
        return handle.read(2) == b"\x1f\x8b"


def _iter_fastq_records(handle):
    """Yield (header, seq, sep, qual) tuples with line endings removed.

    Groups made only of blank lines (for example trailing newlines at the end
    of the file) are skipped.  A group that ends before all ``chunk_size``
    lines were read raises ``ValueError``.
    """
    # The same iterator repeated chunk_size times makes zip_longest pull
    # consecutive lines; missing lines at end of file show up as None.
    for record in zip_longest(*[iter(handle)] * chunk_size):
        if not any(line and line.strip() for line in record):
            continue
        if None in record:
            raise ValueError(
                "truncated FASTQ record starting with {!r}".format(record[0])
            )
        yield tuple(line.rstrip("\r\n") for line in record)


def copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output):
    """Move the UMI of every read into its header and write the trimmed reads.

    For each four-line FASTQ record the UMI is built from the first 2 and the
    last 5 bases of the sequence (7 nt in total).  It is appended to the read
    identifier as ``<id>_<UMI>``; anything after the first space of the header
    is kept unchanged.  The same 2 + 5 positions are removed from both the
    sequence and the quality string.  Read lengths are not validated.

    Args:
        pathToFastaFile: path to the input FASTQ file, plain text or gzipped.
        output: path of the output file; it is always written gzip-compressed.

    Raises:
        ValueError: if the input ends in the middle of a record.
    """
    opener = partial(gzip.open, mode="rt") if _is_gzipped(pathToFastaFile) else open
    # Both handles live in one `with` so they are closed even when a record
    # fails to parse.
    with opener(pathToFastaFile) as reads, gzip.open(output, "wt") as trimmed:
        for header, seq, sep, qual in _iter_fastq_records(reads):
            umi = seq[:_UMI_5P_LEN] + seq[-_UMI_3P_LEN:]
            # Split on the first space only: headers without a description
            # stay valid and extra fields are not dropped.
            read_id, space, description = header.partition(" ")
            trimmed.write(read_id + "_" + umi + space + description + "\n")
            # Line endings were already stripped, so exactly the UMI bases
            # are removed, whether or not the file ends with a newline.
            trimmed.write(seq[_UMI_5P_LEN:-_UMI_3P_LEN] + "\n")
            trimmed.write(sep + "\n")
            trimmed.write(qual[_UMI_5P_LEN:-_UMI_3P_LEN] + "\n")
44 | |
45 | |
def main():
    """Command-line entry point.

    Expects exactly two command-line arguments: the input FASTQ path and the
    output file name.  With any other argument count the program exits with
    a usage message; otherwise the two arguments are passed on to
    ``copy_UMI_to_header_and_output_trimmed_read``.
    """
    if len(argv) != 3:
        # The input is FASTQ (four-line records with qualities), not FASTA.
        exit(
            "Usage: 2 arguments required\n"
            "1: Path to FASTQ file (plain or gzipped)\n"
            "2: name of output file"
        )

    pathToFastaFile, output = argv[1:]
    copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output)


if __name__ == "__main__":
    main()