Mercurial > repos > triasteran > ribogalaxy_umi_processing
comparison UMI_riboseq_processing/UMI.py @ 8:701804f5ad4b draft
Uploaded
| author | triasteran | 
|---|---|
| date | Tue, 21 Jun 2022 13:22:08 +0000 | 
| parents | be394fb47250 | 
| children | 31438c26afec | 
   comparison
  equal
  deleted
  inserted
  replaced
| 7:be394fb47250 | 8:701804f5ad4b | 
|---|---|
| 2 from mimetypes import guess_type | 2 from mimetypes import guess_type | 
| 3 from functools import partial | 3 from functools import partial | 
| 4 from sys import argv, exit | 4 from sys import argv, exit | 
| 5 import itertools | 5 import itertools | 
| 6 from itertools import zip_longest | 6 from itertools import zip_longest | 
| 7 | 7 import subprocess | 
| 8 from subprocess import call | |
| 8 | 9 | 
| 9 def grouper(iterable, n, fillvalue=None): | 10 def grouper(iterable, n, fillvalue=None): | 
| 10 args = [iter(iterable)] * n | 11 args = [iter(iterable)] * n | 
| 11 return zip_longest(*args, fillvalue=fillvalue) | 12 return zip_longest(*args, fillvalue=fillvalue) | 
| 12 | 13 | 
| 13 | 14 | 
| 14 chunk_size=4 | 15 def is_gz_file(filepath): | 
| 16 with open(filepath, 'rb') as test_f: | |
| 17 return test_f.read(2) == b'\x1f\x8b' | |
| 15 | 18 | 
| 16 def copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output): | |
| 17 # find wheather its plain or gzipped fastq | |
| 18 encoding = guess_type(pathToFastaFile)[1] # uses file extension | |
| 19 _open = partial(gzip.open, mode='rt') if encoding == 'gzip' else open | |
| 20 # output file will be in gz format | |
| 21 output = gzip.open(output,"wt") | |
| 22 # open and parse | |
| 23 with _open(pathToFastaFile) as f: | |
| 24 for lines in grouper(f, chunk_size, ""): | |
| 25 #lines = record.format('fastq').split('\n') # list of each record: id, seq, '+', quality | |
| 26 header = lines[0] | |
| 27 seq = lines[1] | |
| 28 sep = lines[2] | |
| 29 qual = lines[3] | |
| 30 # check if header is OK | |
| 31 if (header.startswith('@')): | |
| 32 trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode | |
| 33 UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN | |
| 34 split_header = header.split(" ") | |
| 35 print (split_header) | |
| 36 new_header = split_header[0]+"_"+UMI+" "+split_header[1] | |
| 37 if qual[-1:] == "\n": | |
| 38 new_qual = qual[2:-6]+"\n" | |
| 39 else: | |
| 40 new_qual = qual[2:-6] | |
| 41 output.write(new_header) | |
| 42 output.write(trimmed_seq) | |
| 43 output.write(sep) | |
| 44 output.write(new_qual) | |
| 45 | 19 | 
| 20 def lines_parse(f, output_path): | |
| 21 output = open(output_path,"w") | |
| 22 for lines in grouper(f, 4, "\n"): | |
| 23 header = lines[0] | |
| 24 #print (header) | |
| 25 seq = lines[1] | |
| 26 sep = lines[2] | |
| 27 qual = lines[3] | |
| 28 # check if header is OK | |
| 29 if (header.startswith('@')): | |
| 30 trimmed_seq = seq[2:-6]+"\n" # fooprint + barcode | |
| 31 UMI = seq[0:2]+seq.rstrip()[-5:] #7nt in total; 5'NN and last 3'NNNNN | |
| 32 split_header = header.split(" ") | |
| 33 new_header = split_header[0]+"_"+UMI+" "+split_header[1] | |
| 34 if qual[-1:] == "\n": | |
| 35 new_qual = qual[2:-6]+"\n" | |
| 36 else: | |
| 37 new_qual = qual[2:-6] | |
| 38 output.write(new_header) | |
| 39 output.write(trimmed_seq) | |
| 40 output.write(sep) | |
| 41 output.write(new_qual) | |
| 46 output.close() | 42 output.close() | 
| 47 | 43 | 
| 44 | |
| 45 def UMI_processing(pathToFastaFile, output_path): | |
| 48 | 46 | 
| 47 if is_gz_file(pathToFastaFile) == True: | |
| 48 with gzip.open(pathToFastaFile, 'rb') as file: | |
| 49 f = [x.decode("utf-8") for x in file.readlines()] | |
| 50 | |
| 51 else: | |
| 52 with open(pathToFastaFile, 'r') as file: | |
| 53 f = file.readlines() | |
| 54 | |
| 55 lines_parse(f, output_path) | |
| 56 | |
| 49 def main(): | 57 def main(): | 
| 50 if len(argv) != 3: | 58 if len(argv) != 3: | 
| 51 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") | 59 exit("Usage: 2 arguments required\n1: Path to fasta file \n2: name of output file") | 
| 52 | 60 | 
| 53 # Get paths | 61 # Get paths | 
| 54 pathToFastaFile = argv[1] | 62 pathToFastaFile = argv[1] | 
| 55 output = argv[2] | 63 output = argv[2] | 
| 56 copy_UMI_to_header_and_output_trimmed_read(pathToFastaFile, output) | 64 UMI_processing(pathToFastaFile, output) | 
| 57 | 65 | 
| 58 if __name__ == "__main__": | 66 if __name__ == "__main__": | 
| 59 main() | 67 main() | 
