comparison UMI_riboseq_processing/UMI.py @ 8:701804f5ad4b draft

Uploaded
author triasteran
date Tue, 21 Jun 2022 13:22:08 +0000
parents be394fb47250
children 31438c26afec
comparison
equal deleted inserted replaced
7:be394fb47250 8:701804f5ad4b
2 from mimetypes import guess_type 2 from mimetypes import guess_type
3 from functools import partial 3 from functools import partial
4 from sys import argv, exit 4 from sys import argv, exit
5 import itertools 5 import itertools
6 from itertools import zip_longest 6 from itertools import zip_longest
7 7 import subprocess
8 from subprocess import call
8 9
9 def grouper(iterable, n, fillvalue=None): 10 def grouper(iterable, n, fillvalue=None):
10 args = [iter(iterable)] * n 11 args = [iter(iterable)] * n
11 return zip_longest(*args, fillvalue=fillvalue) 12 return zip_longest(*args, fillvalue=fillvalue)
12 13
13 14
14 chunk_size=4 15 def is_gz_file(filepath):
16 with open(filepath, 'rb') as test_f:
17 return test_f.read(2) == b'\x1f\x8b'
15 18
16 def copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output):
17 # find wheather its plain or gzipped fastq
18 encoding = guess_type(pathToFastaFile)[1] # uses file extension
19 _open = partial(gzip.open, mode='rt') if encoding == 'gzip' else open
20 # output file will be in gz format
21 output = gzip.open(output,"wt")
22 # open and parse
23 with _open(pathToFastaFile) as f:
24 for lines in grouper(f, chunk_size, ""):
25 #lines = record.format('fastq').split('\n') # list of each record: id, seq, '+', quality
26 header = lines[0]
27 seq = lines[1]
28 sep = lines[2]
29 qual = lines[3]
30 # check if header is OK
31 if (header.startswith('@')):
32 trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode
33 UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN
34 split_header = header.split(" ")
35 print (split_header)
36 new_header = split_header[0]+"_"+UMI+" "+split_header[1]
37 if qual[-1:] == "\n":
38 new_qual = qual[2:-6]+"\n"
39 else:
40 new_qual = qual[2:-6]
41 output.write(new_header)
42 output.write(trimmed_seq)
43 output.write(sep)
44 output.write(new_qual)
45 19
20 def lines_parse(f, output_path):
21 output = open(output_path,"w")
22 for lines in grouper(f, 4, "\n"):
23 header = lines[0]
24 #print (header)
25 seq = lines[1]
26 sep = lines[2]
27 qual = lines[3]
28 # check if header is OK
29 if (header.startswith('@')):
30 trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode
31 UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN
32 split_header = header.split(" ")
33 new_header = split_header[0]+"_"+UMI+" "+split_header[1]
34 if qual[-1:] == "\n":
35 new_qual = qual[2:-6]+"\n"
36 else:
37 new_qual = qual[2:-6]
38 output.write(new_header)
39 output.write(trimmed_seq)
40 output.write(sep)
41 output.write(new_qual)
46 output.close() 42 output.close()
47 43
44
45 def UMI_processing(pathToFastaFile, output_path):
48 46
47 if is_gz_file(pathToFastaFile) == True:
48 with gzip.open(pathToFastaFile, 'rb') as file:
49 f = [x.decode("utf-8") for x in file.readlines()]
50
51 else:
52 with open(pathToFastaFile, 'r') as file:
53 f = file.readlines()
54
55 lines_parse(f, output_path)
56
49 def main(): 57 def main():
50 if len(argv) != 3: 58 if len(argv) != 3:
51 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") 59 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file")
52 60
53 # Get paths 61 # Get paths
54 pathToFastaFile = argv[1] 62 pathToFastaFile = argv[1]
55 output = argv[2] 63 output = argv[2]
56 copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output) 64 UMI_processing(pathToFastaFile, output)
57 65
58 if __name__ == "__main__": 66 if __name__ == "__main__":
59 main() 67 main()