changeset 9:f641e52353e8 draft

"planemo upload for repository https://github.com/gregvonkuster/galaxy_tools/tree/master/tools/sequence_analysis/vsnp/vsnp_build_tables commit 1131a7accc36df73eac621f6ae8aa3cb62403bde"
author greg
date Thu, 29 Jul 2021 13:52:48 +0000
parents c3a6795aed09
children 14384fd2a7e2
files vsnp_build_tables.py vsnp_build_tables.xml
diffstat 2 files changed, 225 insertions(+), 114 deletions(-) [+]
line wrap: on
line diff
--- a/vsnp_build_tables.py	Thu Jul 22 18:08:08 2021 +0000
+++ b/vsnp_build_tables.py	Thu Jul 29 13:52:48 2021 +0000
@@ -1,7 +1,9 @@
 #!/usr/bin/env python
 
 import argparse
+import multiprocessing
 import os
+import queue
 import re
 
 import pandas
@@ -16,6 +18,9 @@
 # to use LibreOffice for Excel spreadsheets.
 MAXCOLS = 1024
 OUTPUT_EXCEL_DIR = 'output_excel_dir'
+INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir'
+INPUT_JSON_DIR = 'input_json_dir'
+INPUT_NEWICK_DIR = 'input_newick_dir'
 
 
 def annotate_table(table_df, group, annotation_dict):
@@ -221,74 +226,94 @@
         output_excel(df, type_str, group_str, annotation_dict)
 
 
-def preprocess_tables(newick_file, json_file, json_avg_mq_file, annotation_dict):
-    avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
-    # Map quality to dataframe.
-    mqdf = avg_mq_series.to_frame(name='MQ')
-    mqdf = mqdf.T
-    # Get the group.
-    group = get_sample_name(newick_file)
-    snps_df = pandas.read_json(json_file, orient='split')
-    with open(newick_file, 'r') as fh:
-        for line in fh:
-            line = re.sub('[:,]', '\n', line)
-            line = re.sub('[)(]', '', line)
-            line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
-            line = re.sub('root\n', '', line)
-    sample_order = line.split('\n')
-    sample_order = list([_f for _f in sample_order if _f])
-    sample_order.insert(0, 'root')
-    tree_order = snps_df.loc[sample_order]
-    # Count number of SNPs in each column.
-    snp_per_column = []
-    for column_header in tree_order:
-        count = 0
-        column = tree_order[column_header]
-        for element in column:
-            if element != column[0]:
-                count = count + 1
-        snp_per_column.append(count)
-    row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
-    # Count number of SNPS from the
-    # top of each column in the table.
-    snp_from_top = []
-    for column_header in tree_order:
-        count = 0
-        column = tree_order[column_header]
-        # for each element in the column
-        # skip the first element
-        for element in column[1:]:
-            if element == column[0]:
-                count = count + 1
-            else:
-                break
-        snp_from_top.append(count)
-    row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
-    tree_order = tree_order.append([row1])
-    tree_order = tree_order.append([row2])
-    # In pandas=0.18.1 even this does not work:
-    # abc = row1.to_frame()
-    # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
-    # tree_order.append(abc)
-    # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
-    tree_order = tree_order.T
-    tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
-    tree_order = tree_order.T
-    # Remove snp_per_column and snp_from_top rows.
-    cascade_order = tree_order[:-2]
-    # Output the cascade table.
-    output_cascade_table(cascade_order, mqdf, group, annotation_dict)
-    # Output the sorted table.
-    output_sort_table(cascade_order, mqdf, group, annotation_dict)
+def preprocess_tables(task_queue, annotation_dict, timeout):
+    while True:
+        try:
+            tup = task_queue.get(block=True, timeout=timeout)
+        except queue.Empty:
+            break
+        newick_file, json_file, json_avg_mq_file = tup
+        avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
+        # Map quality to dataframe.
+        mqdf = avg_mq_series.to_frame(name='MQ')
+        mqdf = mqdf.T
+        # Get the group.
+        group = get_sample_name(newick_file)
+        snps_df = pandas.read_json(json_file, orient='split')
+        with open(newick_file, 'r') as fh:
+            for line in fh:
+                line = re.sub('[:,]', '\n', line)
+                line = re.sub('[)(]', '', line)
+                line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
+                line = re.sub('root\n', '', line)
+        sample_order = line.split('\n')
+        sample_order = list([_f for _f in sample_order if _f])
+        sample_order.insert(0, 'root')
+        tree_order = snps_df.loc[sample_order]
+        # Count number of SNPs in each column.
+        snp_per_column = []
+        for column_header in tree_order:
+            count = 0
+            column = tree_order[column_header]
+            for element in column:
+                if element != column[0]:
+                    count = count + 1
+            snp_per_column.append(count)
+        row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
+        # Count number of SNPS from the
+        # top of each column in the table.
+        snp_from_top = []
+        for column_header in tree_order:
+            count = 0
+            column = tree_order[column_header]
+            # for each element in the column
+            # skip the first element
+            for element in column[1:]:
+                if element == column[0]:
+                    count = count + 1
+                else:
+                    break
+            snp_from_top.append(count)
+        row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
+        tree_order = tree_order.append([row1])
+        tree_order = tree_order.append([row2])
+        # In pandas=0.18.1 even this does not work:
+        # abc = row1.to_frame()
+        # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
+        # tree_order.append(abc)
+        # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
+        tree_order = tree_order.T
+        tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
+        tree_order = tree_order.T
+        # Remove snp_per_column and snp_from_top rows.
+        cascade_order = tree_order[:-2]
+        # Output the cascade table.
+        output_cascade_table(cascade_order, mqdf, group, annotation_dict)
+        # Output the sorted table.
+        output_sort_table(cascade_order, mqdf, group, annotation_dict)
+        task_queue.task_done()
+
+
+def set_num_cpus(num_files, processes):
+    num_cpus = int(multiprocessing.cpu_count())
+    if num_files < num_cpus and num_files < processes:
+        return num_files
+    if num_cpus < processes:
+        half_cpus = int(num_cpus / 2)
+        if num_files < half_cpus:
+            return num_files
+        return half_cpus
+    return processes
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
 
+    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file')
+    parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file')
+    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file')
     parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file'),
-    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', help='Average MQ json file')
-    parser.add_argument('--input_newick', action='store', dest='input_newick', help='Newick file')
-    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', help='SNPs json file')
+    parser.add_argument('--processes', action='store', dest='processes', type=int, default=8, help='User-selected number of processes to use for job splitting')
 
     args = parser.parse_args()
 
@@ -299,4 +324,56 @@
     else:
         annotation_dict = None
 
-    preprocess_tables(args.input_newick, args.input_snps_json, args.input_avg_mq_json, annotation_dict)
+    # The assumption here is that the list of files
+    # in both INPUT_NEWICK_DIR and INPUT_JSON_DIR are
+    # named such that they are properly matched if
+    # the directories contain more than 1 file (i.e.,
+    # hopefully the newick file names and json file names
+    # will be something like Mbovis-01D6_* so they can be
+    # sorted and properly associated with each other).
+    if args.input_newick is not None:
+        newick_files = [args.input_newick]
+    else:
+        newick_files = []
+        for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name))
+            newick_files.append(file_path)
+    if args.input_snps_json is not None:
+        json_files = [args.input_snps_json]
+    else:
+        json_files = []
+        for file_name in sorted(os.listdir(INPUT_JSON_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name))
+            json_files.append(file_path)
+    if args.input_avg_mq_json is not None:
+        json_avg_mq_files = [args.input_avg_mq_json]
+    else:
+        json_avg_mq_files = []
+        for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name))
+            json_avg_mq_files.append(file_path)
+
+    multiprocessing.set_start_method('spawn')
+    queue1 = multiprocessing.JoinableQueue()
+    # All tasks flow through a single joinable queue shared by the worker processes.
+    num_files = len(newick_files)
+    cpus = set_num_cpus(num_files, args.processes)
+    # Set a timeout for get()s in the queue.
+    timeout = 0.05
+
+    for i, newick_file in enumerate(newick_files):
+        json_file = json_files[i]
+        json_avg_mq_file = json_avg_mq_files[i]
+        queue1.put((newick_file, json_file, json_avg_mq_file))
+
+    # Complete the preprocess_tables task.
+    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)]
+    for p in processes:
+        p.start()
+    for p in processes:
+        p.join()
+    queue1.join()
+
+    if queue1.empty():
+        queue1.close()
+        queue1.join_thread()
--- a/vsnp_build_tables.xml	Thu Jul 22 18:08:08 2021 +0000
+++ b/vsnp_build_tables.xml	Thu Jul 29 13:52:48 2021 +0000
@@ -1,4 +1,4 @@
-<tool id="vsnp_build_tables" name="vSNP: build tables" version="@WRAPPER_VERSION@.2+galaxy0" profile="@PROFILE@">
+<tool id="vsnp_build_tables" name="vSNP: build tables" version="@WRAPPER_VERSION@.3+galaxy0" profile="@PROFILE@">
     <description></description>
     <macros>
         <import>macros.xml</import>
@@ -10,35 +10,66 @@
     </requirements>
     <command detect_errors="exit_code"><![CDATA[
 #import re
-
-mkdir 'output_excel_dir' &&
-
-## The input_snps_json and input_avg_mq_json identifiers
-## are typically the same string, so we append a uniquq
-## extension to enable the links.
-#set input_snps_json_identifier = re.sub('[^\s\w\-]', '_', str($input_snps_json.element_identifier)) + '.snps'
-ln -s '${input_snps_json}' '${input_snps_json_identifier}' &&
-#set input_avg_mq_json_identifier = re.sub('[^\s\w\-]', '_', str($input_avg_mq_json.element_identifier)) + '.avg_mq'
-ln -s '${input_avg_mq_json}' '${input_avg_mq_json_identifier}' &&
-#set input_newick_identifier = re.sub('[^\s\w\-]', '_', str($input_newick.element_identifier))
-ln -s '${input_newick}' '${input_newick_identifier}' &&
-
+#set output_excel_dir = 'output_excel_dir'
+#set input_type = $input_type_cond.input_type
+mkdir $output_excel_dir &&
+#if $input_type == "collection":
+    #set input_newick_dir = 'input_newick_dir'
+    mkdir $input_newick_dir &&
+    #set input_json_avg_mq_dir = 'input_json_avg_mq_dir'
+    mkdir $input_json_avg_mq_dir &&
+    #set input_json_dir = 'input_json_dir'
+    mkdir $input_json_dir &&
+    #for $i in $input_type_cond.input_avg_mq_json_collection:
+        #set file_name = $i.file_name
+        #set identifier = re.sub('[^\s\w\-]', '_', str($i.element_identifier))
+        ln -s '$file_name' '$input_json_avg_mq_dir/$identifier' &&
+    #end for
+    #for $i in $input_type_cond.input_snps_json_collection:
+        #set file_name = $i.file_name
+        #set identifier = re.sub('[^\s\w\-]', '_', str($i.element_identifier))
+        ln -s '$file_name' '$input_json_dir/$identifier' &&
+    #end for
+    #for $i in $input_type_cond.input_newick_collection:
+        #set file_name = $i.file_name
+        #set identifier = re.sub('[^\s\w\-]', '_', str($i.element_identifier))
+        ln -s '$file_name' '$input_newick_dir/$identifier' &&
+    #end for
+#end if
 python '$__tool_directory__/vsnp_build_tables.py'
---input_snps_json '${input_snps_json_identifier}'
---input_avg_mq_json '${input_avg_mq_json_identifier}'
---input_newick '${input_newick_identifier}'
-#if str($gbk_cond.gbk_param) == 'yes':
-    #if str($gbk_cond.gbk_source_cond.gbk_source) == 'cached':
-        --gbk_file '$gbk_cond.gbk_source_cond.gbk_file.fields.path'
+#if $input_type == "single":
+    --input_avg_mq_json '$input_avg_mq_json'
+    --input_snps_json '$input_snps_json'
+    --input_newick '$input_newick'
+#end if
+#if str($gbk_cond.gbk_param) == "yes":
+    #set gbk_source_cond = $gbk_cond.gbk_source_cond
+    #set gbk_source = $gbk_source_cond.gbk_source
+    #if str($gbk_source) == "cached":
+        --gbk_file '$gbk_source_cond.gbk_file.fields.path'
     #else:
-        --gbk_file '$gbk_cond.gbk_source_cond.gbk_file'
+        --gbk_file '$gbk_source_cond.gbk_file'
     #end if
 #end if
+--processes \${GALAXY_SLOTS:-8}
 ]]></command>
     <inputs>
-        <param name="input_snps_json" type="data" format="json" label="SNPs json file"/>
-        <param name="input_avg_mq_json" type="data" format="json" label="Average MQ json file"/>
-        <param name="input_newick" type="data" format="newick" label="Best-scoring ML tree file"/>
+        <conditional name="input_type_cond">
+            <param name="input_type" type="select" label="Choose the category for the files to be analyzed">
+                <option value="single" selected="true">Single files</option>
+                <option value="collection">Collection of files</option>
+            </param>
+            <when value="single">
+                <param name="input_snps_json" type="data" format="json" label="SNPs json file"/>
+                <param name="input_avg_mq_json" type="data" format="json" label="Average MQ json file"/>
+                <param name="input_newick" type="data" format="newick" label="Best-scoring ML tree file"/>
+            </when>
+            <when value="collection">
+                <param name="input_snps_json_collection" format="json" type="data_collection" collection_type="list" label="Collection of SNPs json files"/>
+                <param name="input_avg_mq_json_collection" format="json" type="data_collection" collection_type="list" label="Collection of average MQ json files"/>
+                <param name="input_newick_collection" format="newick" type="data_collection" collection_type="list" label="Collection of best-scoring ML tree files"/>
+            </when>
+        </conditional>
         <conditional name="gbk_cond">
             <param name="gbk_param" type="select" label="Use Genbank file?">
                 <option value="yes" selected="true">yes</option>
@@ -53,7 +84,6 @@
                     <when value="cached">
                         <param name="gbk_file" type="select" label="Genbank file">
                             <options from_data_table="vsnp_genbank">
-                                <filter type="data_meta" column="0" key="dbkey" ref="input_avg_mq_json"/>
                                 <validator type="no_options" message="A cached Genbank file is not available for the build associated with the selected average MQ json file"/>
                             </options>
                         </param>
@@ -80,38 +110,41 @@
             <param name="input_avg_mq_json" value="input_avg_mq_json.json" ftype="json" dbkey="89"/>
             <param name="gbk_param" value="no"/>
             <output_collection name="excel" type="list" count="2">
-                <element name="input_newick_newick_cascade_table" file="cascade_table.xlsx" ftype="xlsx" compare="sim_size"/>
-                <element name="input_newick_newick_sort_table" file="sort_table.xlsx" ftype="xlsx" compare="sim_size"/>
-            </output_collection>
-        </test>
-        <test>
-            <param name="input_snps_json" value="Mbovis-01_snps.json" ftype="json" dbkey="89"/>
-            <param name="input_newick" value="Mbovis-01_snps.newick" ftype="newick" dbkey="89"/>
-            <param name="input_avg_mq_json" value="Mbovis-01_avg_mq.json" ftype="json" dbkey="89"/>
-            <param name="gbk_param" value="no"/>
-            <output_collection name="excel" type="list" count="2">
-                <element name="Mbovis-01_snps_newick_cascade_table" file="Mbovis-01_cascade_table.xlsx" ftype="xlsx" compare="sim_size"/>
-                <element name="Mbovis-01_snps_newick_sort_table" file="Mbovis-01_sort_table.xlsx" ftype="xlsx" compare="sim_size"/>
+                <element name="cascade_table" file="cascade_table.xlsx" ftype="xlsx" compare="sim_size"/>
+                <element name="sort_table" file="sort_table.xlsx" ftype="xlsx" compare="sim_size"/>
             </output_collection>
         </test>
         <test>
-            <param name="input_snps_json" value="Mbovis-01D_snps.json" ftype="json" dbkey="89"/>
-            <param name="input_newick" value="Mbovis-01D_snps.newick" ftype="newick" dbkey="89"/>
-            <param name="input_avg_mq_json" value="Mbovis-01D_avg_mq.json" ftype="json" dbkey="89"/>
+            <param name="input_type" value="collection"/>
+            <param name="input_snps_json_collection">
+                <collection type="list">
+                    <element name="Mbovis-01_snps.json" value="Mbovis-01_snps.json" dbkey="89"/>
+                    <element name="Mbovis-01D_snps.json" value="Mbovis-01D_snps.json" dbkey="89"/>
+                    <element name="Mbovis-01D6_snps.json" value="Mbovis-01D6_snps.json" dbkey="89"/>
+                </collection>
+            </param>
+            <param name="input_newick_collection">
+                <collection type="list">
+                    <element name="Mbovis-01_snps.newick" value="Mbovis-01_snps.newick" dbkey="89"/>
+                    <element name="Mbovis-01D_snps.newick" value="Mbovis-01D_snps.newick" dbkey="89"/>
+                    <element name="Mbovis-01D6_snps.newick" value="Mbovis-01D6_snps.newick" dbkey="89"/>
+                </collection>
+            </param>
+            <param name="input_avg_mq_json_collection">
+                <collection type="list">
+                    <element name="Mbovis-01_snps.json" value="Mbovis-01_avg_mq.json" dbkey="89"/>
+                    <element name="Mbovis-01D_snps.json" value="Mbovis-01D_avg_mq.json" dbkey="89"/>
+                    <element name="Mbovis-01D6_snps.json" value="Mbovis-01D6_avg_mq.json" dbkey="89"/>
+                </collection>
+            </param>
             <param name="gbk_param" value="no"/>
-            <output_collection name="excel" type="list" count="2">
+            <output_collection name="excel" type="list" count="6">
+                <element name="Mbovis-01D6_snps_newick_cascade_table" file="Mbovis-01D6_cascade_table.xlsx" ftype="xlsx" compare="sim_size"/>
+                <element name="Mbovis-01D6_snps_newick_sort_table" file="Mbovis-01D6_sort_table.xlsx" ftype="xlsx" compare="sim_size"/>
                 <element name="Mbovis-01D_snps_newick_cascade_table" file="Mbovis-01D_cascade_table.xlsx" ftype="xlsx" compare="sim_size"/>
                 <element name="Mbovis-01D_snps_newick_sort_table" file="Mbovis-01D_sort_table.xlsx" ftype="xlsx" compare="sim_size"/>
-            </output_collection>
-        </test>
-        <test>
-            <param name="input_snps_json" value="Mbovis-01D6_snps.json" ftype="json" dbkey="89"/>
-            <param name="input_newick" value="Mbovis-01D6_snps.newick" ftype="newick" dbkey="89"/>
-            <param name="input_avg_mq_json" value="Mbovis-01D6_avg_mq.json" ftype="json" dbkey="89"/>
-            <param name="gbk_param" value="no"/>
-            <output_collection name="excel" type="list" count="2">
-                <element name="Mbovis-01D6_snps_newick_cascade_table" file="Mbovis-01D6_cascade_table.xlsx" ftype="xlsx" compare="sim_size"/>
-                <element name="Mbovis-01D6_snps_newick_sort_table" file="Mbovis-01D6_sort_table.xlsx" ftype="xlsx" compare="sim_size"/>
+                <element name="Mbovis-01_snps_newick_cascade_table" file="Mbovis-01_cascade_table.xlsx" ftype="xlsx" compare="sim_size"/>
+                <element name="Mbovis-01_snps_newick_sort_table" file="Mbovis-01_sort_table.xlsx" ftype="xlsx" compare="sim_size"/>
             </output_collection>
         </test>
     </tests>
@@ -146,6 +179,7 @@
 
 **Required Options**
 
+ * **Choose the category for the files to be analyzed** - select "Single files" or "Collection of files", then select the appropriate history items (single SNPs json, average MQ json and newick files, or collections of each) based on the selected option.
  * **Use Genbank file** - Select "yes" to annotate the tables using the information in the Genbank file.  Locally cached files, if available, provide the most widely used annotations, but more custom Genbank files can be chosen from the current history.
     </help>
     <expand macro="citations"/>