data_stores/kipper.py @ 1:5c5027485f7d (draft)

commit message: Uploaded correct file

author:   damion
date:     Sun, 09 Aug 2015 16:07:50 -0400
parents:  (none)
children: 269d246ce6d0
comparing changesets 0:d31a1bd74e63 and 1:5c5027485f7d

#!/usr/bin/python
# -*- coding: utf-8 -*-

import subprocess
import datetime
import dateutil.parser as parser2
import calendar
import optparse
import re
import os
import sys
from shutil import copy
import tempfile
import json
import glob
import gzip


CODE_VERSION = '1.0.0'
REGEX_NATURAL_SORT = re.compile('([0-9]+)')

# Action codes for the database scan/import operations:
KEYDB_LIST = 1
KEYDB_EXTRACT = 2
KEYDB_REVERT = 3
KEYDB_IMPORT = 4

class MyParser(optparse.OptionParser):
    """
    Provides a better class for displaying formatted help info.  From
    http://stackoverflow.com/questions/1857346/python-optparse-how-to-include-additional-info-in-usage-output.
    """
    def format_epilog(self, formatter):
        return self.epilog


def stop_err(msg):
    sys.stderr.write("%s\n" % msg)
    sys.exit(1)

class Kipper(object):

    def __init__(self):
        # Provide defaults
        self.db_master_file_name = None
        self.db_master_file_path = None
        self.metadata_file_path = None
        self.db_import_file_path = None
        self.output_file = None  # By default, printed to stdout
        self.volume_id = None
        self.version_id = None  # Note: this is a natural number, starting from 1
        self.metadata = None
        self.options = None
        self.compression = ''

        # Seconds since the Unix epoch, UTC.  calendar.timegm() avoids the
        # non-portable strftime("%s") directive, which also misreads a naive
        # UTC datetime as local time.
        _nowabout = datetime.datetime.utcnow()
        self.dateTime = long(calendar.timegm(_nowabout.utctimetuple()))

        self.delim = "\t"
        self.nl = "\n"

    def __main__(self):
        """
        Handles all command line options for creating kipper archives, and for
        extracting or reverting to a version.
        """
        options, args = self.get_command_line()
        self.options = options

        if options.code_version:
            print CODE_VERSION
            return CODE_VERSION

        # *********************** Get master Kipper file ***********************
        if not len(args):
            stop_err('A Kipper database file name needs to be included as the first parameter!')

        self.db_master_file_name = args[0]  # Accepts a relative path with file name

        self.db_master_file_path = self.check_folder(self.db_master_file_name, "Kipper database file")
        # db_master_file_path is used from now on; db_master_file_name is used
        # just for metadata labeling.  Adjust it to remove any relative path component.
        self.db_master_file_name = os.path.basename(self.db_master_file_name)

        if os.path.isdir(self.db_master_file_path):
            stop_err('Error: Kipper data file "%s" is actually a folder!' % (self.db_master_file_path))

        self.metadata_file_path = self.db_master_file_path + '.md'

        # Returns path but makes sure its folder is real.  Must come before get_metadata().
        self.output_file = self.check_folder(options.db_output_file_path)

        # ************************* Get metadata ******************************
        if options.initialize:
            if options.compression:
                self.compression = options.compression

            self.set_metadata(type=options.initialize, compression=self.compression)

        self.get_metadata(options)

        self.check_date_input(options)

        if options.version_id or (options.extract and options.version_index):
            if options.version_index:
                vol_ver = self.version_lookup(options.version_index)
            else:
                # Note: version_id info overrides any date input above.
                vol_ver = self.get_version(options.version_id)

            if not vol_ver:
                stop_err("Error: Given version number or name does not exist in this database")

            (volume, version) = vol_ver
            self.volume_id = volume['id']
            self.version_id = version['id']
            self.dateTime = float(version['created'])
        else:
            # Use the latest version by default
            if not self.version_id and len(self.metadata['volumes'][-1]['versions']) > 0:
                self.volume_id = self.metadata['volumes'][-1]['id']
                self.version_id = self.metadata['volumes'][-1]['versions'][-1]['id']

        # ************************** Action triggers **************************

        if options.volume:
            # Add a new volume to the metadata
            self.metadata_create_volume()
            self.write_metadata(self.metadata)

        if options.db_import_file_path is not None:
            # Any time an import file is specified, this is the only action:
            self.try_import_file(options)
            return

        if options.metadata:
            # Write metadata to disk or stdout
            self.write_metadata2(self.metadata)
            return

        if options.extract:
            # Defaults to pulling the latest version
            if not self.version_id:
                stop_err('Error: Please supply a version id (-n [number]) or date (-d [date]) to extract.')

            if self.output_file and os.path.isdir(self.output_file):
                # A general output file name for the data store as a whole
                output_name = self.metadata['file_name']
                if output_name == '':
                    # Get the output file name from the version's original import file_name
                    output_name = self.metadata['volumes'][self.volume_id - 1]['versions'][self.version_id - 1]['file_name']
                # But remove the .gz suffix if it is there (refactor later).
                if output_name[-3:] == '.gz':
                    output_name = output_name[0:-3]
                self.output_file = os.path.join(self.output_file, output_name)

            self.db_scan_action(KEYDB_EXTRACT)
            return

        if options.revert:
            if not (options.version_id or options.dateTime or options.unixTime):
                stop_err('Error: Please supply a version id (-n [number]) or date (-d [date]) to revert to.')

            # Send the database back to the given revision
            if self.output_file and self.output_file == os.path.dirname(self.db_master_file_path):
                self.output_file = self.get_db_path()
            self.db_scan_action(KEYDB_REVERT)
            return

        # Default action: list the datastore versions
        self.get_list()


    def get_db_path(self, volume_id=None):
        # Note: metadata must be established before this method is called.
        if volume_id is None:
            volume_id = self.volume_id
        return self.db_master_file_path + '_' + str(volume_id) + self.metadata['compression']

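    # Illustrative sketch (hypothetical values): for a master file "cpn60",
    # volume_id 2 and '.gz' compression, get_db_path() returns "cpn60_2.gz" -
    # each volume is stored as its own numbered, optionally compressed file.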

    def get_temp_output_file(self, action=None, path=None):
        # Returns a write handle (+ name) for a temp file; returns a gzip
        # interface instead if compression is on.
        if path is None:
            path = self.output_file

        temp = tempfile.NamedTemporaryFile(mode='w+t', delete=False, dir=os.path.dirname(path))

        # If compression is called for, then we have to switch to a gzip
        # handler on the temp name:
        if action in [KEYDB_REVERT, KEYDB_IMPORT] and self.metadata['compression'] == '.gz':
            temp.close()
            temp = myGzipFile(temp.name, 'wb')

        return temp

    def get_list(self):
        volumes = self.metadata['volumes']
        for ptr in range(0, len(volumes)):
            volume = volumes[ptr]
            if ptr < len(volumes) - 1:
                ceiling = str(volumes[ptr + 1]['floor_id'] - 1)
            else:
                ceiling = ''
            print "Volume " + str(ptr + 1) + ", Versions " + str(volume['floor_id']) + "-" + ceiling

            for version in volume['versions']:
                print str(version['id']) + ": " + self.dateISOFormat(float(version['created'])) + '_v' + version['name']

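    # Sample get_list() output (hypothetical values):
    #   Volume 1, Versions 1-2
    #   1: 2015-08-09T16:07:50_v1
    #   2: 2015-08-10T09:30:00_v2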

    def set_metadata(self, type='text', compression=''):
        """
        Request to initialize the metadata file.
        Output goes to stdout, or to the -o output file by way of a temp file:
        if one doesn't include -o, output goes to stdout;
        if one includes only -o, output overwrites the .md file;
        if one includes -o [filename], output overwrites [filename].

        The algorithm processes each line as it comes in the database, so there
        is no significance to the version_ids ordering; earlier items in the
        list can in fact be later versions of the db.  We therefore re-sort and
        re-assign version ids at the end.

        @param type string: text, fasta, etc.
        """
        if os.path.isfile(self.metadata_file_path):
            stop_err('Error: Metadata file "%s" exists. You must remove it before generating a new one.' % (self.metadata_file_path))

        self.metadata_create(type, compression)

        volumes = glob.glob(self.db_master_file_path + '_[0-9]*')
        volumes.sort(key=natural_sort_key)
        for volume in volumes:
            # Note: scanned volumes must be consecutive from 1.  No error detection yet.
            self.metadata_create_volume(False)
            versions = self.metadata['volumes'][-1]['versions']
            import_modified = os.path.getmtime(volume)
            dbReader = bigFileReader(volume)
            version_ids = []
            db_key_value = dbReader.read()
            old_key = ''
            while db_key_value:
                (created_vid, deleted_vid, db_key, restofline) = db_key_value.split(self.delim, 3)
                version = versions[self.version_dict_lookup(version_ids, long(created_vid), import_modified)]
                version['rows'] += 1
                if old_key != db_key:
                    version['keys'] += 1
                    old_key = db_key

                version['inserts'] += 1
                if deleted_vid:
                    version = versions[self.version_dict_lookup(version_ids, long(deleted_vid), import_modified)]
                    version['deletes'] += 1

                db_key_value = dbReader.read()

            # Reorder, and reassign numeric version ids:
            versions.sort(key=lambda x: x['id'])
            for ptr, version in enumerate(versions):
                version['id'] = ptr + 1

        # If the first master db volume doesn't exist, then this is an
        # initialization situation.
        if len(volumes) == 0:
            self.metadata_create_volume()
            self.create_volume_file()

        with open(self.metadata_file_path, 'w') as metadata_handle:
            metadata_handle.write(json.dumps(self.metadata, sort_keys=True, indent=4, separators=(',', ': ')))

        return True


    def get_metadata(self, options):
        """
        Read in json metadata from file, and set the file processor
        [fasta|text] engine accordingly.
        """
        if not os.path.isfile(self.metadata_file_path):
            stop_err('Error: Unable to locate the "%s" metadata file. It should accompany the "%s" file. Use the -M parameter to initialize or regenerate the basic file.' % (self.metadata_file_path, self.db_master_file_name))

        with open(self.metadata_file_path, 'r') as metadata_handle:
            self.metadata = json.load(metadata_handle)

        # ******************* Select Kipper pre/post processor **********************
        # FUTURE: More processor options here - including custom ones referenced in metadata.
        if self.metadata['type'] == 'fasta':
            self.processor = VDBFastaProcessor()  # For fasta sequence databases
        else:
            self.processor = VDBProcessor()  # Default: text

        # Handle any JSON metadata defaults here, for items that aren't present
        # in databases created by earlier versions of this code:
        if 'compression' not in self.metadata:
            self.metadata['compression'] = ''


    def write_metadata(self, content):
        """
        Called when data store changes occur (revert and import).
        If results are going to stdout, then don't stream the metadata there too.
        """
        if self.output_file:
            self.write_metadata2(content)


    def write_metadata2(self, content):
        with (open(self.metadata_file_path, 'w') if self.output_file else sys.stdout) as metadata_handle:
            metadata_handle.write(json.dumps(content, sort_keys=True, indent=4, separators=(',', ': ')))


    def metadata_create(self, type, compression, floor_id=1):
        """
        Initial metadata structure.
        """
        file_name = self.db_master_file_name.rsplit('.', 1)
        self.metadata = {
            'version': CODE_VERSION,
            'name': self.db_master_file_name,
            'db_file_name': self.db_master_file_name,
            # A guess about the best base file name to write versions out as:
            'file_name': file_name[0] + '.' + type,
            'type': type,
            'description': '',
            'processor': '',  # Processing that overrides the type-matched processor.
            'compression': self.compression,
            'volumes': []
        }

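    # Illustrative sketch of the resulting .md JSON (field values hypothetical;
    # the volume/version entries are added later by metadata_create_volume()
    # and metadata_create_version()):
    # {
    #     "version": "1.0.0",
    #     "name": "cpn60",
    #     "db_file_name": "cpn60",
    #     "file_name": "cpn60.fasta",
    #     "type": "fasta",
    #     "description": "",
    #     "processor": "",
    #     "compression": "",
    #     "volumes": [
    #         {"floor_id": 1, "id": 1, "versions": [
    #             {"id": 1, "created": 1439150870.0, "name": "1",
    #              "file_name": "sequences.fasta", "file_size": 1024,
    #              "inserts": 10, "deletes": 0, "rows": 10, "keys": 10}
    #         ]}
    #     ]
    # }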

    def metadata_create_volume(self, file_create=True):
        # Only add a volume if the previous volume has at least 1 version in it.
        if len(self.metadata['volumes']) == 0 or len(self.metadata['volumes'][-1]['versions']) > 0:
            id = len(self.metadata['volumes']) + 1
            volume = {
                'floor_id': self.get_last_version() + 1,
                'id': id,
                'versions': []
            }
            self.metadata['volumes'].append(volume)
            self.volume_id = id
            if file_create:
                self.create_volume_file()

            return id

        else:
            stop_err("Error: Didn't create a new volume because the last one is still empty.")


    def create_volume_file(self):
        if self.metadata['compression'] == '.gz':
            gzip.open(self.get_db_path(), 'wb').close()
        else:
            open(self.get_db_path(), 'w').close()


    def metadata_create_version(self, mydate, file_name='', file_size=0, version_name=None):
        id = self.get_last_version() + 1
        if version_name is None:
            version_name = str(id)

        version = {
            'id': id,
            'created': mydate,
            'name': version_name,
            'file_name': file_name,
            'file_size': file_size,
            'inserts': 0,
            'deletes': 0,
            'rows': 0,
            'keys': 0
        }
        self.metadata['volumes'][-1]['versions'].append(version)

        return version


    def get_version(self, version_id=None):
        if version_id is None:
            version_id = self.version_id

        for volume in self.metadata['volumes']:
            for version in volume['versions']:
                if version_id == version['id']:
                    return (volume, version)

        return False


    def version_lookup(self, version_name):
        for volume in self.metadata['volumes']:
            for version in volume['versions']:
                if version_name == version['name']:
                    return (volume, version)

        return False


    def version_dict_lookup(self, version_ids, id, timestamp=None):
        # Registers the id on first sight, creating a version stub for it;
        # returns the index of that version in the current volume's list.
        if id not in version_ids:
            version_ids.append(id)
            self.metadata_create_version(timestamp)

        return version_ids.index(id)


    # ****************** Methods involving a scan of the master Kipper file **********************

    def db_scan_action(self, action):
        # Python 2.6 needs stdout reopened if it was previously closed:
        # sys.stdout = open("/dev/stdout", "w")
        dbReader = bigFileReader(self.get_db_path())
        # Set up the temp file:
        if self.output_file:
            temp_file = self.get_temp_output_file(action=action)

        # Use a temporary file so that db_output_file_path switches to the new
        # content only when it is complete:
        with (temp_file if self.output_file else sys.stdout) as output:
            db_key_value = dbReader.read()
            while db_key_value:
                if action == KEYDB_EXTRACT:
                    okLines = self.version_extract(db_key_value)
                elif action == KEYDB_REVERT:
                    okLines = self.version_revert(db_key_value)

                if okLines:
                    output.writelines(okLines)

                db_key_value = dbReader.read()

        # Issue: metadata lock while quick update with output_file???
        if self.output_file:
            if action == KEYDB_EXTRACT:
                self.processor.postprocess_file(temp_file.name)

            # Is there a case where we fail to get to this point?
            os.rename(temp_file.name, self.output_file)

        if action == KEYDB_REVERT:
            # When reverting, clear all volumes having versions > self.version_id;
            # to revert is to drop all later volumes, structure and all.
            volumes = self.metadata['volumes']
            for volptr in range(len(volumes) - 1, -1, -1):
                volume = volumes[volptr]
                if volume['floor_id'] > self.version_id:
                    os.remove(self.get_db_path(volume['id']))
                versions = volume['versions']
                for verptr in range(len(versions) - 1, -1, -1):
                    if versions[verptr]['id'] > self.version_id:
                        versions.pop(verptr)
                if len(versions) == 0 and volptr > 0:
                    volumes.pop(volptr)

            self.write_metadata(self.metadata)


    def db_scan_line(self, db_key_value):
        """
        FUTURE: a transact_code will signal how the key/value should be
        interpreted, to allow for differential change storage from previous entries.
        """
        # (created_vid, deleted_vid, transact_code, restofline) = db_key_value.split(self.delim, 3)
        (created_vid, deleted_vid, restofline) = db_key_value.split(self.delim, 2)
        if deleted_vid:
            deleted_vid = long(deleted_vid)
        return (long(created_vid), deleted_vid, restofline)

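    # Each Kipper row is a single line of the form:
    #   [created_vid]<TAB>[deleted_vid]<TAB>[key]<TAB>[value]\n
    # where deleted_vid is empty while the record is still current.  E.g. a
    # (hypothetical) record inserted in version 2 and deleted in version 5:
    #   "2\t5\tAB123.1\tsome value\n"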

    def version_extract(self, db_key_value):
        (created_vid, deleted_vid, restofline) = self.db_scan_line(db_key_value)

        if created_vid <= self.version_id and (not deleted_vid or deleted_vid > self.version_id):
            return self.processor.postprocess_line(restofline)

        return False

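    # E.g. when extracting version 3, the sample row above is emitted (created
    # in version 2, not yet deleted as of 3); extracting version 5 or later
    # skips it.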

    def version_revert(self, db_key_value):
        """
        Rewrites database rows as of self.version_id: records created later are
        dropped, and deletions stamped after that version are unmarked.
        """
        (created_vid, deleted_vid, restofline) = self.db_scan_line(db_key_value)

        if created_vid <= self.version_id:
            if (not deleted_vid) or deleted_vid <= self.version_id:
                return [str(created_vid) + self.delim + str(deleted_vid) + self.delim + restofline]
            else:
                return [str(created_vid) + self.delim + self.delim + restofline]
        return False


    def check_date_input(self, options):
        """
        Parse the -u (Unix time) or -d (date) option into self.dateTime, then
        point self.version_id/self.volume_id at the latest version created at
        or before that moment.
        """
        if options.unixTime is not None:
            try:
                # If it is not interpretable as a number, this triggers an exception:
                _userTime = float(options.unixTime)
            except ValueError:
                stop_err("Given Unix time could not be parsed [" + str(options.unixTime) + "]. Format should be [integer]")

        elif options.dateTime is not None:
            try:
                _userTime = parse_date(options.dateTime)
            except ValueError:
                stop_err("Given date could not be parsed [" + options.dateTime + "]. Format should include at least the year, and any of the other more granular parts, in order: YYYY/MM/DD [H:M:S AM/PM]")

        else:
            return False

        # _userTime is already seconds since the epoch; the old
        # fromtimestamp()/strftime("%s") round trip was non-portable.
        self.dateTime = long(_userTime)

        # Now see if we can set version_id by it.  We look for the latest
        # version that has created <= self.dateTime.
        for volume in self.metadata['volumes']:
            for version in volume['versions']:
                if version['created'] <= self.dateTime:
                    self.version_id = version['id']
                    self.volume_id = volume['id']
                else:
                    break

        return True


    def check_folder(self, file_path, message="Output directory for "):
        """
        Ensures the folder for the given file path exists.  We don't want to
        create output in a mistaken location.
        """
        if file_path is not None:
            path = os.path.normpath(file_path)
            if not os.path.isdir(os.path.dirname(path)):
                # Not an absolute path, so try the default folder the script was launched from:
                path = os.path.normpath(os.path.join(os.getcwd(), path))
                if not os.path.isdir(os.path.dirname(path)):
                    stop_err(message + "[" + path + "] does not exist!")

            return path
        return None


    def check_file_path(self, file, message="File "):
        # Make sure any relative path is converted to an absolute one.
        path = os.path.normpath(file)
        if not os.path.isdir(os.path.dirname(path)) or not os.path.isfile(path):
            # Not an absolute path, so try the default folder the script was called from:
            path = os.path.normpath(os.path.join(os.getcwd(), path))
            if not os.path.isfile(path):
                stop_err(message + "[" + path + "] doesn't exist!")
        return path


    def try_import_file(self, options):
        """
        Create a new version from a comparison of the import data file against
        the Kipper database.  Note: the "-o ." parameter enables writing back
        to the master database.
        """
        self.db_import_file_path = self.check_file_path(options.db_import_file_path, "Import data file ")

        check_file = self.processor.preprocess_validate_file(self.db_import_file_path)
        if not check_file:
            stop_err("Import data file isn't sorted or composed correctly!")

        # Set the version date to the modification date of the import file.
        import_modified = os.path.getmtime(self.db_import_file_path)

        original_name = os.path.basename(self.db_import_file_path)
        # Creates a temporary file holding the conversion into one-line
        # key-value records:
        temp = self.processor.preprocess_file(self.db_import_file_path)
        if temp:
            self.db_import_file_path = temp.name
            self.import_file(original_name, import_modified, options.version_index)
            os.remove(temp.name)


    def import_file(self, file_name, import_modified, version_index=None):
        """
        Imports from an import file (or the temp file if a transformation was
        done above) to a temp Kipper version, which is copied over to the main
        database on completion.

        The import algorithm only works if the import file is already sorted
        in the same way as the Kipper database file.

        @uses self.db_import_file_path string A file full of one-line key[tab]value records.
        @uses self.output_file string A file to save results in.  If empty, then stdout.
        @uses self.dateTime string Date/time to mark created/deleted records by.
        @uses self.delim char Separator between key/value pairs.

        @param file_name string Name of the file being imported.  This is stored in
        the version record so that the output file name can match it.
        """
        delim = self.delim

        file_size = os.path.getsize(self.db_import_file_path)
        if version_index is None:
            version_index = str(self.get_last_version() + 1)

        self.volume_id = self.metadata['volumes'][-1]['id']  # For the get_db_path() calls below.

        if self.output_file:
            temp_file = self.get_temp_output_file(action=KEYDB_IMPORT, path=self.get_db_path())

            # We want to update the database itself when the output file is the db:
            if os.path.isdir(self.output_file):
                self.output_file = self.get_db_path()

        version = self.metadata_create_version(import_modified, file_name, file_size, version_index)
        version_id = str(version['id'])

        with (temp_file if self.output_file else sys.stdout) as outputFile:
            dbReader = bigFileReader(self.get_db_path())
            importReader = bigFileReader(self.db_import_file_path)
            old_import_key = ''

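            # The loop below merges two sorted streams a line at a time:
            #  - db stream at eof: each remaining import line is a new insert;
            #  - import stream at eof: each remaining current db record is
            #    marked deleted in this version;
            #  - otherwise compare keys: an equal key is kept or superseded, a
            #    lesser db key becomes a delete, a lesser import key an insert.
            # turn()/step() let one stream wait while the other advances.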
            while True:
                db_key_value = dbReader.turn()
                import_key_value = importReader.turn()

                # Skip empty or whitespace-only import lines (the fresh line is
                # picked up by turn() on the next pass):
                if import_key_value and len(import_key_value.lstrip()) == 0:
                    import_key_value = importReader.read()
                    continue

                if not db_key_value:  # db at eof
                    # Insert the remaining import lines:
                    while import_key_value:
                        (import_key, import_value) = self.get_key_value(import_key_value)
                        outputFile.write(version_id + delim + delim + import_key + delim + import_value)
                        import_key_value = importReader.read()
                        version['inserts'] += 1
                        version['rows'] += 1

                        if import_key != old_import_key:
                            version['keys'] += 1
                            old_import_key = import_key

                    break  # Both inputs are at eof, so exit.

                elif not import_key_value:
                    # The db has keys that the import file no longer has, so mark
                    # each remaining db line as a delete of its key (if it isn't
                    # marked already):
                    while db_key_value:
                        (created_vid, deleted_vid, dbKey, dbValue) = db_key_value.split(delim, 3)
                        version['rows'] += 1

                        if deleted_vid:
                            outputFile.write(db_key_value)
                        else:
                            outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
                            version['deletes'] += 1

                        db_key_value = dbReader.read()
                    break

                else:
                    (import_key, import_value) = self.get_key_value(import_key_value)
                    (created_vid, deleted_vid, dbKey, dbValue) = db_key_value.split(delim, 3)

                    if import_key != old_import_key:
                        version['keys'] += 1
                        old_import_key = import_key

                    # All cases below lead to writing a row ...
                    version['rows'] += 1

                    if import_key == dbKey:
                        # When the keys match we have enough information to act on
                        # the current db_key_value content, so ensure that the next
                        # pass reads a new db line.
                        dbReader.step()

                        if import_value == dbValue:
                            outputFile.write(db_key_value)

                            # All past items marked with an insert will also have a
                            # delete.  Step until we find one not marked as a delete
                            # ... or a new key.
                            if deleted_vid:
                                pass  # Good to go in processing the next lines in both files.
                            else:
                                importReader.step()

                        else:
                            # The value changed, so process all db_key_values until
                            # the key no longer matches.  Some future pass will cause
                            # the import line to be written to the db (when a key
                            # mismatch occurs) as long as we don't advance it
                            # prematurely.
                            if deleted_vid:
                                # Preserve the deletion record.
                                outputFile.write(db_key_value)
                            else:
                                # Mark the record's deletion; the new key/value means
                                # a create will follow on a later pass.
                                outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
                                version['deletes'] += 1

                    else:
                        # Natural sort: numeric parts compare as numbers, and
                        # capitalization is ignored.
                        dbKeySort = natural_sort_key(dbKey)
                        import_keySort = natural_sort_key(import_key)
                        if cmp(dbKeySort, import_keySort) == -1:
                            # The db key is lesser, so it is no longer in the import
                            # file; mark it as deleted.
                            if deleted_vid:  # Already marked as a delete.
                                outputFile.write(db_key_value)
                            else:  # Write dbKey as a new delete.
                                outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
                                version['deletes'] += 1
                            # Advance ... there could be another db_key_value for deletion too.
                            dbReader.step()

                        else:
                            # The db key is greater, so insert import_key/import_value
                            # into the db: write a create record.
                            outputFile.write(version_id + delim + delim + import_key + delim + import_value)
                            version['inserts'] += 1
                            importReader.step()  # Now compare the next two candidates.

        if self.output_file:
            # Kipper won't write an empty version, since this is usually a mistake.
            # If the user has just added a new volume though, then a slew of inserts
            # will occur even if the version is identical to the tail end of the
            # previous volume's version.
            if version['inserts'] > 0 or version['deletes'] > 0:
                os.rename(temp_file.name, self.output_file)
                self.write_metadata(self.metadata)
            else:
                os.remove(temp_file.name)


    def get_last_version(self):
        """
        Returns the id of the most recent version, scanning volumes from the
        end.  The catch is that a trailing volume might be empty, so we may
        have to step back to a previous one.
        """
        for ptr in range(len(self.metadata['volumes']) - 1, -1, -1):
            versions = self.metadata['volumes'][ptr]['versions']
            if len(versions) > 0:
                return versions[-1]['id']

        return 0


    # May want to move this to the individual data store processors, since it
    # can be sensitive to different kinds of whitespace then.
    def get_key_value(self, key_value):
        # Accepts a split at any whitespace after the key by default.
        kvparse = key_value.split(None, 1)
        return (kvparse[0], kvparse[1] if len(kvparse) > 1 else '')

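    # E.g. (hypothetical record) get_key_value("AB123.1 putative chaperonin\n")
    # returns ("AB123.1", "putative chaperonin\n").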

    def dateISOFormat(self, atimestamp):
        return datetime.datetime.isoformat(datetime.datetime.fromtimestamp(atimestamp))


    def get_command_line(self):
        """
        *************************** Parse Command Line *****************************
        """
        parser = MyParser(
            description='Maintains versions of a file-based database with comparison to full-copy import file updates.',
            usage='kipper.py [kipper database file] [options]*',
            epilog="""

All outputs go to stdout, and no change is made to the Kipper database, unless the '-o' parameter is supplied.  (The one exception is the -M regenerate metadata command, as described below.)  Thus by default one sees what would happen if an action were taken, but must take the additional step to actually change the data.

'-o .' is a special request that leads to:
 * an update of the Kipper database for --import or --revert actions
 * an update of the .md file for the -M --rebuild action

As well, when the -o parameter is a path rather than a specific filename, kipper.py looks up the appropriate output file name in the metadata file.

USAGE

Initialize metadata file and Kipper file.
kipper.py [database file] -M --rebuild [type of database:text|fasta]

View metadata (json) file.
kipper.py [database file] -m --metadata

Import key/value inserts/deletes based on import file. (Current date used.)
kipper.py [database file] -i --import [import file]
e.g.
kipper.py cpn60 -i sequences.fasta      # Outputs the new master database to stdout; doesn't rewrite it.
kipper.py cpn60 -i sequences.fasta -o . # Rewrites cpn60 with the new version added.

Extract a version of the file based on a given date/time.
kipper.py [database file] -e --extract -d datetime -o [output file]

Extract a version of the file based on a given version id.
kipper.py [database file] -e --extract -n [version id] -o [output file]

List versions of dbFile key/value pairs (by date/time).
kipper.py [database file]
kipper.py [database file] -l --list

Have the database revert to a previous version.  Drops future records, unmarks corresponding deletes.
kipper.py [database file] -r --revert -d datetime -o [output file]

Return the version of this code:
kipper.py -v --version
""")

        # Data/metadata changing actions
        parser.add_option('-M', '--rebuild', type='choice', dest='initialize', choices=['text', 'fasta'],
            help='(Re)generate metadata file [name of db].md . Provide the type of db [text|fasta|etc.].')

        parser.add_option('-i', '--import', type='string', dest='db_import_file_path',
            help='Import key/value inserts/deletes based on delta comparison with import file')

        parser.add_option('-e', '--extract', dest='extract', default=False, action='store_true',
            help='Extract a version of the file based on given date/time')

        parser.add_option('-r', '--revert', dest='revert', default=False, action='store_true',
            help='Have database revert to a previous version (-d date/time required). Drops future records, unmarks corresponding deletes.')

        parser.add_option('-V', '--volume', dest='volume', default=False, action='store_true',
            help='Add a new volume to the metadata. New imports will be added here.')

        # Passive actions
        parser.add_option('-m', '--metadata', dest='metadata', default=False, action='store_true',
            help='View metadata file [name of db].md')

        parser.add_option('-l', '--list', dest='list', default=False, action='store_true',
            help='List versions of dbFile key/value pairs (by date/time)')

        parser.add_option('-c', '--compression', dest='compression', type='choice', choices=['.gz'],
            help='Enable compression of database. Options: [.gz]')

        # Lowercase "v" is used for the standard code version identifier.
        parser.add_option('-v', '--version', dest='code_version', default=False, action='store_true',
            help='Return version of kipper.py code.')

        parser.add_option('-o', '--output', type='string', dest='db_output_file_path',
            help='Output to this file. Default is to stdout.')

        parser.add_option('-I', '--index', type='string', dest='version_index',
            help='Provide a title (index), e.g. "1.4", for the version being imported/extracted.')

        parser.add_option('-d', '--date', type='string', dest='dateTime',
            help='Provide a date/time for sync, extract or revert operations. Defaults to now.')

        parser.add_option('-u', '--unixTime', type='int', dest='unixTime',
            help='Provide Unix time (integer) for sync, extract or revert operations.')

        parser.add_option('-n', '--number', type='int', dest='version_id',
            help='Provide a version id to extract or revert to.')

        return parser.parse_args()


class VDBProcessor(object):

    delim = '\t'
    nl = '\n'

    def preprocess_file(self, file_path):
        # Copy the import file and sort it into key order (the same ordering
        # as the Kipper database):
        temp = tempfile.NamedTemporaryFile(mode='w+t', delete=False, dir=os.path.dirname(file_path))
        copy(file_path, temp.name)
        temp.close()
        # -s stable; -f ignore case; -V natural sort (versioning); -t tab delimiter; -k key column
        subprocess.call(['sort', '-sfV', '-t\t', '-k1,1', '-o', temp.name, temp.name])
        return temp  # Enables the temp file name to be used by the caller.


    def preprocess_validate_file(self, file_path):
        # Import file preprocessing ideas:
        # 1) Mechanism to verify that a downloaded file is complete - check an md5 hash?
        # 2) Could test file.newlines(): returns \r, \n, or \r\n once reading has started (1st line).
        # 3) Could auto-uncompress .tar.gz, .bz2, etc.
        # 4) Ensure "[key] [value]" entries are sorted:
        #    "sort --check ..." returns nothing if sorted, or e.g.
        #    "sort: sequences_A.fastx.sorted:12: disorder: >114 AJ009959.1 ..."
        #
        #    if not subprocess.call(['sort', '--check', '-V', file_path]):  # Very fast check
        #        subprocess.call(['sort', '-V', file_path])
        return True


    def postprocess_line(self, line):
        # Lines are placed in an array so that one input line can map to many
        # lines in the output file.
        return [line]


    def postprocess_file(self, file_path):
        return False


    def sort(self, a, b):
        pass


class VDBFastaProcessor(VDBProcessor):

    def preprocess_file(self, file_path):
        """
        Converts input fasta data into one-line, tab-delimited record format,
        then sorts.
        """
        temp = tempfile.NamedTemporaryFile(mode='w+t', delete=False, dir=os.path.dirname(file_path))
        fileReader = bigFileReader(file_path)
        line = fileReader.read()
        old_line = ''
        while line:
            line = line.strip()
            if len(line) > 0:
                if line[0] == '>':
                    if len(old_line):
                        temp.write(old_line + self.nl)
                    lineparse = line.split(None, 1)
                    key = lineparse[0].strip()
                    if len(lineparse) > 1:
                        description = lineparse[1].strip().replace(self.delim, ' ')
                    else:
                        description = ''
                    old_line = key[1:] + self.delim + description + self.delim
                else:
                    old_line = old_line + line

            line = fileReader.read()

        if len(old_line) > 0:
            temp.write(old_line + self.nl)

        temp.close()

        # Is this a consideration for natural sort in Python vs. bash sort?:
        # *** WARNING *** The locale specified by the environment affects sort order.
        # Set LC_ALL=C to get the traditional sort order that uses native byte values.
        # -s stable; -f ignore case; -V natural sort (versioning); -t tab delimiter; -k key column
        subprocess.call(['sort', '-sfV', '-t\t', '-k1,1', '-o', temp.name, temp.name])

        return temp  # Enables the temp file name to be used by the caller.

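    # E.g. (hypothetical input) the fasta record
    #   >AB123.1 putative chaperonin
    #   ATGGCT
    #   GGTTAA
    # becomes the single sortable line:
    #   AB123.1<TAB>putative chaperonin<TAB>ATGGCTGGTTAA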

    def postprocess_line(self, line):
        """
        Transform a Kipper fasta one-line key/value record back into output
        file line(s) - returned as an array.

        @param line string containing [accession id][TAB][description][TAB][fasta sequence]
        @return string containing lines each ending with a newline, except the last.
        """
        line_data = line.split('\t', 2)
        # Set up ">[accession id] [description]\n":
        fasta_header = '>' + ' '.join(line_data[0:2]) + '\n'
        # Put the fasta sequence back into multi-line form; note the trailing
        # item carries the record's newline.
        sequences = self.split_len(line_data[2], 80)
        if len(sequences) and sequences[-1].strip() == '':
            sequences[-1] = ''

        return fasta_header + '\n'.join(sequences)


    def split_len(self, seq, length):
        return [seq[i:i + length] for i in range(0, len(seq), length)]


class bigFileReader(object):
    """
    This provides some advantage over reading line by line, and as well has a
    system for skipping/not advancing reads - it has a memory, via "take_step",
    about whether it should advance or not.  This is used when the master
    database and the import database are feeding lines into a new database.

    Interestingly, using readlines() with a byte-hint parameter less than the
    file size improves performance by at least 30% over readline().

    FUTURE: Adjust the buffer size dynamically based on the file size/lines ratio?
    """

    def __init__(self, filename):
        self.lines = []
        # This simply allows any .gz repository to be opened;
        # it isn't connected to the Kipper metadata['compression'] feature.
        if filename[-3:] == '.gz':
            self.file = gzip.open(filename, 'rb')
        else:
            self.file = open(filename, 'rb', 1)

        self.line = False
        self.take_step = True
        self.buffer_size = 1000  # Byte hint passed to readlines(), not a line count.


    def turn(self):
        """
        When accessing bigFileReader via the turn mechanism, we get the current
        line if no step is pending; otherwise, with a step, we read a new line.
        """
        if self.take_step:
            self.take_step = False
            return self.read()
        return self.line


    def read(self):
        if len(self.lines) == 0:
            self.lines = self.file.readlines(self.buffer_size)

        if len(self.lines) > 0:
            self.line = self.lines.pop(0)
            return self.line

        return False


    def readlines(self):
        """
        Small efficiency:
        A test on self.lines after a readlines() call can control a loop.
        Bulk write of the remaining buffer ensures the lines array isn't
        copied, but is preserved when self.lines is replaced.
        """
        self.line = False
        if len(self.lines) == 0:
            self.lines = self.file.readlines(self.buffer_size)
        if len(self.lines) > 0:
            shallowCopy = self.lines[:]
            self.lines = self.file.readlines(self.buffer_size)
            return shallowCopy
        return False


    def step(self):
        self.take_step = True

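    # Usage sketch of the turn()/step() protocol (hypothetical reader):
    #   reader.turn()  # A step was pending, so a fresh line is read and returned.
    #   reader.turn()  # No step requested: the same line is returned again.
    #   reader.step()  # Flag the current line as consumed ...
    #   reader.turn()  # ... so this call reads and returns the next line.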


# Enables use of "with ..." syntax on gzip files in older Pythons.  See
# https://mail.python.org/pipermail/tutor/2009-November/072959.html
class myGzipFile(gzip.GzipFile):
    def __enter__(self):
        if self.fileobj is None:
            raise ValueError("I/O operation on closed GzipFile object")
        return self

    def __exit__(self, *args):
        self.close()


def natural_sort_key(s, _nsre=REGEX_NATURAL_SORT):
    return [int(text) if text.isdigit() else text.lower()
            for text in re.split(_nsre, s)]

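# E.g. sorted(['key10', 'key2'], key=natural_sort_key) yields ['key2', 'key10'],
# because the numeric parts compare as integers rather than as text.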

def generic_linux_sort(yourList):
    # Unused sketch of a byte-value (C locale) sort, comparable to "LC_ALL=C sort".
    import locale
    locale.setlocale(locale.LC_ALL, "C")
    yourList.sort(cmp=locale.strcoll)


def parse_date(adate):
    """
    Convert a human-entered date into a Unix integer timestamp.

    @param adate string Human-entered date to parse into Unix time

    @return integer Unix time equivalent, or 0 if no date supplied
    """
    adate = adate.strip()
    if adate > '':
        adateP = parser2.parse(adate, fuzzy=True)
        # calendar.timegm (rather than time.mktime) handles UTC and daylight
        # savings exactly:
        return calendar.timegm(adateP.timetuple())
    return 0

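# E.g. parse_date("2009/1/1") returns 1230768000, i.e. 2009-01-01 00:00:00 UTC;
# the fuzzy dateutil parse also accepts forms like "Jan 1 2009".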


if __name__ == '__main__':

    kipper = Kipper()
    kipper.__main__()