view @ 3:5b3c08710e47 draft

"planemo upload for repository commit c635df659fe1835679438589ded43136b0e515c6"
author bgruening
date Sat, 09 May 2020 05:38:23 -0400
parents 9bf25dbe00ad
children afec8c595124
line wrap: on
line source

Prepare the workflow paths to be used by downstream
machine learning algorithm. The paths are divided
into the test and training sets

import os
import collections
import numpy as np
import random

import predict_tool_usage
import utils

main_path = os.getcwd()

class PrepareData:

    def __init__(self, max_seq_length, test_data_share):
        """ Init method. """
        self.max_tool_sequence_len = max_seq_length
        self.test_share = test_data_share

    def process_workflow_paths(self, workflow_paths):
        Get all the tools and complete set of individual paths for each workflow
        tokens = list()
        raw_paths = workflow_paths
        raw_paths = [x.replace("\n", '') for x in raw_paths]
        for item in raw_paths:
            split_items = item.split(",")
            for token in split_items:
                if token is not "":
        tokens = list(set(tokens))
        tokens = np.array(tokens)
        tokens = np.reshape(tokens, [-1, ])
        return tokens, raw_paths

    def create_new_dict(self, new_data_dict):
        Create new data dictionary
        reverse_dict = dict((v, k) for k, v in new_data_dict.items())
        return new_data_dict, reverse_dict

    def assemble_dictionary(self, new_data_dict, old_data_dictionary={}):
        Create/update tools indices in the forward and backward dictionary
        new_data_dict, reverse_dict = self.create_new_dict(new_data_dict)
        return new_data_dict, reverse_dict

    def create_data_dictionary(self, words, old_data_dictionary={}):
        Create two dictionaries having tools names and their indexes
        count = collections.Counter(words).most_common()
        dictionary = dict()
        for word, _ in count:
            dictionary[word] = len(dictionary) + 1
        dictionary, reverse_dictionary = self.assemble_dictionary(dictionary, old_data_dictionary)
        return dictionary, reverse_dictionary

    def decompose_paths(self, paths, dictionary):
        Decompose the paths to variable length sub-paths keeping the first tool fixed
        sub_paths_pos = list()
        for index, item in enumerate(paths):
            tools = item.split(",")
            len_tools = len(tools)
            if len_tools <= self.max_tool_sequence_len:
                for window in range(1, len_tools):
                    sequence = tools[0: window + 1]
                    tools_pos = [str(dictionary[str(tool_item)]) for tool_item in sequence]
                    if len(tools_pos) > 1:
        sub_paths_pos = list(set(sub_paths_pos))
        return sub_paths_pos

    def prepare_paths_labels_dictionary(self, dictionary, reverse_dictionary, paths, compatible_next_tools):
        Create a dictionary of sequences with their labels for training and test paths
        paths_labels = dict()
        for item in paths:
            if item and item not in "":
                tools = item.split(",")
                label = tools[-1]
                train_tools = tools[:len(tools) - 1]
                last_but_one_name = reverse_dictionary[int(train_tools[-1])]
                    compatible_tools = compatible_next_tools[last_but_one_name].split(",")
                except Exception:
                if len(compatible_tools) > 0:
                    compatible_tools_ids = [str(dictionary[x]) for x in compatible_tools]
                    composite_labels = ",".join(compatible_tools_ids)
                train_tools = ",".join(train_tools)
                if train_tools in paths_labels:
                    paths_labels[train_tools] += "," + composite_labels
                    paths_labels[train_tools] = composite_labels
        for item in paths_labels:
            paths_labels[item] = ",".join(list(set(paths_labels[item].split(","))))
        return paths_labels

    def pad_test_paths(self, paths_dictionary, num_classes):
        Add padding to the tools sequences and create multi-hot encoded labels
        size_data = len(paths_dictionary)
        data_mat = np.zeros([size_data, self.max_tool_sequence_len])
        label_mat = np.zeros([size_data, num_classes + 1])
        train_counter = 0
        for train_seq, train_label in list(paths_dictionary.items()):
            positions = train_seq.split(",")
            start_pos = self.max_tool_sequence_len - len(positions)
            for id_pos, pos in enumerate(positions):
                data_mat[train_counter][start_pos + id_pos] = int(pos)
            for label_item in train_label.split(","):
                label_mat[train_counter][int(label_item)] = 1.0
            train_counter += 1
        return data_mat, label_mat

    def pad_paths(self, paths_dictionary, num_classes, standard_connections, reverse_dictionary):
        Add padding to the tools sequences and create multi-hot encoded labels
        size_data = len(paths_dictionary)
        data_mat = np.zeros([size_data, self.max_tool_sequence_len])
        label_mat = np.zeros([size_data, 2 * (num_classes + 1)])
        pos_flag = 1.0
        train_counter = 0
        for train_seq, train_label in list(paths_dictionary.items()):
            pub_connections = list()
            positions = train_seq.split(",")
            last_tool_id = positions[-1]
            last_tool_name = reverse_dictionary[int(last_tool_id)]
            start_pos = self.max_tool_sequence_len - len(positions)
            for id_pos, pos in enumerate(positions):
                data_mat[train_counter][start_pos + id_pos] = int(pos)
            if last_tool_name in standard_connections:
                pub_connections = standard_connections[last_tool_name]
            for label_item in train_label.split(","):
                label_pos = int(label_item)
                label_row = label_mat[train_counter]
                if reverse_dictionary[label_pos] in pub_connections:
                    label_row[label_pos] = pos_flag
                    label_row[label_pos + num_classes + 1] = pos_flag
            train_counter += 1
        return data_mat, label_mat

    def split_test_train_data(self, multilabels_paths):
        Split into test and train data randomly for each run
        train_dict = dict()
        test_dict = dict()
        all_paths = multilabels_paths.keys()
        split_number = int(self.test_share * len(all_paths))
        for index, path in enumerate(list(all_paths)):
            if index < split_number:
                test_dict[path] = multilabels_paths[path]
                train_dict[path] = multilabels_paths[path]
        return train_dict, test_dict

    def get_predicted_usage(self, data_dictionary, predicted_usage):
        Get predicted usage for tools
        usage = dict()
        epsilon = 0.0
        # index 0 does not belong to any tool
        usage[0] = epsilon
        for k, v in data_dictionary.items():
                usg = predicted_usage[k]
                if usg < epsilon:
                    usg = epsilon
                usage[v] = usg
            except Exception:
                usage[v] = epsilon
        return usage

    def assign_class_weights(self, n_classes, predicted_usage):
        Compute class weights using usage
        class_weights = dict()
        class_weights[str(0)] = 0.0
        for key in range(1, n_classes + 1):
            u_score = predicted_usage[key]
            if u_score < 1.0:
                u_score += 1.0
            class_weights[key] = np.round(np.log(u_score), 6)
        return class_weights

    def get_train_last_tool_freq(self, train_paths, reverse_dictionary):
        Get the frequency of last tool of each tool sequence
        to estimate the frequency of tool sequences
        last_tool_freq = dict()
        inv_freq = dict()
        for path in train_paths:
            last_tool = path.split(",")[-1]
            if last_tool not in last_tool_freq:
                last_tool_freq[last_tool] = 0
            last_tool_freq[last_tool] += 1
        max_freq = max(last_tool_freq.values())
        for t in last_tool_freq:
            inv_freq[t] = int(np.round(max_freq / float(last_tool_freq[t]), 0))
        return last_tool_freq, inv_freq

    def get_toolid_samples(self, train_data, l_tool_freq):
        l_tool_tr_samples = dict()
        for tool_id in l_tool_freq:
            for index, tr_sample in enumerate(train_data):
                last_tool_id = str(int(tr_sample[-1]))
                if last_tool_id == tool_id:
                    if last_tool_id not in l_tool_tr_samples:
                        l_tool_tr_samples[last_tool_id] = list()
        return l_tool_tr_samples

    def get_data_labels_matrices(self, workflow_paths, tool_usage_path, cutoff_date, compatible_next_tools, standard_connections, old_data_dictionary={}):
        Convert the training and test paths into corresponding numpy matrices
        processed_data, raw_paths = self.process_workflow_paths(workflow_paths)
        dictionary, rev_dict = self.create_data_dictionary(processed_data, old_data_dictionary)
        num_classes = len(dictionary)

        print("Raw paths: %d" % len(raw_paths))

        print("Decomposing paths...")
        all_unique_paths = self.decompose_paths(raw_paths, dictionary)

        print("Creating dictionaries...")
        multilabels_paths = self.prepare_paths_labels_dictionary(dictionary, rev_dict, all_unique_paths, compatible_next_tools)

        print("Complete data: %d" % len(multilabels_paths))
        train_paths_dict, test_paths_dict = self.split_test_train_data(multilabels_paths)

        # get sample frequency
        l_tool_freq, inv_last_tool_freq = self.get_train_last_tool_freq(train_paths_dict, rev_dict)

        print("Train data: %d" % len(train_paths_dict))
        print("Test data: %d" % len(test_paths_dict))

        print("Padding train and test data...")
        # pad training and test data with leading zeros
        test_data, test_labels = self.pad_paths(test_paths_dict, num_classes, standard_connections, rev_dict)
        train_data, train_labels = self.pad_paths(train_paths_dict, num_classes, standard_connections, rev_dict)

        l_tool_tr_samples = self.get_toolid_samples(train_data, l_tool_freq)

        # Predict tools usage
        print("Predicting tools' usage...")
        usage_pred = predict_tool_usage.ToolPopularity()
        usage = usage_pred.extract_tool_usage(tool_usage_path, cutoff_date, dictionary)
        tool_usage_prediction = usage_pred.get_pupularity_prediction(usage)
        t_pred_usage = self.get_predicted_usage(dictionary, tool_usage_prediction)

        # get class weights using the predicted usage for each tool
        class_weights = self.assign_class_weights(num_classes, t_pred_usage)

        return train_data, train_labels, test_data, test_labels, dictionary, rev_dict, class_weights, t_pred_usage, l_tool_freq, l_tool_tr_samples