Mercurial > repos > iuc > data_manager_selection_background
comparison data_manager/data_manager_selection_background.py @ 0:bb5794942b5e draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_selection_background commit af3bfbbd3f1236bf96a25bcb8483f2889295ec0c"
author | iuc |
---|---|
date | Fri, 20 Aug 2021 21:03:25 +0000 |
parents | |
children | 0bd1cd5eac51 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:bb5794942b5e |
---|---|
1 # -*- coding: utf-8 -*- | |
2 import argparse | |
3 import bz2 | |
4 import gzip | |
5 import json | |
6 import os | |
7 import shutil | |
8 import sys | |
9 import uuid | |
10 import zipfile | |
11 | |
12 from six.moves.urllib.request import urlretrieve | |
13 | |
14 # Nice solution to opening compressed files (zip/bz2/gz) transparently | |
15 # https://stackoverflow.com/a/13045892/638445 | |
16 | |
17 | |
class CompressedFile(object):
    """Base class for transparent access to a compressed file.

    Subclasses set ``magic`` to the byte signature found at the start of
    their format and override :meth:`open` to return an object that reads
    the decompressed content.
    """

    # Signature expected at the very start of the file (None: matches nothing).
    magic = None
    # Short name of the compression format.
    file_type = None
    # Descriptive type string for the format.
    mime_type = None
    # Not set by any subclass in this file.
    proper_extension = None

    def __init__(self, f):
        # f is whatever the subclass's open() hands to its decompression
        # library (the factory in this file passes a file name).
        self.f = f
        self.accessor = self.open()

    @classmethod
    def is_magic(cls, data):
        """Return True when *data* starts with this format's signature.

        :param data: leading content of the file being probed
        :returns: False when the class defines no signature
        """
        magic = cls.magic
        if magic is None:
            return False
        # Files are probed in binary mode, so make sure both operands are of
        # the same kind; comparing text against bytes raises TypeError on
        # Python 3. latin-1 maps code points 0-255 onto identical byte values.
        if isinstance(data, bytes) and not isinstance(magic, bytes):
            magic = magic.encode('latin-1')
        elif isinstance(magic, bytes) and not isinstance(data, bytes):
            magic = magic.decode('latin-1')
        return data.startswith(magic)

    def open(self):
        """Return the decompressing accessor; the base class has none."""
        return None
35 | |
36 | |
class ZIPFile(CompressedFile):
    """Accessor for ZIP archives."""

    # Bytes literal: the file header is read in binary mode.
    magic = b'\x50\x4b\x03\x04'
    file_type = 'zip'
    mime_type = 'compressed/zip'

    def open(self):
        """Return a readable binary stream over the first file in the archive.

        A ``zipfile.ZipFile`` is a container, not a stream: its ``read()``
        takes a member name, so it cannot be handed to
        ``shutil.copyfileobj``. The first non-directory member is opened
        instead.

        :raises: IOError if the archive holds no regular file
        """
        # Keep the archive referenced for as long as the member stream lives.
        self.archive = zipfile.ZipFile(self.f)
        for member in self.archive.namelist():
            if not member.endswith('/'):
                return self.archive.open(member)
        self.archive.close()
        raise IOError('ZIP archive %s contains no files' % self.f)
44 | |
45 | |
class BZ2File(CompressedFile):
    """Accessor for bzip2-compressed files."""

    # Bytes literal: the file header is read in binary mode, and comparing
    # bytes against a text signature raises TypeError on Python 3.
    magic = b'\x42\x5a\x68'
    file_type = 'bz2'
    mime_type = 'compressed/bz2'

    def open(self):
        """Return a ``bz2.BZ2File`` reading decompressed bytes from ``self.f``."""
        return bz2.BZ2File(self.f)
53 | |
54 | |
class GZFile(CompressedFile):
    """Accessor for gzip-compressed files."""

    # Bytes literal: the file header is read in binary mode, and comparing
    # bytes against a text signature raises TypeError on Python 3.
    magic = b'\x1f\x8b\x08'
    file_type = 'gz'
    mime_type = 'compressed/gz'

    def open(self):
        """Return a ``gzip.GzipFile`` reading decompressed bytes.

        ``self.f`` is passed as the first positional argument, which
        ``GzipFile`` interprets as a file name.
        """
        return gzip.GzipFile(self.f)
62 | |
63 | |
64 # factory function to create a suitable instance for accessing files | |
def get_compressed_file(filename):
    """Return an accessor for *filename* if it is a known compressed format.

    The start of the file is compared against the signatures of the ZIP,
    bzip2 and gzip handlers, in that order.

    :param filename: path of the file to probe
    :returns: an instance of the first matching handler class, built from
        *filename*, or None when no signature matches
    """
    # Only the header is needed; the with-block closes the probe handle
    # before any handler re-opens the file by name.
    with open(filename, 'rb') as f:
        start_of_file = f.read(1024)

    for cls in (ZIPFile, BZ2File, GZFile):
        if cls.is_magic(start_of_file):
            return cls(filename)

    return None
75 | |
76 | |
def url_download(url):
    """Download the file at *url* into the current directory.

    If the downloaded file is recognised as compressed, it is decompressed
    and the compressed original is removed.

    :param url: full url of the file to fetch
    :type url: str.
    :returns: name of the downloaded (and, if applicable, decompressed) file
    :raises: IOError when the download or the decompression fails
    """

    # The local name is the last path component of the URL.
    file_name = url.split('/')[-1]

    try:
        # download URL (FTP and HTTP work, probably local and data too)
        urlretrieve(url, file_name)

        # uncompress file if needed
        cf = get_compressed_file(file_name)
        if cf is not None:
            uncompressed_file_name = os.path.splitext(file_name)[0]
            if uncompressed_file_name == file_name:
                # No extension to strip: never write onto the file being read.
                uncompressed_file_name = file_name + '.uncompressed'
            try:
                # Binary mode: the accessor yields bytes, not text.
                with open(uncompressed_file_name, 'wb') as uncompressed_file:
                    shutil.copyfileobj(cf.accessor, uncompressed_file)
            finally:
                cf.accessor.close()
            os.remove(file_name)
            file_name = uncompressed_file_name
    except IOError as e:
        sys.stderr.write('Error occured downloading reference file: %s' % e)
        # The file may never have been created; do not mask the real error.
        if os.path.exists(file_name):
            os.remove(file_name)
        # Propagate instead of returning the name of a file that is gone.
        raise
    return file_name
104 | |
105 | |
def main():
    """Stage the background sequences and write the data manager JSON.

    The sequences come from ``--uri`` (downloaded) or, when no URI is given,
    from ``--dataset`` (a local path). They are copied into the current
    working directory as ``<dbkey>.fa`` and a ``bealign_selection`` data
    table entry describing them is written to ``--output``.
    """
    parser = argparse.ArgumentParser(description='Create data manager JSON.')
    parser.add_argument('--output', dest='output', action='store', required=True, help='JSON filename')
    # str(): a raw UUID object cannot be serialised by json.dump.
    parser.add_argument('--dbkey', dest='dbkey', action='store', default=str(uuid.uuid4()), help='Data table entry unique ID')
    parser.add_argument('--label', dest='label', action='store', required=True, help='Label to display')
    parser.add_argument('--uri', dest='uri', action='store', help='URI for the sequences')
    parser.add_argument('--dataset', dest='dataset', action='store', help='Path for the sequences')

    args = parser.parse_args()

    # Without either source there is nothing to copy; fail with a usage
    # message rather than a TypeError from shutil.copy(None, ...).
    if args.uri is None and args.dataset is None:
        parser.error('one of --uri or --dataset is required')

    work_dir = os.getcwd()

    # --uri takes precedence when both sources are supplied.
    if args.uri is not None:
        background_fasta = url_download(args.uri)
    else:
        background_fasta = args.dataset

    table_entry = '%s.fa' % args.dbkey
    shutil.copy(background_fasta, os.path.join(work_dir, table_entry))

    # Update Data Manager JSON and write to file
    data_manager_entry = {
        'data_tables': {
            'bealign_selection': {'value': args.dbkey, 'label': args.label, 'path': table_entry}
        }
    }

    with open(args.output, 'w') as fh:
        json.dump(data_manager_entry, fh, sort_keys=True)


if __name__ == '__main__':
    main()