Mercurial > repos > xuebing > sharplabtool
comparison tools/data_source/upload.py @ 0:9071e359b9a3
Uploaded
author | xuebing |
---|---|
date | Fri, 09 Mar 2012 19:37:19 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9071e359b9a3 |
---|---|
1 #!/usr/bin/env python | |
2 #Processes uploads from the user. | |
3 | |
4 # WARNING: Changes in this tool (particularly as related to parsing) may need | |
5 # to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools | |
6 | |
7 import urllib, sys, os, gzip, tempfile, shutil, re, gzip, zipfile, codecs, binascii | |
8 from galaxy import eggs | |
9 # need to import model before sniff to resolve a circular import dependency | |
10 import galaxy.model | |
11 from galaxy.datatypes.checkers import * | |
12 from galaxy.datatypes import sniff | |
13 from galaxy.datatypes.binary import * | |
14 from galaxy.datatypes.images import Pdf | |
15 from galaxy.datatypes.registry import Registry | |
16 from galaxy import util | |
17 from galaxy.datatypes.util.image_util import * | |
18 from galaxy.util.json import * | |
19 | |
20 try: | |
21 import Image as PIL | |
22 except ImportError: | |
23 try: | |
24 from PIL import Image as PIL | |
25 except: | |
26 PIL = None | |
27 | |
28 try: | |
29 import bz2 | |
30 except: | |
31 bz2 = None | |
32 | |
33 assert sys.version_info[:2] >= ( 2, 4 ) | |
34 | |
35 def stop_err( msg, ret=1 ): | |
36 sys.stderr.write( msg ) | |
37 sys.exit( ret ) | |
38 def file_err( msg, dataset, json_file ): | |
39 json_file.write( to_json_string( dict( type = 'dataset', | |
40 ext = 'data', | |
41 dataset_id = dataset.dataset_id, | |
42 stderr = msg ) ) + "\n" ) | |
43 # never remove a server-side upload | |
44 if dataset.type in ( 'server_dir', 'path_paste' ): | |
45 return | |
46 try: | |
47 os.remove( dataset.path ) | |
48 except: | |
49 pass | |
50 def safe_dict(d): | |
51 """ | |
52 Recursively clone json structure with UTF-8 dictionary keys | |
53 http://mellowmachines.com/blog/2009/06/exploding-dictionary-with-unicode-keys-as-python-arguments/ | |
54 """ | |
55 if isinstance(d, dict): | |
56 return dict([(k.encode('utf-8'), safe_dict(v)) for k,v in d.iteritems()]) | |
57 elif isinstance(d, list): | |
58 return [safe_dict(x) for x in d] | |
59 else: | |
60 return d | |
61 def check_bam( file_path ): | |
62 return Bam().sniff( file_path ) | |
63 def check_sff( file_path ): | |
64 return Sff().sniff( file_path ) | |
65 def check_pdf( file_path ): | |
66 return Pdf().sniff( file_path ) | |
67 def check_bigwig( file_path ): | |
68 return BigWig().sniff( file_path ) | |
69 def check_bigbed( file_path ): | |
70 return BigBed().sniff( file_path ) | |
71 def parse_outputs( args ): | |
72 rval = {} | |
73 for arg in args: | |
74 id, files_path, path = arg.split( ':', 2 ) | |
75 rval[int( id )] = ( path, files_path ) | |
76 return rval | |
77 def add_file( dataset, registry, json_file, output_path ): | |
78 data_type = None | |
79 line_count = None | |
80 converted_path = None | |
81 stdout = None | |
82 link_data_only = dataset.get( 'link_data_only', 'copy_files' ) | |
83 | |
84 try: | |
85 ext = dataset.file_type | |
86 except AttributeError: | |
87 file_err( 'Unable to process uploaded file, missing file_type parameter.', dataset, json_file ) | |
88 return | |
89 | |
90 if dataset.type == 'url': | |
91 try: | |
92 temp_name, dataset.is_multi_byte = sniff.stream_to_file( urllib.urlopen( dataset.path ), prefix='url_paste' ) | |
93 except Exception, e: | |
94 file_err( 'Unable to fetch %s\n%s' % ( dataset.path, str( e ) ), dataset, json_file ) | |
95 return | |
96 dataset.path = temp_name | |
97 # See if we have an empty file | |
98 if not os.path.exists( dataset.path ): | |
99 file_err( 'Uploaded temporary file (%s) does not exist.' % dataset.path, dataset, json_file ) | |
100 return | |
101 if not os.path.getsize( dataset.path ) > 0: | |
102 file_err( 'The uploaded file is empty', dataset, json_file ) | |
103 return | |
104 if not dataset.type == 'url': | |
105 # Already set is_multi_byte above if type == 'url' | |
106 try: | |
107 dataset.is_multi_byte = util.is_multi_byte( codecs.open( dataset.path, 'r', 'utf-8' ).read( 100 ) ) | |
108 except UnicodeDecodeError, e: | |
109 dataset.is_multi_byte = False | |
110 # Is dataset an image? | |
111 image = check_image( dataset.path ) | |
112 if image: | |
113 if not PIL: | |
114 image = None | |
115 # get_image_ext() returns None if nor a supported Image type | |
116 ext = get_image_ext( dataset.path, image ) | |
117 data_type = ext | |
118 # Is dataset content multi-byte? | |
119 elif dataset.is_multi_byte: | |
120 data_type = 'multi-byte char' | |
121 ext = sniff.guess_ext( dataset.path, is_multi_byte=True ) | |
122 # Is dataset content supported sniffable binary? | |
123 elif check_bam( dataset.path ): | |
124 ext = 'bam' | |
125 data_type = 'bam' | |
126 elif check_sff( dataset.path ): | |
127 ext = 'sff' | |
128 data_type = 'sff' | |
129 elif check_pdf( dataset.path ): | |
130 ext = 'pdf' | |
131 data_type = 'pdf' | |
132 elif check_bigwig( dataset.path ): | |
133 ext = 'bigwig' | |
134 data_type = 'bigwig' | |
135 elif check_bigbed( dataset.path ): | |
136 ext = 'bigbed' | |
137 data_type = 'bigbed' | |
138 if not data_type: | |
139 # See if we have a gzipped file, which, if it passes our restrictions, we'll uncompress | |
140 is_gzipped, is_valid = check_gzip( dataset.path ) | |
141 if is_gzipped and not is_valid: | |
142 file_err( 'The gzipped uploaded file contains inappropriate content', dataset, json_file ) | |
143 return | |
144 elif is_gzipped and is_valid: | |
145 if link_data_only == 'copy_files': | |
146 # We need to uncompress the temp_name file, but BAM files must remain compressed in the BGZF format | |
147 CHUNK_SIZE = 2**20 # 1Mb | |
148 fd, uncompressed = tempfile.mkstemp( prefix='data_id_%s_upload_gunzip_' % dataset.dataset_id, dir=os.path.dirname( output_path ), text=False ) | |
149 gzipped_file = gzip.GzipFile( dataset.path, 'rb' ) | |
150 while 1: | |
151 try: | |
152 chunk = gzipped_file.read( CHUNK_SIZE ) | |
153 except IOError: | |
154 os.close( fd ) | |
155 os.remove( uncompressed ) | |
156 file_err( 'Problem decompressing gzipped data', dataset, json_file ) | |
157 return | |
158 if not chunk: | |
159 break | |
160 os.write( fd, chunk ) | |
161 os.close( fd ) | |
162 gzipped_file.close() | |
163 # Replace the gzipped file with the decompressed file if it's safe to do so | |
164 if dataset.type in ( 'server_dir', 'path_paste' ): | |
165 dataset.path = uncompressed | |
166 else: | |
167 shutil.move( uncompressed, dataset.path ) | |
168 dataset.name = dataset.name.rstrip( '.gz' ) | |
169 data_type = 'gzip' | |
170 if not data_type and bz2 is not None: | |
171 # See if we have a bz2 file, much like gzip | |
172 is_bzipped, is_valid = check_bz2( dataset.path ) | |
173 if is_bzipped and not is_valid: | |
174 file_err( 'The gzipped uploaded file contains inappropriate content', dataset, json_file ) | |
175 return | |
176 elif is_bzipped and is_valid: | |
177 if link_data_only == 'copy_files': | |
178 # We need to uncompress the temp_name file | |
179 CHUNK_SIZE = 2**20 # 1Mb | |
180 fd, uncompressed = tempfile.mkstemp( prefix='data_id_%s_upload_bunzip2_' % dataset.dataset_id, dir=os.path.dirname( output_path ), text=False ) | |
181 bzipped_file = bz2.BZ2File( dataset.path, 'rb' ) | |
182 while 1: | |
183 try: | |
184 chunk = bzipped_file.read( CHUNK_SIZE ) | |
185 except IOError: | |
186 os.close( fd ) | |
187 os.remove( uncompressed ) | |
188 file_err( 'Problem decompressing bz2 compressed data', dataset, json_file ) | |
189 return | |
190 if not chunk: | |
191 break | |
192 os.write( fd, chunk ) | |
193 os.close( fd ) | |
194 bzipped_file.close() | |
195 # Replace the bzipped file with the decompressed file if it's safe to do so | |
196 if dataset.type in ( 'server_dir', 'path_paste' ): | |
197 dataset.path = uncompressed | |
198 else: | |
199 shutil.move( uncompressed, dataset.path ) | |
200 dataset.name = dataset.name.rstrip( '.bz2' ) | |
201 data_type = 'bz2' | |
202 if not data_type: | |
203 # See if we have a zip archive | |
204 is_zipped = check_zip( dataset.path ) | |
205 if is_zipped: | |
206 if link_data_only == 'copy_files': | |
207 CHUNK_SIZE = 2**20 # 1Mb | |
208 uncompressed = None | |
209 uncompressed_name = None | |
210 unzipped = False | |
211 z = zipfile.ZipFile( dataset.path ) | |
212 for name in z.namelist(): | |
213 if name.endswith('/'): | |
214 continue | |
215 if unzipped: | |
216 stdout = 'ZIP file contained more than one file, only the first file was added to Galaxy.' | |
217 break | |
218 fd, uncompressed = tempfile.mkstemp( prefix='data_id_%s_upload_zip_' % dataset.dataset_id, dir=os.path.dirname( output_path ), text=False ) | |
219 if sys.version_info[:2] >= ( 2, 6 ): | |
220 zipped_file = z.open( name ) | |
221 while 1: | |
222 try: | |
223 chunk = zipped_file.read( CHUNK_SIZE ) | |
224 except IOError: | |
225 os.close( fd ) | |
226 os.remove( uncompressed ) | |
227 file_err( 'Problem decompressing zipped data', dataset, json_file ) | |
228 return | |
229 if not chunk: | |
230 break | |
231 os.write( fd, chunk ) | |
232 os.close( fd ) | |
233 zipped_file.close() | |
234 uncompressed_name = name | |
235 unzipped = True | |
236 else: | |
237 # python < 2.5 doesn't have a way to read members in chunks(!) | |
238 try: | |
239 outfile = open( uncompressed, 'wb' ) | |
240 outfile.write( z.read( name ) ) | |
241 outfile.close() | |
242 uncompressed_name = name | |
243 unzipped = True | |
244 except IOError: | |
245 os.close( fd ) | |
246 os.remove( uncompressed ) | |
247 file_err( 'Problem decompressing zipped data', dataset, json_file ) | |
248 return | |
249 z.close() | |
250 # Replace the zipped file with the decompressed file if it's safe to do so | |
251 if uncompressed is not None: | |
252 if dataset.type in ( 'server_dir', 'path_paste' ): | |
253 dataset.path = uncompressed | |
254 else: | |
255 shutil.move( uncompressed, dataset.path ) | |
256 dataset.name = uncompressed_name | |
257 data_type = 'zip' | |
258 if not data_type: | |
259 if check_binary( dataset.path ): | |
260 # We have a binary dataset, but it is not Bam, Sff or Pdf | |
261 data_type = 'binary' | |
262 #binary_ok = False | |
263 parts = dataset.name.split( "." ) | |
264 if len( parts ) > 1: | |
265 ext = parts[1].strip().lower() | |
266 if ext not in unsniffable_binary_formats: | |
267 file_err( 'The uploaded binary file contains inappropriate content', dataset, json_file ) | |
268 return | |
269 elif ext in unsniffable_binary_formats and dataset.file_type != ext: | |
270 err_msg = "You must manually set the 'File Format' to '%s' when uploading %s files." % ( ext.capitalize(), ext ) | |
271 file_err( err_msg, dataset, json_file ) | |
272 return | |
273 if not data_type: | |
274 # We must have a text file | |
275 if check_html( dataset.path ): | |
276 file_err( 'The uploaded file contains inappropriate HTML content', dataset, json_file ) | |
277 return | |
278 if data_type != 'binary': | |
279 if link_data_only == 'copy_files': | |
280 in_place = True | |
281 if dataset.type in ( 'server_dir', 'path_paste' ) and data_type not in [ 'gzip', 'bz2', 'zip' ]: | |
282 in_place = False | |
283 if dataset.space_to_tab: | |
284 line_count, converted_path = sniff.convert_newlines_sep2tabs( dataset.path, in_place=in_place ) | |
285 else: | |
286 line_count, converted_path = sniff.convert_newlines( dataset.path, in_place=in_place ) | |
287 if dataset.file_type == 'auto': | |
288 ext = sniff.guess_ext( dataset.path, registry.sniff_order ) | |
289 else: | |
290 ext = dataset.file_type | |
291 data_type = ext | |
292 # Save job info for the framework | |
293 if ext == 'auto' and dataset.ext: | |
294 ext = dataset.ext | |
295 if ext == 'auto': | |
296 ext = 'data' | |
297 datatype = registry.get_datatype_by_extension( ext ) | |
298 if dataset.type in ( 'server_dir', 'path_paste' ) and link_data_only == 'link_to_files': | |
299 # Never alter a file that will not be copied to Galaxy's local file store. | |
300 if datatype.dataset_content_needs_grooming( dataset.path ): | |
301 err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \ | |
302 '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.' | |
303 file_err( err_msg, dataset, json_file ) | |
304 return | |
305 if link_data_only == 'copy_files' and dataset.type in ( 'server_dir', 'path_paste' ) and data_type not in [ 'gzip', 'bz2', 'zip' ]: | |
306 # Move the dataset to its "real" path | |
307 if converted_path is not None: | |
308 shutil.copy( converted_path, output_path ) | |
309 try: | |
310 os.remove( converted_path ) | |
311 except: | |
312 pass | |
313 else: | |
314 # This should not happen, but it's here just in case | |
315 shutil.copy( dataset.path, output_path ) | |
316 elif link_data_only == 'copy_files': | |
317 shutil.move( dataset.path, output_path ) | |
318 # Write the job info | |
319 stdout = stdout or 'uploaded %s file' % data_type | |
320 info = dict( type = 'dataset', | |
321 dataset_id = dataset.dataset_id, | |
322 ext = ext, | |
323 stdout = stdout, | |
324 name = dataset.name, | |
325 line_count = line_count ) | |
326 json_file.write( to_json_string( info ) + "\n" ) | |
327 if link_data_only == 'copy_files' and datatype.dataset_content_needs_grooming( output_path ): | |
328 # Groom the dataset content if necessary | |
329 datatype.groom_dataset_content( output_path ) | |
330 def add_composite_file( dataset, registry, json_file, output_path, files_path ): | |
331 if dataset.composite_files: | |
332 os.mkdir( files_path ) | |
333 for name, value in dataset.composite_files.iteritems(): | |
334 value = util.bunch.Bunch( **value ) | |
335 if dataset.composite_file_paths[ value.name ] is None and not value.optional: | |
336 file_err( 'A required composite data file was not provided (%s)' % name, dataset, json_file ) | |
337 break | |
338 elif dataset.composite_file_paths[value.name] is not None: | |
339 dp = dataset.composite_file_paths[value.name][ 'path' ] | |
340 isurl = dp.find('://') <> -1 # todo fixme | |
341 if isurl: | |
342 try: | |
343 temp_name, dataset.is_multi_byte = sniff.stream_to_file( urllib.urlopen( dp ), prefix='url_paste' ) | |
344 except Exception, e: | |
345 file_err( 'Unable to fetch %s\n%s' % ( dp, str( e ) ), dataset, json_file ) | |
346 return | |
347 dataset.path = temp_name | |
348 dp = temp_name | |
349 if not value.is_binary: | |
350 if dataset.composite_file_paths[ value.name ].get( 'space_to_tab', value.space_to_tab ): | |
351 sniff.convert_newlines_sep2tabs( dp ) | |
352 else: | |
353 sniff.convert_newlines( dp ) | |
354 shutil.move( dp, os.path.join( files_path, name ) ) | |
355 # Move the dataset to its "real" path | |
356 shutil.move( dataset.primary_file, output_path ) | |
357 # Write the job info | |
358 info = dict( type = 'dataset', | |
359 dataset_id = dataset.dataset_id, | |
360 stdout = 'uploaded %s file' % dataset.file_type ) | |
361 json_file.write( to_json_string( info ) + "\n" ) | |
362 | |
363 def __main__(): | |
364 | |
365 if len( sys.argv ) < 4: | |
366 print >>sys.stderr, 'usage: upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...' | |
367 sys.exit( 1 ) | |
368 | |
369 output_paths = parse_outputs( sys.argv[4:] ) | |
370 json_file = open( 'galaxy.json', 'w' ) | |
371 | |
372 registry = Registry( sys.argv[1], sys.argv[2] ) | |
373 | |
374 for line in open( sys.argv[3], 'r' ): | |
375 dataset = from_json_string( line ) | |
376 dataset = util.bunch.Bunch( **safe_dict( dataset ) ) | |
377 try: | |
378 output_path = output_paths[int( dataset.dataset_id )][0] | |
379 except: | |
380 print >>sys.stderr, 'Output path for dataset %s not found on command line' % dataset.dataset_id | |
381 sys.exit( 1 ) | |
382 if dataset.type == 'composite': | |
383 files_path = output_paths[int( dataset.dataset_id )][1] | |
384 add_composite_file( dataset, registry, json_file, output_path, files_path ) | |
385 else: | |
386 add_file( dataset, registry, json_file, output_path ) | |
387 # clean up paramfile | |
388 try: | |
389 os.remove( sys.argv[3] ) | |
390 except: | |
391 pass | |
392 | |
393 if __name__ == '__main__': | |
394 __main__() |