diff scripts/modules/typing.py @ 0:c6bab5103a14 draft

"planemo upload commit 6abf3e299d82d07e6c3cf8642bdea80e96df64c3-dirty"
author iss
date Mon, 21 Mar 2022 15:23:09 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scripts/modules/typing.py	Mon Mar 21 15:23:09 2022 +0000
@@ -0,0 +1,84 @@
+import os.path
+import functools
+
+try:
+    import modules.utils as utils
+except ImportError:
+    from pathotyping.modules import utils as utils
+
+
+# {1: {'gene_identity': 0, 'gene_mean_read_coverage': 0.0, 'gene_number_positions_multiple_alleles': 0, 'header': 'fyuA', 'gene_coverage': 0.0, 'gene_low_coverage': 100.0}}
+
+
+def simplify_data_by_gene(data_by_gene):
+    cleaned_data_by_gene = {}
+    for counter, data in list(data_by_gene.items()):
+        cleaned_data_by_gene[data['header'].lower()] = {'gene_identity': data['gene_identity'],
+                                                        'gene_coverage': data['gene_coverage'],
+                                                        'gene_depth': data['gene_mean_read_coverage']}
+    return cleaned_data_by_gene
+
+
+def possible_types(data_by_gene, typing_rules_file, min_gene_coverage, min_gene_identity, min_gene_depth):
+    data_by_gene = simplify_data_by_gene(data_by_gene)
+
+    possible_pathotypes = []
+    genes_present = []
+    with open(typing_rules_file, 'rtU') as reader:
+        genes = []
+        for line in reader:
+            line = line.splitlines()[0]
+            if len(line) > 0:
+                line = line.split('\t')
+                if not line[0].startswith('##'):
+                    if line[0].startswith('#'):
+                        genes = line[1:]
+                    else:
+                        profile = line[1:]
+                        congruence = []
+                        for x, gene_requirement in enumerate(profile):
+                            if data_by_gene[genes[x].lower()]['gene_coverage'] >= min_gene_coverage and \
+                                    data_by_gene[genes[x].lower()]['gene_identity'] >= min_gene_identity and \
+                                    data_by_gene[genes[x].lower()]['gene_depth'] >= min_gene_depth:
+                                gene_present = True
+                                genes_present.append(genes[x])
+                            else:
+                                gene_present = False
+
+                            gene_requirement = \
+                                True if gene_requirement == '1' else False if gene_requirement == '0' else None
+                            if gene_requirement is not None:
+                                if gene_present == gene_requirement:
+                                    congruence.append(True)
+                                else:
+                                    congruence.append(False)
+                            else:
+                                congruence.append(True)
+                        if all(congruence):
+                            possible_pathotypes.append(line[0])
+    return possible_pathotypes, list(set(genes_present))
+
+
+module_timer = functools.partial(utils.timer, name='Module Typing')
+
+
+@module_timer
+def typing(data_by_gene, typing_rules_file, min_gene_coverage, min_gene_identity, min_gene_depth, outdir):
+    possible_pathotypes, genes_present = possible_types(data_by_gene, typing_rules_file, min_gene_coverage,
+                                                        min_gene_identity, min_gene_depth)
+    with open(os.path.join(outdir, 'patho_typing.report.txt'), 'wt') as writer_report:
+        with open(os.path.join(outdir, 'patho_typing.extended_report.txt'), 'wt') as writer_extended_report:
+            writer_extended_report.write('#Pathotypes_found' + '\n')
+            if len(possible_pathotypes) > 0:
+                writer_report.write('\n'.join(possible_pathotypes) + '\n')
+                writer_extended_report.write('\n'.join(possible_pathotypes) + '\n')
+                print('\n' + 'Pathotypes found:' + '\n')
+                print('\n'.join(possible_pathotypes) + '\n')
+            else:
+                writer_report.write('TND' + '\n')  # Type Not Determined
+                writer_extended_report.write('TND' + '\n')  # Type Not Determined
+                print('\n' + 'It was not possible to identify any possible pathotype match' + '\n')
+            writer_extended_report.write('\n' + '#Genes_present' + '\n')
+            writer_extended_report.write('\n'.join(genes_present) + '\n')
+
+    return None, None