diff data_manager/data_manager_selection_background.py @ 0:bb5794942b5e draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_selection_background commit af3bfbbd3f1236bf96a25bcb8483f2889295ec0c"
author iuc
date Fri, 20 Aug 2021 21:03:25 +0000
parents
children 0bd1cd5eac51
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/data_manager/data_manager_selection_background.py	Fri Aug 20 21:03:25 2021 +0000
@@ -0,0 +1,138 @@
+# -*- coding: utf-8 -*-
+import argparse
+import bz2
+import gzip
+import json
+import os
+import shutil
+import sys
+import uuid
+import zipfile
+
+from six.moves.urllib.request import urlretrieve
+
+# Nice solution to opening compressed files (zip/bz2/gz) transparently
+# https://stackoverflow.com/a/13045892/638445
+
+
+class CompressedFile(object):
+    """Base class for transparently opening one kind of compressed file.
+
+    Subclasses set ``magic`` to the leading signature of their format and
+    override ``open()`` to return an object for reading the data; that
+    object is stored on the instance as ``accessor``.
+    """
+
+    # Leading signature identifying the format; None on the base class.
+    magic = None
+    file_type = None
+    mime_type = None
+    proper_extension = None
+
+    def __init__(self, f):
+        # f is handed unchanged to the subclass's open(); the factory
+        # function in this module passes a filename.
+        self.f = f
+        self.accessor = self.open()
+
+    @classmethod
+    def is_magic(cls, data):
+        """Return True if ``data`` starts with this class's signature.
+
+        :param data: the first bytes of the file being inspected
+        :returns: False when the class defines no signature (``magic`` is
+            None), otherwise the result of the prefix comparison
+        """
+        magic = cls.magic
+        if magic is None:
+            return False
+        if isinstance(data, bytes) and not isinstance(magic, bytes):
+            # Python 3: bytes.startswith() rejects a str prefix. latin-1
+            # maps code points 0-255 one-to-one onto byte values, so the
+            # signature is preserved exactly.
+            magic = magic.encode('latin-1')
+        return data.startswith(magic)
+
+    def open(self):
+        """Return the accessor for ``self.f``; the base class has none."""
+        return None
+
+
+class ZIPFile(CompressedFile):
+    """Accessor for ZIP archives."""
+
+    # Bytes literal: the signature is compared with data read in binary
+    # mode, and bytes.startswith() rejects a str argument on Python 3.
+    magic = b'\x50\x4b\x03\x04'
+    file_type = 'zip'
+    mime_type = 'compressed/zip'
+
+    def open(self):
+        """Return a ``zipfile.ZipFile`` for ``self.f``.
+
+        NOTE(review): a ZipFile is an archive handle rather than a readable
+        stream (its ``read()`` takes a member name), so code that copies
+        from the accessor as if it were a file object will not work for
+        ZIP input -- confirm how archives are meant to be handled.
+        """
+        return zipfile.ZipFile(self.f)
+
+
+class BZ2File(CompressedFile):
+    """Accessor for bzip2-compressed files."""
+
+    # Bytes literal: the signature is compared with data read in binary
+    # mode, and bytes.startswith() rejects a str argument on Python 3.
+    magic = b'\x42\x5a\x68'
+    file_type = 'bz2'
+    mime_type = 'compressed/bz2'
+
+    def open(self):
+        """Return a ``bz2.BZ2File`` reading decompressed data from ``self.f``."""
+        return bz2.BZ2File(self.f)
+
+
+class GZFile(CompressedFile):
+    """Accessor for gzip-compressed files."""
+
+    # Bytes literal: the signature is compared with data read in binary
+    # mode, and bytes.startswith() rejects a str argument on Python 3.
+    magic = b'\x1f\x8b\x08'
+    file_type = 'gz'
+    mime_type = 'compressed/gz'
+
+    def open(self):
+        """Return a ``gzip.GzipFile`` reading decompressed data from ``self.f``.
+
+        ``self.f`` may be a filename or an already open binary file object.
+        """
+        source = self.f
+        if hasattr(source, 'read'):
+            # GzipFile's first positional argument is a filename; an open
+            # file object has to be supplied through ``fileobj``.
+            return gzip.GzipFile(fileobj=source)
+        return gzip.GzipFile(source)
+
+
+# factory function to create a suitable instance for accessing files
+def get_compressed_file(filename):
+    """Return a CompressedFile wrapper matching the content of ``filename``.
+
+    The start of the file is compared with the signature of each supported
+    format (zip, bz2, gz, in that order).
+
+    :param filename: path of the file to inspect
+    :returns: an instance of the first matching wrapper class, constructed
+        with ``filename``, or None when no signature matches
+    """
+    # Only the leading bytes are needed; the ``with`` block closes the file
+    # before the wrapper opens it again by name.
+    with open(filename, 'rb') as f:
+        start_of_file = f.read(1024)
+
+    for cls in (ZIPFile, BZ2File, GZFile):
+        if cls.is_magic(start_of_file):
+            return cls(filename)
+
+    return None
+
+
+def url_download(url):
+    """Download a file from a URL into the current directory.
+
+    If the downloaded file is recognised as compressed it is decompressed
+    and the compressed copy is removed.
+
+    :param url: full url of the file to download
+    :type url: str.
+    :returns: name of the downloaded (and, if needed, decompressed) file
+    :raises: IOError
+    """
+
+    # Generate file_name
+    file_name = url.split('/')[-1]
+
+    try:
+        # download URL (FTP and HTTP work, probably local and data too)
+        urlretrieve(url, file_name)
+
+        # uncompress file if needed
+        cf = get_compressed_file(file_name)
+        if cf is not None:
+            uncompressed_file_name = os.path.splitext(file_name)[0]
+            if uncompressed_file_name == file_name:
+                # No extension to strip: writing to the same name would
+                # truncate the file being read and os.remove() below would
+                # then delete the result.
+                uncompressed_file_name = file_name + '.uncompressed'
+            try:
+                # Binary mode: the accessor yields bytes.
+                with open(uncompressed_file_name, 'wb') as uncompressed_file:
+                    shutil.copyfileobj(cf.accessor, uncompressed_file)
+            finally:
+                cf.accessor.close()
+            os.remove(file_name)
+            file_name = uncompressed_file_name
+    except IOError as e:
+        sys.stderr.write('Error occured downloading reference file: %s' % e)
+        # The download may have failed before anything was written.
+        if os.path.exists(file_name):
+            os.remove(file_name)
+        # Propagate instead of returning the name of a file that is gone.
+        raise
+    return file_name
+
+
+def main():
+    """Copy the background sequences into place and write the data manager JSON.
+
+    The sequences come from ``--uri`` (downloaded) or, when no URI is
+    given, from ``--dataset`` (an existing path). They are copied to
+    ``<dbkey>.fa`` in the current working directory, and one
+    ``bealign_selection`` data table entry describing that file is written
+    to ``--output``.
+    """
+    parser = argparse.ArgumentParser(description='Create data manager JSON.')
+    parser.add_argument('--output', dest='output', action='store', required=True, help='JSON filename')
+    # str(): a UUID object is not JSON serializable and would make
+    # json.dump() fail whenever --dbkey is omitted.
+    parser.add_argument('--dbkey', dest='dbkey', action='store', default=str(uuid.uuid4()), help='Data table entry unique ID')
+    parser.add_argument('--label', dest='label', action='store', required=True, help='Label to display')
+    parser.add_argument('--uri', dest='uri', action='store', help='URI for the sequences')
+    parser.add_argument('--dataset', dest='dataset', action='store', help='Path for the sequences')
+
+    args = parser.parse_args()
+
+    if args.uri is None and args.dataset is None:
+        parser.error('one of --uri or --dataset is required')
+
+    work_dir = os.getcwd()
+
+    # --uri takes precedence when both sources are given
+    if args.uri is not None:
+        background_fasta = url_download(args.uri)
+    else:
+        background_fasta = args.dataset
+
+    table_entry = '%s.fa' % args.dbkey
+    shutil.copy(background_fasta, os.path.join(work_dir, table_entry))
+
+    # Update Data Manager JSON and write to file
+    data_manager_entry = {
+        'data_tables': {
+            'bealign_selection': {'value': args.dbkey, 'label': args.label, 'path': table_entry}
+        }
+    }
+
+    with open(args.output, 'w') as fh:
+        json.dump(data_manager_entry, fh, sort_keys=True)
+
+
+# Run the data manager only when executed as a script, not when imported.
+if __name__ == '__main__':
+    main()