Mercurial > repos > bioit_sciensano > phagetermvirome
view _modules/utilities.py @ 0:69e8f12c8b31 draft
"planemo upload"
author | bioit_sciensano |
---|---|
date | Fri, 11 Mar 2022 15:06:20 +0000 |
parents | |
children |
line wrap: on
line source
## @file utilities.py # # Gather here utility methods for phageterm. Used in both CPU and GPU version. #from string import maketrans import re import random import sys import numpy as np import datetime if sys.version_info < (3,): import string TRANSTAB = string.maketrans("ACGTN", "TGCAN") else: TRANSTAB = str.maketrans("ACGTN", "TGCAN") def checkReportTitle(report_title): """Normalise report title (take out any special char)""" default_title="Analysis_" right_now=datetime.datetime.now() default_title+=str(right_now.month) default_title+=str(right_now.day) default_title+="_" default_title+=str(right_now.hour) default_title+=str(right_now.minute) titleNorm = "" charok = list(range(48,58)) + list(range(65,91)) + list(range(97,123)) + [45,95] for char in report_title: if ord(char) in charok: titleNorm += char if len(titleNorm) > 1: return titleNorm[:20] else: return default ### SEQUENCE manipulation function def changeCase(seq): """Change lower case to UPPER CASE for a sequence string.""" return seq.upper() def reverseComplement(seq, transtab=str.maketrans('ATGCN', 'TACGN')): """Reverse Complement a sequence.""" return changeCase(seq).translate(transtab)[::-1] def longest_common_substring(read, refseq): """Longest common substring between two strings.""" m = [[0] * (1 + len(refseq)) for i in range(1 + len(read))] longest, x_longest = 0, 0 for x in range(1, 1 + len(read)): for y in range(1, 1 + len(refseq)): if read[x - 1] == refseq[y - 1]: m[x][y] = m[x - 1][y - 1] + 1 if m[x][y] > longest: longest = m[x][y] x_longest = x else: m[x][y] = 0 return read[x_longest - longest: x_longest] def hybridCoverage(read, sequence, hybrid_coverage, start, end): """Return hybrid coverage.""" aligned_part_only = longest_common_substring(read, sequence[start:end]) for i in range(start, min(len(sequence),start+len(aligned_part_only))): hybrid_coverage[i]+=1 return hybrid_coverage ## Determines if readPart maps against Sequence. # # @param readPart A part of a read (seed characters usually) # @param sequence (a contig) # It choses randomly a mapping position amongst all mappings found. # It returns 2 numbers: the start and stop position of the chosen mapping location. def applyCoverage(readPart, sequence): """Return a random match of a read onto the sequence. """ position = [] for pos in re.finditer(readPart,sequence): position.append(pos) if len(position) > 0: match = random.choice(position) return match.start(), match.end() else: return -1, -1 def correctEdge(coverage, edge): """Correction of the Edge coverage. """ correctCov = np.array([len(coverage[0])*[0], len(coverage[0])*[0]]) End = len(coverage[0]) covSta = range(edge) covEnd = range(End-edge,End) for i in range(len(coverage)): for j in range(len(coverage[i])): correctCov[i][j] = coverage[i][j] for k in covSta: correctCov[i][k+edge] += coverage[i][k+End-edge] for l in covEnd: correctCov[i][l-edge] += coverage[i][l-End+edge] return correctCov # utility class for storing results of decisionProcess function class DecisionProcessOutput: def __init__(self, Redundant, Permuted, P_class, P_type, P_seqcoh, P_concat, P_orient, P_left, P_right, Mu_like): pass