| 
0
 | 
     1 import copy
 | 
| 
 | 
     2 import logging
 | 
| 
 | 
     3 
 | 
| 
 | 
     4 log = logging.getLogger()
 | 
| 
 | 
     5 log.setLevel(logging.WARN)
 | 
| 
 | 
     6 
 | 
| 
 | 
     7 
 | 
| 
 | 
     8 def feature_lambda(
 | 
| 
 | 
     9     feature_list,
 | 
| 
 | 
    10     test,
 | 
| 
 | 
    11     test_kwargs,
 | 
| 
 | 
    12     subfeatures=True,
 | 
| 
 | 
    13     parent=None,
 | 
| 
 | 
    14     invert=False,
 | 
| 
 | 
    15     recurse=True,
 | 
| 
 | 
    16 ):
 | 
| 
 | 
    17     """Recursively search through features, testing each with a test function, yielding matches.
 | 
| 
 | 
    18 
 | 
| 
 | 
    19     GFF3 is a hierachical data structure, so we need to be able to recursively
 | 
| 
 | 
    20     search through features. E.g. if you're looking for a feature with
 | 
| 
 | 
    21     ID='bob.42', you can't just do a simple list comprehension with a test
 | 
| 
 | 
    22     case. You don't know how deeply burried bob.42 will be in the feature tree. This is where feature_lambda steps in.
 | 
| 
 | 
    23 
 | 
| 
 | 
    24     :type feature_list: list
 | 
| 
 | 
    25     :param feature_list: an iterable of features
 | 
| 
 | 
    26 
 | 
| 
 | 
    27     :type test: function reference
 | 
| 
 | 
    28     :param test: a closure with the method signature (feature, **kwargs) where
 | 
| 
 | 
    29                  the kwargs are those passed in the next argument. This
 | 
| 
 | 
    30                  function should return True or False, True if the feature is
 | 
| 
 | 
    31                  to be yielded as part of the main feature_lambda function, or
 | 
| 
 | 
    32                  False if it is to be ignored. This function CAN mutate the
 | 
| 
 | 
    33                  features passed to it (think "apply").
 | 
| 
 | 
    34 
 | 
| 
 | 
    35     :type test_kwargs: dictionary
 | 
| 
 | 
    36     :param test_kwargs: kwargs to pass to your closure when it is called.
 | 
| 
 | 
    37 
 | 
| 
 | 
    38     :type subfeatures: boolean
 | 
| 
 | 
    39     :param subfeatures: when a feature is matched, should just that feature be
 | 
| 
 | 
    40                         yielded to the caller, or should the entire sub_feature
 | 
| 
 | 
    41                         tree for that feature be included? subfeatures=True is
 | 
| 
 | 
    42                         useful in cases such as searching for a gene feature,
 | 
| 
 | 
    43                         and wanting to know what RBS/Shine_Dalgarno_sequences
 | 
| 
 | 
    44                         are in the sub_feature tree (which can be accomplished
 | 
| 
 | 
    45                         with two feature_lambda calls). subfeatures=False is
 | 
| 
 | 
    46                         useful in cases when you want to process (and possibly
 | 
| 
 | 
    47                         return) the entire feature tree, such as applying a
 | 
| 
 | 
    48                         qualifier to every single feature.
 | 
| 
 | 
    49 
 | 
| 
 | 
    50     :type invert: boolean
 | 
| 
 | 
    51     :param invert: Negate/invert the result of the filter.
 | 
| 
 | 
    52 
 | 
| 
 | 
    53     :rtype: yielded list
 | 
| 
 | 
    54     :return: Yields a list of matching features.
 | 
| 
 | 
    55     """
 | 
| 
 | 
    56     # Either the top level set of [features] or the subfeature attribute
 | 
| 
 | 
    57     for feature in feature_list:
 | 
| 
 | 
    58         feature._parent = parent
 | 
| 
 | 
    59         if not parent:
 | 
| 
 | 
    60             # Set to self so we cannot go above root.
 | 
| 
 | 
    61             feature._parent = feature
 | 
| 
 | 
    62         test_result = test(feature, **test_kwargs)
 | 
| 
 | 
    63         # if (not invert and test_result) or (invert and not test_result):
 | 
| 
 | 
    64         if invert ^ test_result:
 | 
| 
 | 
    65             if not subfeatures:
 | 
| 
 | 
    66                 feature_copy = copy.deepcopy(feature)
 | 
| 
 | 
    67                 feature_copy.sub_features = list()
 | 
| 
 | 
    68                 yield feature_copy
 | 
| 
 | 
    69             else:
 | 
| 
 | 
    70                 yield feature
 | 
| 
 | 
    71 
 | 
| 
 | 
    72         if recurse and hasattr(feature, "sub_features"):
 | 
| 
 | 
    73             for x in feature_lambda(
 | 
| 
 | 
    74                 feature.sub_features,
 | 
| 
 | 
    75                 test,
 | 
| 
 | 
    76                 test_kwargs,
 | 
| 
 | 
    77                 subfeatures=subfeatures,
 | 
| 
 | 
    78                 parent=feature,
 | 
| 
 | 
    79                 invert=invert,
 | 
| 
 | 
    80                 recurse=recurse,
 | 
| 
 | 
    81             ):
 | 
| 
 | 
    82                 yield x
 | 
| 
 | 
    83 
 | 
| 
 | 
    84 
 | 
| 
 | 
    85 def fetchParent(feature):
 | 
| 
 | 
    86     if not hasattr(feature, "_parent") or feature._parent is None:
 | 
| 
 | 
    87         return feature
 | 
| 
 | 
    88     else:
 | 
| 
 | 
    89         return fetchParent(feature._parent)
 | 
| 
 | 
    90 
 | 
| 
 | 
    91 
 | 
| 
 | 
    92 def feature_test_true(feature, **kwargs):
 | 
| 
 | 
    93     return True
 | 
| 
 | 
    94 
 | 
| 
 | 
    95 
 | 
| 
 | 
    96 def feature_test_type(feature, **kwargs):
 | 
| 
 | 
    97     if "type" in kwargs:
 | 
| 
 | 
    98         return str(feature.type).upper() == str(kwargs["type"]).upper()
 | 
| 
 | 
    99     elif "types" in kwargs:
 | 
| 
 | 
   100       for x in kwargs["types"]:
 | 
| 
 | 
   101         if str(feature.type).upper() == str(x).upper():
 | 
| 
 | 
   102           return True
 | 
| 
 | 
   103       return False
 | 
| 
 | 
   104     raise Exception("Incorrect feature_test_type call, need type or types")
 | 
| 
 | 
   105 
 | 
| 
 | 
   106 
 | 
| 
 | 
   107 def feature_test_qual_value(feature, **kwargs):
 | 
| 
 | 
   108     """Test qualifier values.
 | 
| 
 | 
   109 
 | 
| 
 | 
   110     For every feature, check that at least one value in
 | 
| 
 | 
   111     feature.quailfiers(kwargs['qualifier']) is in kwargs['attribute_list']
 | 
| 
 | 
   112     """
 | 
| 
 | 
   113     if isinstance(kwargs["qualifier"], list):
 | 
| 
 | 
   114         for qualifier in kwargs["qualifier"]:
 | 
| 
 | 
   115             for attribute_value in feature.qualifiers.get(qualifier, []):
 | 
| 
 | 
   116                 if attribute_value in kwargs["attribute_list"]:
 | 
| 
 | 
   117                     return True
 | 
| 
 | 
   118     else:
 | 
| 
 | 
   119         for attribute_value in feature.qualifiers.get(kwargs["qualifier"], []):
 | 
| 
 | 
   120             if attribute_value in kwargs["attribute_list"]:
 | 
| 
 | 
   121                 return True
 | 
| 
 | 
   122     return False
 | 
| 
 | 
   123 
 | 
| 
 | 
   124 
 | 
| 
 | 
   125 def feature_test_location(feature, **kwargs):
 | 
| 
 | 
   126     if "strand" in kwargs:
 | 
| 
 | 
   127         if feature.location.strand != kwargs["strand"]:
 | 
| 
 | 
   128             return False
 | 
| 
 | 
   129 
 | 
| 
 | 
   130     return feature.location.start <= kwargs["loc"] <= feature.location.end
 | 
| 
 | 
   131 
 | 
| 
 | 
   132 
 | 
| 
 | 
   133 def feature_test_quals(feature, **kwargs):
 | 
| 
 | 
   134     """
 | 
| 
 | 
   135     Example::
 | 
| 
 | 
   136 
 | 
| 
 | 
   137         a = Feature(qualifiers={'Note': ['Some notes', 'Aasdf']})
 | 
| 
 | 
   138 
 | 
| 
 | 
   139         # Check if a contains a Note
 | 
| 
 | 
   140         feature_test_quals(a, {'Note': None})  # Returns True
 | 
| 
 | 
   141         feature_test_quals(a, {'Product': None})  # Returns False
 | 
| 
 | 
   142 
 | 
| 
 | 
   143         # Check if a contains a note with specific value
 | 
| 
 | 
   144         feature_test_quals(a, {'Note': ['ome']})  # Returns True
 | 
| 
 | 
   145 
 | 
| 
 | 
   146         # Check if a contains a note with specific value
 | 
| 
 | 
   147         feature_test_quals(a, {'Note': ['other']})  # Returns False
 | 
| 
 | 
   148     """
 | 
| 
 | 
   149     for key in kwargs:
 | 
| 
 | 
   150         if key not in feature.qualifiers:
 | 
| 
 | 
   151             return False
 | 
| 
 | 
   152 
 | 
| 
 | 
   153         # Key is present, no value specified
 | 
| 
 | 
   154         if kwargs[key] is None:
 | 
| 
 | 
   155             return True
 | 
| 
 | 
   156 
 | 
| 
 | 
   157         # Otherwise there is a key value we're looking for.
 | 
| 
 | 
   158         # so we make a list of matches
 | 
| 
 | 
   159         matches = []
 | 
| 
 | 
   160         # And check all of the feature qualifier valuse
 | 
| 
 | 
   161         for value in feature.qualifiers[key]:
 | 
| 
 | 
   162             # For that kwargs[key] value
 | 
| 
 | 
   163             for x in kwargs[key]:
 | 
| 
 | 
   164                 matches.append(x in value)
 | 
| 
 | 
   165 
 | 
| 
 | 
   166         # If none matched, then we return false.
 | 
| 
 | 
   167         if not any(matches):
 | 
| 
 | 
   168             return False
 | 
| 
 | 
   169 
 | 
| 
 | 
   170     return True
 | 
| 
 | 
   171 
 | 
| 
 | 
   172 
 | 
| 
 | 
   173 def feature_test_contains(feature, **kwargs):
 | 
| 
 | 
   174     if "index" in kwargs:
 | 
| 
 | 
   175         return feature.location.start < kwargs["index"] < feature.location.end
 | 
| 
 | 
   176     elif "range" in kwargs:
 | 
| 
 | 
   177         return (
 | 
| 
 | 
   178             feature.location.start < kwargs["range"]["start"] < feature.location.end
 | 
| 
 | 
   179             and feature.location.start < kwargs["range"]["end"] < feature.location.end
 | 
| 
 | 
   180         )
 | 
| 
 | 
   181     else:
 | 
| 
 | 
   182         raise RuntimeError("Must use index or range keyword")
 | 
| 
 | 
   183 
 | 
| 
 | 
   184 
 | 
| 
 | 
   185 def get_id(feature=None, parent_prefix=None):
 | 
| 
 | 
   186     result = ""
 | 
| 
 | 
   187     if parent_prefix is not None:
 | 
| 
 | 
   188         result += parent_prefix + "|"
 | 
| 
 | 
   189     if "locus_tag" in feature.qualifiers:
 | 
| 
 | 
   190         result += feature.qualifiers["locus_tag"][0]
 | 
| 
 | 
   191     elif "gene" in feature.qualifiers:
 | 
| 
 | 
   192         result += feature.qualifiers["gene"][0]
 | 
| 
 | 
   193     elif "Gene" in feature.qualifiers:
 | 
| 
 | 
   194         result += feature.qualifiers["Gene"][0]
 | 
| 
 | 
   195     elif "product" in feature.qualifiers:
 | 
| 
 | 
   196         result += feature.qualifiers["product"][0]
 | 
| 
 | 
   197     elif "Product" in feature.qualifiers:
 | 
| 
 | 
   198         result += feature.qualifiers["Product"][0]
 | 
| 
 | 
   199     elif "Name" in feature.qualifiers:
 | 
| 
 | 
   200         result += feature.qualifiers["Name"][0]
 | 
| 
 | 
   201     else:
 | 
| 
 | 
   202         return feature.id
 | 
| 
 | 
   203         # Leaving in case bad things happen.
 | 
| 
 | 
   204         # result += '%s_%s_%s_%s' % (
 | 
| 
 | 
   205         # feature.id,
 | 
| 
 | 
   206         # feature.location.start,
 | 
| 
 | 
   207         # feature.location.end,
 | 
| 
 | 
   208         # feature.location.strand
 | 
| 
 | 
   209         # )
 | 
| 
 | 
   210     return result
 | 
| 
 | 
   211 
 | 
| 
 | 
   212 
 | 
| 
 | 
   213 def get_gff3_id(gene):
 | 
| 
 | 
   214     return gene.qualifiers.get("Name", [gene.id])[0]
 | 
| 
 | 
   215 
 | 
| 
 | 
   216 
 | 
| 
 | 
   217 def ensure_location_in_bounds(start=0, end=0, parent_length=0):
 | 
| 
 | 
   218     # This prevents frameshift errors
 | 
| 
 | 
   219     while start < 0:
 | 
| 
 | 
   220         start += 3
 | 
| 
 | 
   221     while end < 0:
 | 
| 
 | 
   222         end += 3
 | 
| 
 | 
   223     while start > parent_length:
 | 
| 
 | 
   224         start -= 3
 | 
| 
 | 
   225     while end > parent_length:
 | 
| 
 | 
   226         end -= 3
 | 
| 
 | 
   227     return (start, end)
 | 
| 
 | 
   228 
 | 
| 
 | 
   229 
 | 
| 
 | 
   230 def coding_genes(feature_list):
 | 
| 
 | 
   231     for x in genes(feature_list):
 | 
| 
 | 
   232         if (
 | 
| 
 | 
   233             len(
 | 
| 
 | 
   234                 list(
 | 
| 
 | 
   235                     feature_lambda(
 | 
| 
 | 
   236                         x.sub_features,
 | 
| 
 | 
   237                         feature_test_type,
 | 
| 
 | 
   238                         {"type": "CDS"},
 | 
| 
 | 
   239                         subfeatures=False,
 | 
| 
 | 
   240                     )
 | 
| 
 | 
   241                 )
 | 
| 
 | 
   242             )
 | 
| 
 | 
   243             > 0
 | 
| 
 | 
   244         ):
 | 
| 
 | 
   245             yield x
 | 
| 
 | 
   246 
 | 
| 
 | 
   247 
 | 
| 
 | 
   248 def genes(feature_list, feature_type="gene", sort=False):
 | 
| 
 | 
   249     """
 | 
| 
 | 
   250     Simple filter to extract gene features from the feature set.
 | 
| 
 | 
   251     """
 | 
| 
 | 
   252 
 | 
| 
 | 
   253     if not sort:
 | 
| 
 | 
   254         for x in feature_lambda(
 | 
| 
 | 
   255             feature_list, feature_test_type, {"type": feature_type}, subfeatures=True
 | 
| 
 | 
   256         ):
 | 
| 
 | 
   257             yield x
 | 
| 
 | 
   258     else:
 | 
| 
 | 
   259         data = list(genes(feature_list, feature_type=feature_type, sort=False))
 | 
| 
 | 
   260         data = sorted(data, key=lambda feature: feature.location.start)
 | 
| 
 | 
   261         for x in data:
 | 
| 
 | 
   262             yield x
 | 
| 
 | 
   263 
 | 
| 
 | 
   264 
 | 
| 
 | 
   265 def wa_unified_product_name(feature):
 | 
| 
 | 
   266     """
 | 
| 
 | 
   267     Try and figure out a name. We gave conflicting instructions, so
 | 
| 
 | 
   268     this isn't as trivial as it should be. Sometimes it will be in
 | 
| 
 | 
   269     'product' or 'Product', othertimes in 'Name'
 | 
| 
 | 
   270     """
 | 
| 
 | 
   271     # Manually applied tags.
 | 
| 
 | 
   272     protein_product = feature.qualifiers.get(
 | 
| 
 | 
   273         "product", feature.qualifiers.get("Product", [None])
 | 
| 
 | 
   274     )[0]
 | 
| 
 | 
   275 
 | 
| 
 | 
   276     # If neither of those are available ...
 | 
| 
 | 
   277     if protein_product is None:
 | 
| 
 | 
   278         # And there's a name...
 | 
| 
 | 
   279         if "Name" in feature.qualifiers:
 | 
| 
 | 
   280             if not is_uuid(feature.qualifiers["Name"][0]):
 | 
| 
 | 
   281                 protein_product = feature.qualifiers["Name"][0]
 | 
| 
 | 
   282 
 | 
| 
 | 
   283     return protein_product
 | 
| 
 | 
   284 
 | 
| 
 | 
   285 
 | 
| 
 | 
   286 def is_uuid(name):
 | 
| 
 | 
   287     return name.count("-") == 4 and len(name) == 36
 | 
| 
 | 
   288 
 | 
| 
 | 
   289 
 | 
| 
 | 
   290 def get_rbs_from(gene):
 | 
| 
 | 
   291     # Normal RBS annotation types
 | 
| 
 | 
   292     rbs_rbs = list(
 | 
| 
 | 
   293         feature_lambda(
 | 
| 
 | 
   294             gene.sub_features, feature_test_type, {"type": "RBS"}, subfeatures=False
 | 
| 
 | 
   295         )
 | 
| 
 | 
   296     )
 | 
| 
 | 
   297     rbs_sds = list(
 | 
| 
 | 
   298         feature_lambda(
 | 
| 
 | 
   299             gene.sub_features,
 | 
| 
 | 
   300             feature_test_type,
 | 
| 
 | 
   301             {"type": "Shine_Dalgarno_sequence"},
 | 
| 
 | 
   302             subfeatures=False,
 | 
| 
 | 
   303         )
 | 
| 
 | 
   304     )
 | 
| 
 | 
   305     # Fraking apollo
 | 
| 
 | 
   306     apollo_exons = list(
 | 
| 
 | 
   307         feature_lambda(
 | 
| 
 | 
   308             gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False
 | 
| 
 | 
   309         )
 | 
| 
 | 
   310     )
 | 
| 
 | 
   311     apollo_exons = [x for x in apollo_exons if len(x) < 10]
 | 
| 
 | 
   312     # These are more NCBI's style
 | 
| 
 | 
   313     regulatory_elements = list(
 | 
| 
 | 
   314         feature_lambda(
 | 
| 
 | 
   315             gene.sub_features,
 | 
| 
 | 
   316             feature_test_type,
 | 
| 
 | 
   317             {"type": "regulatory"},
 | 
| 
 | 
   318             subfeatures=False,
 | 
| 
 | 
   319         )
 | 
| 
 | 
   320     )
 | 
| 
 | 
   321     rbs_regulatory = list(
 | 
| 
 | 
   322         feature_lambda(
 | 
| 
 | 
   323             regulatory_elements,
 | 
| 
 | 
   324             feature_test_quals,
 | 
| 
 | 
   325             {"regulatory_class": ["ribosome_binding_site"]},
 | 
| 
 | 
   326             subfeatures=False,
 | 
| 
 | 
   327         )
 | 
| 
 | 
   328     )
 | 
| 
 | 
   329     # Here's hoping you find just one ;)
 | 
| 
 | 
   330     return rbs_rbs + rbs_sds + rbs_regulatory + apollo_exons
 | 
| 
 | 
   331 
 | 
| 
 | 
   332 
 | 
| 
 | 
   333 def nice_name(record):
 | 
| 
 | 
   334     """
 | 
| 
 | 
   335     get the real name rather than NCBI IDs and so on. If fails, will return record.id
 | 
| 
 | 
   336     """
 | 
| 
 | 
   337     name = record.id
 | 
| 
 | 
   338     likely_parental_contig = list(genes(record.features, feature_type="contig"))
 | 
| 
 | 
   339     if len(likely_parental_contig) == 1:
 | 
| 
 | 
   340         name = likely_parental_contig[0].qualifiers.get("organism", [name])[0]
 | 
| 
 | 
   341     return name
 | 
| 
 | 
   342 
 | 
| 
 | 
   343 
 | 
| 
 | 
   344 def fsort(it):
 | 
| 
 | 
   345     for i in sorted(it, key=lambda x: int(x.location.start)):
 | 
| 
 | 
   346         yield i
 |