diff env/lib/python3.7/site-packages/galaxy/util/rules_dsl.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/galaxy/util/rules_dsl.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,588 +0,0 @@
-import abc
-import itertools
-import re
-
-import six
-from six.moves import map
-
-from galaxy.util import strip_control_characters_nested
-
-
-def _ensure_rule_contains_keys(rule, keys):
-    for key, instance_class in keys.items():
-        if key not in rule:
-            raise ValueError("Rule of type [%s] does not contain key [%s]." % (rule["type"], key))
-        value = rule[key]
-        if not isinstance(value, instance_class):
-            raise ValueError("Rule of type [%s] does not contain correct value type for key [%s]." % (rule["type"], key))
-
-
-def _ensure_key_value_in(rule, key, values):
-    value = rule[key]
-    if value not in values:
-        raise ValueError("Invalid value [%s] for [%s] encountered." % (value, key))
-
-
-def _ensure_valid_pattern(expression):
-    re.compile(expression)
-
-
-def apply_regex(regex, target, data, replacement=None, group_count=None):
-    pattern = re.compile(regex)
-
-    def new_row(row):
-        source = row[target]
-        if replacement is None:
-            match = pattern.search(source)
-            if not match:
-                raise Exception("Problem applying regular expression [%s] to [%s]." % (regex, source))
-
-            if group_count:
-                if len(match.groups()) != group_count:
-                    raise Exception("Problem applying regular expression, wrong number of groups found.")
-
-                result = row + list(match.groups())
-            else:
-                result = row + [match.group(0)]
-        else:
-            result = row + [pattern.search(source).expand(replacement)]
-
-        return result
-
-    new_data = list(map(new_row, data))
-    return new_data
-
-
-@six.add_metaclass(abc.ABCMeta)
-class BaseRuleDefinition(object):
-
-    @abc.abstractproperty
-    def rule_type(self):
-        """Short string describing type of rule (plugin class) to use."""
-
-    @abc.abstractmethod
-    def validate_rule(self, rule):
-        """Validate dictified rule definition of this type."""
-
-    @abc.abstractmethod
-    def apply(self, rule, data, sources):
-        """Apply validated, dictified rule definition to supplied data."""
-
-
-class AddColumnMetadataRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_metadata"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {"value": six.string_types})
-
-    def apply(self, rule, data, sources):
-        rule_value = rule["value"]
-        if rule_value.startswith("identifier"):
-            identifier_index = int(rule_value[len("identifier"):])
-
-            new_rows = []
-            for index, row in enumerate(data):
-                new_rows.append(row + [sources[index]["identifiers"][identifier_index]])
-
-        elif rule_value == "tags":
-
-            def sorted_tags(index):
-                tags = sorted(sources[index]["tags"])
-                return [",".join(tags)]
-
-            new_rows = []
-            for index, row in enumerate(data):
-                new_rows.append(row + sorted_tags(index))
-
-        return new_rows, sources
-
-
-class AddColumnGroupTagValueRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_group_tag_value"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {"value": six.string_types})
-
-    def apply(self, rule, data, sources):
-        rule_value = rule["value"]
-        tag_prefix = "group:%s:" % rule_value
-
-        new_rows = []
-        for index, row in enumerate(data):
-            group_tag_value = None
-            source = sources[index]
-            tags = source["tags"]
-            for tag in sorted(tags):
-                if tag.startswith(tag_prefix):
-                    group_tag_value = tag[len(tag_prefix):]
-                    break
-
-            if group_tag_value is None:
-                group_tag_value = rule.get("default_value", "")
-
-            new_rows.append(row + [group_tag_value])
-
-        return new_rows, sources
-
-
-class AddColumnConcatenateRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_concatenate"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {"target_column_0": int, "target_column_1": int})
-
-    def apply(self, rule, data, sources):
-        column_0 = rule["target_column_0"]
-        column_1 = rule["target_column_1"]
-
-        new_rows = []
-        for index, row in enumerate(data):
-            new_rows.append(row + [row[column_0] + row[column_1]])
-
-        return new_rows, sources
-
-
-class AddColumnBasenameRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_basename"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {"target_column": int})
-
-    def apply(self, rule, data, sources):
-        column = rule["target_column"]
-        re = r"[^/]*$"
-        return apply_regex(re, column, data), sources
-
-
-class AddColumnRegexRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_regex"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {"target_column": int, "expression": six.string_types})
-        _ensure_valid_pattern(rule["expression"])
-
-    def apply(self, rule, data, sources):
-        target = rule["target_column"]
-        expression = rule["expression"]
-        replacement = rule.get("replacement")
-        group_count = rule.get("group_count")
-
-        return apply_regex(expression, target, data, replacement, group_count), sources
-
-
-class AddColumnRownumRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_rownum"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {"start": int})
-
-    def apply(self, rule, data, sources):
-        start = rule["start"]
-
-        new_rows = []
-        for index, row in enumerate(data):
-            new_rows.append(row + ["%d" % (index + start)])
-
-        return new_rows, sources
-
-
-class AddColumnValueRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_value"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {"value": six.string_types})
-
-    def apply(self, rule, data, sources):
-        value = rule["value"]
-
-        new_rows = []
-        for index, row in enumerate(data):
-            new_rows.append(row + [str(value)])
-
-        return new_rows, sources
-
-
-class AddColumnSubstrRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_column_substr"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_column": int,
-            "length": int,
-            "substr_type": six.string_types,
-        })
-        _ensure_key_value_in(rule, "substr_type", ["keep_prefix", "drop_prefix", "keep_suffix", "drop_suffix"])
-
-    def apply(self, rule, data, sources):
-        target = rule["target_column"]
-        length = rule["length"]
-        substr_type = rule["substr_type"]
-
-        def new_row(row):
-            original_value = row[target]
-            start = 0
-            end = len(original_value)
-
-            if substr_type == "keep_prefix":
-                end = length
-            elif substr_type == "drop_prefix":
-                start = length
-            elif substr_type == "keep_suffix":
-                start = end - length
-                if start < 0:
-                    start = 0
-            else:
-                end = end - length
-                if end < 0:
-                    end = 0
-
-            return row + [original_value[start:end]]
-
-        return list(map(new_row, data)), sources
-
-
-class RemoveColumnsRuleDefinition(BaseRuleDefinition):
-    rule_type = "remove_columns"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_columns": list,
-        })
-
-    def apply(self, rule, data, sources):
-        target_columns = rule["target_columns"]
-
-        def new_row(row):
-            new = []
-            for index, val in enumerate(row):
-                if index not in target_columns:
-                    new.append(val)
-            return new
-
-        return list(map(new_row, data)), sources
-
-
-def _filter_index(func, iterable):
-    result = []
-    for index, x in enumerate(iterable):
-        if func(index):
-            result.append(x)
-
-    return result
-
-
-class AddFilterRegexRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_filter_regex"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_column": int,
-            "invert": bool,
-            "expression": six.string_types,
-        })
-        _ensure_valid_pattern(rule["expression"])
-
-    def apply(self, rule, data, sources):
-        target_column = rule["target_column"]
-        invert = rule["invert"]
-        regex = rule["expression"]
-
-        def _filter(index):
-            row = data[index]
-            val = row[target_column]
-            pattern = re.compile(regex)
-            return not invert if pattern.search(val) else invert
-
-        return _filter_index(_filter, data), _filter_index(_filter, sources)
-
-
-class AddFilterCountRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_filter_count"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "count": int,
-            "invert": bool,
-            "which": six.string_types,
-        })
-        _ensure_key_value_in(rule, "which", ["first", "last"])
-
-    def apply(self, rule, data, sources):
-        num_rows = len(data)
-        invert = rule["invert"]
-        n = rule["count"]
-        which = rule["which"]
-
-        def _filter(index):
-            if which == "first":
-                matches = index >= n
-            else:
-                matches = index < (num_rows - n)
-            return not invert if matches else invert
-
-        return _filter_index(_filter, data), _filter_index(_filter, sources)
-
-
-class AddFilterEmptyRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_filter_empty"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_column": int,
-            "invert": bool
-        })
-
-    def apply(self, rule, data, sources):
-        invert = rule["invert"]
-        target_column = rule["target_column"]
-
-        def _filter(index):
-            non_empty = len(data[index][target_column]) != 0
-            return not invert if non_empty else invert
-
-        return _filter_index(_filter, data), _filter_index(_filter, sources)
-
-
-class AddFilterMatchesRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_filter_matches"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_column": int,
-            "invert": bool,
-            "value": six.string_types,
-        })
-
-    def apply(self, rule, data, sources):
-        invert = rule["invert"]
-        target_column = rule["target_column"]
-        value = rule["value"]
-
-        def _filter(index):
-            row = data[index]
-            val = row[target_column]
-            return not invert if val == value else invert
-
-        return _filter_index(_filter, data), _filter_index(_filter, sources)
-
-
-class AddFilterCompareRuleDefinition(BaseRuleDefinition):
-    rule_type = "add_filter_compare"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_column": int,
-            "value": int,
-            "compare_type": six.string_types,
-        })
-        _ensure_key_value_in(rule, "compare_type", ["less_than", "less_than_equal", "greater_than", "greater_than_equal"])
-
-    def apply(self, rule, data, sources):
-        target_column = rule["target_column"]
-        value = rule["value"]
-        compare_type = rule["compare_type"]
-
-        def _filter(index):
-            row = data[index]
-            target_value = float(row[target_column])
-            if compare_type == "less_than":
-                matches = target_value < value
-            elif compare_type == "less_than_equal":
-                matches = target_value <= value
-            elif compare_type == "greater_than":
-                matches = target_value > value
-            elif compare_type == "greater_than_equal":
-                matches = target_value >= value
-
-            return matches
-
-        return _filter_index(_filter, data), _filter_index(_filter, sources)
-
-
-class SortRuleDefinition(BaseRuleDefinition):
-    rule_type = "sort"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_column": int,
-            "numeric": bool,
-        })
-
-    def apply(self, rule, data, sources):
-        target = rule["target_column"]
-        numeric = rule["numeric"]
-
-        sortable = zip(data, sources)
-
-        def sort_func(item):
-            a_val = item[0][target]
-            if numeric:
-                a_val = float(a_val)
-            return a_val
-
-        sorted_data = sorted(sortable, key=sort_func)
-
-        new_data = []
-        new_sources = []
-
-        for (row, source) in sorted_data:
-            new_data.append(row)
-            new_sources.append(source)
-
-        return new_data, new_sources
-
-
-class SwapColumnsRuleDefinition(BaseRuleDefinition):
-    rule_type = "swap_columns"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_column_0": int,
-            "target_column_1": int,
-        })
-
-    def apply(self, rule, data, sources):
-        target_column_0 = rule["target_column_0"]
-        target_column_1 = rule["target_column_1"]
-
-        def new_row(row):
-            row_copy = row[:]
-            row_copy[target_column_0] = row[target_column_1]
-            row_copy[target_column_1] = row[target_column_0]
-            return row_copy
-
-        return list(map(new_row, data)), sources
-
-
-class SplitColumnsRuleDefinition(BaseRuleDefinition):
-    rule_type = "split_columns"
-
-    def validate_rule(self, rule):
-        _ensure_rule_contains_keys(rule, {
-            "target_columns_0": list,
-            "target_columns_1": list,
-        })
-
-    def apply(self, rule, data, sources):
-        target_columns_0 = rule["target_columns_0"]
-        target_columns_1 = rule["target_columns_1"]
-
-        def split_row(row):
-            new_row_0 = []
-            new_row_1 = []
-            for index, el in enumerate(row):
-                if index in target_columns_0:
-                    new_row_0.append(el)
-                elif index in target_columns_1:
-                    new_row_1.append(el)
-                else:
-                    new_row_0.append(el)
-                    new_row_1.append(el)
-
-            return [new_row_0, new_row_1]
-
-        data = flat_map(split_row, data)
-        sources = flat_map(lambda x: [x, x], sources)
-
-        return data, sources
-
-
-def flat_map(f, items):
-    return list(itertools.chain.from_iterable(map(f, items)))
-
-
-class RuleSet(object):
-
-    def __init__(self, rule_set_as_dict):
-        self.raw_rules = strip_control_characters_nested(rule_set_as_dict["rules"])
-        self.raw_mapping = rule_set_as_dict.get("mapping", [])
-
-    @property
-    def rules(self):
-        return self.raw_rules
-
-    def _rules_with_definitions(self):
-        for rule in self.raw_rules:
-            yield (rule, RULES_DEFINITIONS[rule["type"]])
-
-    def apply(self, data, sources):
-        for rule, rule_definition in self._rules_with_definitions():
-            rule_definition.validate_rule(rule)
-            data, sources = rule_definition.apply(rule, data, sources)
-
-        return data, sources
-
-    @property
-    def has_errors(self):
-        errored = False
-        try:
-            for rule, rule_definition in self._rules_with_definitions():
-                rule_definition.validate_rule(rule)
-        except Exception:
-            errored = True
-        return errored
-
-    @property
-    def mapping_as_dict(self):
-        as_dict = {}
-        for mapping in self.raw_mapping:
-            as_dict[mapping["type"]] = mapping
-
-        return as_dict
-
-    # Rest of this is generic, things here are Galaxy collection specific, think about about
-    # subclass of RuleSet for collection creation.
-    @property
-    def identifier_columns(self):
-        mapping_as_dict = self.mapping_as_dict
-        identifier_columns = []
-        if "list_identifiers" in mapping_as_dict:
-            identifier_columns.extend(mapping_as_dict["list_identifiers"]["columns"])
-        if "paired_identifier" in mapping_as_dict:
-            identifier_columns.append(mapping_as_dict["paired_identifier"]["columns"][0])
-
-        return identifier_columns
-
-    @property
-    def collection_type(self):
-        mapping_as_dict = self.mapping_as_dict
-        list_columns = mapping_as_dict.get("list_identifiers", {"columns": []})["columns"]
-        collection_type = ":".join(map(lambda c: "list", list_columns))
-        if "paired_identifier" in mapping_as_dict:
-            if collection_type:
-                collection_type += ":paired"
-            else:
-                collection_type = "paired"
-        return collection_type
-
-    @property
-    def display(self):
-        message = "Rules:\n"
-        message += "".join(["- %s\n" % r for r in self.raw_rules])
-        message += "Column Definitions:\n"
-        message += "".join(["- %s\n" % m for m in self.raw_mapping])
-        return message
-
-
-RULES_DEFINITION_CLASSES = [
-    AddColumnMetadataRuleDefinition,
-    AddColumnGroupTagValueRuleDefinition,
-    AddColumnConcatenateRuleDefinition,
-    AddColumnBasenameRuleDefinition,
-    AddColumnRegexRuleDefinition,
-    AddColumnRownumRuleDefinition,
-    AddColumnValueRuleDefinition,
-    AddColumnSubstrRuleDefinition,
-    RemoveColumnsRuleDefinition,
-    AddFilterRegexRuleDefinition,
-    AddFilterCountRuleDefinition,
-    AddFilterEmptyRuleDefinition,
-    AddFilterMatchesRuleDefinition,
-    AddFilterCompareRuleDefinition,
-    SortRuleDefinition,
-    SwapColumnsRuleDefinition,
-    SplitColumnsRuleDefinition,
-]
-RULES_DEFINITIONS = {}
-for rule_class in RULES_DEFINITION_CLASSES:
-    RULES_DEFINITIONS[rule_class.rule_type] = rule_class()