Mercurial > repos > bioit_sciensano > phagetermvirome
diff _modules/utilities.py @ 0:69e8f12c8b31 draft
"planemo upload"
author | bioit_sciensano |
---|---|
date | Fri, 11 Mar 2022 15:06:20 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/_modules/utilities.py Fri Mar 11 15:06:20 2022 +0000 @@ -0,0 +1,106 @@ +## @file utilities.py +# +# Gather here utility methods for phageterm. Used in both CPU and GPU version. +#from string import maketrans +import re +import random +import sys + +import numpy as np +import datetime + +if sys.version_info < (3,): + import string + TRANSTAB = string.maketrans("ACGTN", "TGCAN") +else: + TRANSTAB = str.maketrans("ACGTN", "TGCAN") + +def checkReportTitle(report_title): + """Normalise report title (take out any special char)""" + default_title="Analysis_" + right_now=datetime.datetime.now() + default_title+=str(right_now.month) + default_title+=str(right_now.day) + default_title+="_" + default_title+=str(right_now.hour) + default_title+=str(right_now.minute) + titleNorm = "" + charok = list(range(48,58)) + list(range(65,91)) + list(range(97,123)) + [45,95] + for char in report_title: + if ord(char) in charok: + titleNorm += char + if len(titleNorm) > 1: + return titleNorm[:20] + else: + return default + +### SEQUENCE manipulation function +def changeCase(seq): + """Change lower case to UPPER CASE for a sequence string.""" + return seq.upper() + + +def reverseComplement(seq, transtab=str.maketrans('ATGCN', 'TACGN')): + """Reverse Complement a sequence.""" + return changeCase(seq).translate(transtab)[::-1] + +def longest_common_substring(read, refseq): + """Longest common substring between two strings.""" + m = [[0] * (1 + len(refseq)) for i in range(1 + len(read))] + longest, x_longest = 0, 0 + for x in range(1, 1 + len(read)): + for y in range(1, 1 + len(refseq)): + if read[x - 1] == refseq[y - 1]: + m[x][y] = m[x - 1][y - 1] + 1 + if m[x][y] > longest: + longest = m[x][y] + x_longest = x + else: + m[x][y] = 0 + return read[x_longest - longest: x_longest] + +def hybridCoverage(read, sequence, hybrid_coverage, start, end): + """Return hybrid coverage.""" + aligned_part_only = longest_common_substring(read, sequence[start:end]) + for i in range(start, min(len(sequence),start+len(aligned_part_only))): + hybrid_coverage[i]+=1 + return hybrid_coverage + +## Determines if readPart maps against Sequence. +# +# @param readPart A part of a read (seed characters usually) +# @param sequence (a contig) +# It choses randomly a mapping position amongst all mappings found. +# It returns 2 numbers: the start and stop position of the chosen mapping location. +def applyCoverage(readPart, sequence): + """Return a random match of a read onto the sequence. """ + position = [] + for pos in re.finditer(readPart,sequence): + position.append(pos) + if len(position) > 0: + match = random.choice(position) + return match.start(), match.end() + else: + return -1, -1 + +def correctEdge(coverage, edge): + """Correction of the Edge coverage. """ + correctCov = np.array([len(coverage[0])*[0], len(coverage[0])*[0]]) + End = len(coverage[0]) + covSta = range(edge) + covEnd = range(End-edge,End) + for i in range(len(coverage)): + for j in range(len(coverage[i])): + correctCov[i][j] = coverage[i][j] + for k in covSta: + correctCov[i][k+edge] += coverage[i][k+End-edge] + for l in covEnd: + correctCov[i][l-edge] += coverage[i][l-End+edge] + return correctCov + +# utility class for storing results of decisionProcess function +class DecisionProcessOutput: + def __init__(self, Redundant, Permuted, P_class, P_type, P_seqcoh, P_concat, + P_orient, P_left, P_right, Mu_like): + pass +