Repository 'flow_datatypes'
hg clone https://toolshed.g2.bx.psu.edu/repos/immport-devteam/flow_datatypes

Changeset 0:ed90d166300e (2017-02-27)
Commit message:
Uploaded
added:
flow.py
b
diff -r 000000000000 -r ed90d166300e flow.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/flow.py Mon Feb 27 15:18:40 2017 -0500
[
b'@@ -0,0 +1,307 @@\n+# -*- coding: utf-8 -*-\n+######################################################################\n+#                  Copyright (c) 2016 Northrop Grumman.\n+#                          All rights reserved.\n+######################################################################\n+\n+"""\n+Flow analysis datatypes.\n+"""\n+\n+import gzip\n+import json\n+import logging\n+import os\n+import re\n+import subprocess\n+import tempfile\n+import rpy2.interactive as r\n+import rpy2.interactive.packages\n+\n+from galaxy.datatypes.binary import Binary\n+from galaxy.datatypes.tabular import Tabular\n+from galaxy.datatypes.data import get_file_peek, Text\n+from galaxy.datatypes.metadata import MetadataElement\n+from galaxy.util import nice_size, string_as_bool\n+from . import data\n+\n+log = logging.getLogger(__name__)\n+\n+\n+def is_number(s):\n+    try:\n+        float(s)\n+        return True\n+    except ValueError:\n+        return False\n+\n+\n+class FCS(Binary):\n+    """Class describing an FCS binary file"""\n+    file_ext = "fcs"\n+\n+    def set_peek(self, dataset, is_multi_byte=False):\n+        if not dataset.dataset.purged:\n+            dataset.peek = "Binary FCS file"\n+            dataset.blurb = data.nice_size(dataset.get_size())\n+        else:\n+            dataset.peek = \'file does not exist\'\n+            dataset.blurb = \'file purged from disk\'\n+\n+    def display_peek(self, dataset):\n+        try:\n+            return dataset.peek\n+        except:\n+            return "Binary FCSfile (%s)" % (data.nice_size(dataset.get_size()))\n+\n+    def sniff(self, filename):\n+        """\n+        Checking if the file is in FCS format. Should read FCS2.0, FCS3.0\n+        and FCS3.1\n+        """\n+        r.packages.importr("flowCore")\n+        rlib = r.packages.packages\n+        try:\n+            fcsobject = rlib.flowCore.isFCSfile(filename)\n+            return list(fcsobject)[0]\n+        except:\n+            return False\n+\n+    def get_mime(self):\n+        """Returns the mime type of the datatype"""\n+        return \'application/octet-stream\'\n+Binary.register_sniffable_binary_format("fcs","fcs",FCS)\n+\n+class FlowText(Tabular):\n+    """Class describing an Flow Text file"""\n+    file_ext = "flowtext"\n+\n+    def set_peek(self, dataset, is_multi_byte=False):\n+        if not dataset.dataset.purged:\n+            dataset.peek = "Text Flow file"\n+            dataset.blurb = data.nice_size(dataset.get_size())\n+        else:\n+            dataset.peek = \'file does not exist\'\n+            dataset.blurb = \'file purged from disk\'\n+\n+    def display_peek(self, dataset):\n+        try:\n+            return dataset.peek\n+        except:\n+            return "Text Flow file (%s)" % (data.nice_size(dataset.get_size()))\n+\n+    def sniff(self, filename):\n+        """Quick test on file formatting and values"""\n+        with open(filename, "r") as f:\n+            f.readline()\n+            values = f.readline().strip().split("\\t")\n+            for vals in values:\n+                if not is_number(vals):\n+                    return False\n+            return True\n+\n+    def get_mime(self):\n+        """Returns the mime type of the datatype"""\n+        return \'text/tab-separated-values\'\n+\n+\n+class FlowClustered(Tabular):\n+    """Class describing a Flow Text that has been clustered through FLOCK"""\n+    file_ext = "flowclr"\n+\n+    def set_peek(self, dataset, is_multi_byte=False):\n+        if not dataset.dataset.purged:\n+            dataset.peek = "Text Flow Clustered file"\n+            dataset.blurb = data.nice_size(dataset.get_size())\n+        else:\n+            dataset.peek = \'file does not exist\'\n+            dataset.blurb = \'file purged from disk\'\n+\n+    def display_peek(self, dataset):\n+        try:\n+            return dataset.peek\n+        except:\n+            return "Flow Text Clustered file (%s)" % (data.nice_size(dataset.get_size()))\n+\n+    def sniff(self, filename):\n+        """Quick test on headers and values"""\n+        with open(filename, "r") as f:\n+            population = f.readline'..b'ataset):\n+        try:\n+            return dataset.peek\n+        except:\n+            return "Flow Stats1 file (%s)" % (data.nice_size(dataset.get_size()))\n+\n+    def sniff(self, filename):\n+        """Quick test on file formatting and values"""\n+        with open(filename, "r") as f:\n+            first_header = f.readline().strip().split("\\t")[0]\n+            if first_header != "FileID":\n+                return False\n+            return True\n+\n+    def get_mime(self):\n+        """Returns the mime type of the datatype"""\n+        return \'text/tab-separated-values\'\n+\n+\n+class FlowStats2(Tabular):\n+    """Class describing a Flow Stats file"""\n+    file_ext = "flowstat2"\n+\n+    def set_peek(self, dataset, is_multi_byte=False):\n+        if not dataset.dataset.purged:\n+            dataset.peek = "Flow Stats2 file"\n+            dataset.blurb = data.nice_size(dataset.get_size())\n+        else:\n+            dataset.peek = \'file does not exist\'\n+            dataset.blurb = \'file purged from disk\'\n+\n+    def display_peek(self, dataset):\n+        try:\n+            return dataset.peek\n+        except:\n+            return "Flow Stats2 file (%s)" % (data.nice_size(dataset.get_size()))\n+\n+    def sniff(self, filename):\n+        """Quick test on file formatting and values"""\n+        with open(filename, "r") as f:\n+            smp_name = f.readline().strip().split("\\t")[-1]\n+            if smp_name != "SampleName":\n+                return False\n+            return True\n+\n+    def get_mime(self):\n+        """Returns the mime type of the datatype"""\n+        return \'text/tab-separated-values\'\n+\n+\n+class FlowStats3(Tabular):\n+    """Class describing a Flow Stats file"""\n+    file_ext = "flowstat3"\n+\n+    def set_peek(self, dataset, is_multi_byte=False):\n+        if not dataset.dataset.purged:\n+            dataset.peek = "Flow Stats3 file"\n+            dataset.blurb = data.nice_size(dataset.get_size())\n+        else:\n+            dataset.peek = \'file does not exist\'\n+            dataset.blurb = \'file purged from disk\'\n+\n+    def display_peek(self, dataset):\n+        try:\n+            return dataset.peek\n+        except:\n+            return "Flow Stats3 file (%s)" % (data.nice_size(dataset.get_size()))\n+\n+    def sniff(self, filename):\n+        """Quick test on file formatting and values"""\n+        with open(filename, "r") as f:\n+            last_col = f.readline().strip().split("\\t")[-1]\n+            if last_col != "Percentage_stdev":\n+                return False\n+            values = f.readline().strip().split("\\t")\n+            for vals in values:\n+                if not is_number(vals):\n+                    return False\n+            return True\n+\n+    def get_mime(self):\n+        """Returns the mime type of the datatype"""\n+        return \'text/tab-separated-values\'\n+\n+\n+class FlowScore(Tabular):\n+    """Class describing a Flow Score file"""\n+    file_ext = "flowscore"\n+\n+    def set_peek(self, dataset, is_multi_byte=False):\n+        if not dataset.dataset.purged:\n+            dataset.peek = "Flow Score file"\n+            dataset.blurb = data.nice_size(dataset.get_size())\n+        else:\n+            dataset.peek = \'file does not exist\'\n+            dataset.blurb = \'file purged from disk\'\n+\n+    def display_peek(self, dataset):\n+        try:\n+            return dataset.peek\n+        except:\n+            return "Flow Score file (%s)" % (data.nice_size(dataset.get_size()))\n+\n+    def sniff(self, filename):\n+        """Quick test on file formatting and values"""\n+        with open(filename, "r") as f:\n+            population = f.readline().strip().split("\\t")[0]\n+            if population != "Population_ID":\n+                return False\n+            values = f.readline().strip().split("\\t")\n+            for vals in values:\n+                if not is_number(vals):\n+                    return False\n+            return True\n+\n+    def get_mime(self):\n+        """Returns the mime type of the datatype"""\n+        return \'text/tab-separated-values\'\n'