comparison data_stores/kipper.py @ 1:5c5027485f7d draft

Uploaded correct file
author damion
date Sun, 09 Aug 2015 16:07:50 -0400
parents
children 269d246ce6d0
comparison
equal deleted inserted replaced
0:d31a1bd74e63 1:5c5027485f7d
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 import subprocess
5 import datetime
6 import dateutil.parser as parser2
7 import calendar
8 import optparse
9 import re
10 import os
11 import sys
12 from shutil import copy
13 import tempfile
14 import json
15 import glob
16 import gzip
17
18
19
20
# Version of this tool, reported by -v/--version.
CODE_VERSION = '1.0.0'
# Splits strings on runs of digits so keys sort naturally ("v2" < "v10").
REGEX_NATURAL_SORT = re.compile('([0-9]+)')
# Action codes passed to db_scan_action() / get_temp_output_file().
KEYDB_LIST = 1
KEYDB_EXTRACT = 2
KEYDB_REVERT = 3
KEYDB_IMPORT = 4
27
class MyParser(optparse.OptionParser):
    """
    OptionParser subclass that emits its epilog text verbatim.

    optparse normally re-wraps epilog text, destroying manual formatting;
    overriding format_epilog preserves it.
    From http://stackoverflow.com/questions/1857346/python-optparse-how-to-include-additional-info-in-usage-output.
    """

    def format_epilog(self, formatter):
        # Skip the default re-wrapping entirely.
        return self.epilog
35
def stop_err(msg):
    """Write *msg* (plus newline) to stderr and terminate with exit status 1."""
    sys.stderr.write("%s\n" % msg)
    sys.exit(1)
39
40 class Kipper(object):
41
42
def __init__(self):
    """Set default state; real values arrive later from command-line parsing."""
    self.db_master_file_name = None
    self.db_master_file_path = None
    self.metadata_file_path = None
    self.db_import_file_path = None
    self.output_file = None   # None means results are printed to stdout.
    self.volume_id = None
    self.version_id = None    # Natural number, counting from 1.
    self.metadata = None
    self.options = None
    self.compression = ''

    # Default timestamp is "now".
    # NOTE(review): strftime("%s") is platform-dependent (GNU libc extension)
    # and interprets this naive UTC datetime in local time — confirm intended.
    right_now = datetime.datetime.utcnow()
    self.dateTime = long(right_now.strftime("%s"))

    # Field / record separators used throughout the data files.
    self.delim = "\t"
    self.nl = "\n"
61
62
def __main__(self):
    """
    Handles all command line options for creating kipper archives, and extracting or reverting to a version.
    """
    options, args = self.get_command_line()
    self.options = options

    # -v: report code version and stop.
    if options.code_version:
        print CODE_VERSION
        return CODE_VERSION

    # *********************** Get Master kipper file ***********************
    if not len(args):
        stop_err('A Kipper database file name needs to be included as first parameter!')

    self.db_master_file_name = args[0] #accepts relative path with file name

    self.db_master_file_path = self.check_folder(self.db_master_file_name, "Kipper database file")
    # db_master_file_path is used from now on; db_master_file_name is used just for metadata labeling.
    # Adjust it to remove any relative path component.
    self.db_master_file_name = os.path.basename(self.db_master_file_name)

    if os.path.isdir(self.db_master_file_path):
        stop_err('Error: Kipper data file "%s" is actually a folder!' % (self.db_master_file_path) )

    # Metadata lives beside the master file with a .md suffix.
    self.metadata_file_path = self.db_master_file_path + '.md'

    # Returns path but makes sure its folder is real. Must come before get_metadata()
    self.output_file = self.check_folder(options.db_output_file_path)


    # ************************* Get Metadata ******************************
    # -M: (re)build the metadata file before anything else reads it.
    if options.initialize:
        if options.compression:
            self.compression = options.compression

        self.set_metadata(type=options.initialize, compression=self.compression)

    self.get_metadata(options);

    # -d/-u: may set self.dateTime / self.version_id from a time option.
    self.check_date_input(options)

    # Explicit version selection (-n id, or -I name with -e) overrides dates.
    if options.version_id or (options.extract and options.version_index):
        if options.version_index:
            vol_ver = self.version_lookup(options.version_index)

        else:
            # Note version_id info overrides any date input above.
            vol_ver = self.get_version(options.version_id)

        if not vol_ver:
            stop_err("Error: Given version number or name does not exist in this database")

        (volume, version) = vol_ver
        self.volume_id = volume['id']
        self.version_id = version['id']
        self.dateTime = float(version['created'])
    else:
        # Use latest version by default
        if not self.version_id and len(self.metadata['volumes'][-1]['versions']) > 0:
            self.volume_id = self.metadata['volumes'][-1]['id']
            self.version_id = self.metadata['volumes'][-1]['versions'][-1]['id']

    # ************************** Action triggers **************************

    if options.volume == True:
        # Add a new volume to the metadata
        self.metadata_create_volume()
        self.write_metadata(self.metadata)

    if options.db_import_file_path != None:
        # Any time an import file is specified, this is the only action:
        self.try_import_file(options)
        return

    if options.metadata == True:
        # Writes metadata to disk or stdout
        self.write_metadata2(self.metadata)
        return

    if options.extract == True:
        # Defaults to pulling latest version
        if not (self.version_id):
            stop_err('Error: Please supply a version id (-n [number]) or date (-d [date]) to extract.')

        if self.output_file and os.path.isdir(self.output_file):
            # A general output file name for the data store as a whole
            output_name = self.metadata['file_name']
            if output_name == '':
                # Get output file name from version's original import file_name
                output_name = self.metadata['volumes'][self.volume_id-1]['versions'][self.version_id-1]['file_name']
            # But remove the .gz suffix if it is there (refactor later).
            if output_name[-3:] == '.gz':
                output_name = output_name[0:-3]
            self.output_file = os.path.join(self.output_file, output_name)

        self.db_scan_action(KEYDB_EXTRACT)
        return

    if options.revert == True:
        if not (options.version_id or options.dateTime or options.unixTime):
            stop_err('Error: Please supply a version id (-n [number]) or date (-d [date]) to revert to.')

        # Send database back to given revision
        if self.output_file and self.output_file == os.path.dirname(self.db_master_file_path):
            self.output_file = self.get_db_path()
        self.db_scan_action(KEYDB_REVERT)
        return

    # Default to list datastore versions
    self.get_list()
174
175
def get_db_path(self, volume_id=None):
    """Return the volume data file path: [master path]_[volume id][compression ext].

    Note: metadata must be established before this method is called.
    """
    which = self.volume_id if volume_id is None else volume_id
    return '%s_%s%s' % (self.db_master_file_path, str(which), self.metadata['compression'])
180
181
def get_temp_output_file(self, action=None, path=None):
    """Create a named temp file in the same folder as *path* (default
    self.output_file) and return its write handle.

    For revert/import actions with gzip compression enabled, the plain
    handle is closed and reopened through myGzipFile so the temp file
    receives gzipped output; only the file *name* is reused.
    """
    if path is None:
        path = self.output_file

    temp = tempfile.NamedTemporaryFile(mode='w+t', delete=False,
                                       dir=os.path.dirname(path))

    # Switch to a gzip handler on the same temp name when compression applies.
    if action in (KEYDB_REVERT, KEYDB_IMPORT) and self.metadata['compression'] == '.gz':
        temp.close()
        temp = myGzipFile(temp.name, 'wb')

    return temp
195
196
197 def get_list(self):
198 volumes = self.metadata['volumes']
199 for ptr in range(0, len(volumes)):
200 volume = volumes[ptr]
201 if ptr < len(volumes)-1:
202 ceiling = str(volumes[ptr+1]['floor_id'] - 1)
203 else:
204 ceiling = ''
205 print "Volume " + str(ptr+1) + ", Versions " + str(volume['floor_id']) + "-" + ceiling
206
207 for version in volume['versions']:
208 print str(version['id']) + ": " + self.dateISOFormat(float(version['created'])) + '_v' + version['name']
209
210
def set_metadata(self, type='text', compression=''):
    """
    Request to initialize metadata file
    Output metadata to stdio or to -o output file by way of temp file.
    If one doesn't include -o, then output goes to stdio;
    If one includes only -o, then output overwrites .md file.
    If one includes -o [filename] output overwrites [filename]

    Algorithm processes each line as it comes in database. This means there
    is no significance to the version_ids ordering; earlier items in list can
    in fact be later versions of db. So must resort and re-assign ids in end.
    @param type string text or fasta etc.
    @param compression string '' or '.gz'.
    """
    if os.path.isfile(self.metadata_file_path):
        stop_err('Error: Metadata file "%s" exists. You must remove it before generating a new one.' % (self.metadata_file_path) )

    self.metadata_create(type, compression)

    # Pick up any existing volume data files, e.g. [master]_1, [master]_2 ...
    volumes = glob.glob(self.db_master_file_path + '_[0-9]*')

    volumes.sort(key=lambda x: natural_sort_key(x))
    for volume in volumes:
        # Note: scanned volumes must be consecutive from 1. No error detection yet.
        self.metadata_create_volume(False)
        versions = self.metadata['volumes'][-1]['versions']
        # The volume file's mtime stands in for each rebuilt version's creation time.
        import_modified = os.path.getmtime(volume)
        dbReader = bigFileReader(volume)
        version_ids = []
        db_key_value = dbReader.read()
        old_key = ''
        while db_key_value:

            # Row layout: created version id, deleted version id (may be empty), key, value.
            (created_vid, deleted_vid, db_key, restofline) = db_key_value.split(self.delim, 3)
            # Look up (lazily creating) the stats record for the creating version.
            version = versions[self.version_dict_lookup(version_ids, long(created_vid), import_modified)]
            version['rows'] +=1
            # Rows for one key are adjacent in a sorted store, so this counts distinct keys.
            if old_key != db_key:
                version['keys'] +=1
                old_key = db_key

            version['inserts'] += 1
            # A non-empty deleted id charges a delete against that (possibly other) version.
            if deleted_vid:
                version = versions[self.version_dict_lookup(version_ids, long(deleted_vid), import_modified)]
                version['deletes'] += 1

            db_key_value = dbReader.read()

        # Reorder, and reassign numeric version ids:
        versions.sort(key=lambda x: x['id'])
        for ptr, version in enumerate(versions):
            version['id'] = ptr+1

    # If first master db volume doesn't exist, then this is an initialization situation
    if len(volumes) == 0:
        self.metadata_create_volume()
        self.create_volume_file()

    with open(self.metadata_file_path,'w') as metadata_handle:
        metadata_handle.write(json.dumps(self.metadata, sort_keys=True, indent=4, separators=(',', ': ')))

    return True
271
272
def get_metadata(self, options):
    """Load JSON metadata from disk and select the matching pre/post
    processor ([fasta|text]); stop with an error if the file is missing."""
    if not os.path.isfile(self.metadata_file_path):
        stop_err('Error: Unable to locate the "%s" metadata file. It should accompany the "%s" file. Use the -M parameter to initialize or regenerate the basic file.' % (self.metadata_file_path, self.db_master_file_name) )

    with open(self.metadata_file_path, 'r') as metadata_handle:
        self.metadata = json.load(metadata_handle)

    # ******************* Select Kipper Pre/Post Processor **********************
    # FUTURE: More processor options here - including custom ones referenced in metadata
    if self.metadata['type'] == 'fasta':
        self.processor = VDBFastaProcessor()   # for fasta sequence databases
    else:
        self.processor = VDBProcessor()        # default text

    # Databases written by older code lack a compression entry; default it.
    if 'compression' not in self.metadata:
        self.metadata['compression'] = ''
295
296
def write_metadata(self, content):
    """Persist metadata after data-store changes (revert and import) —
    but only when results go to a file, so metadata never pollutes stdout."""
    if not self.output_file:
        return
    self.write_metadata2(content)
303
304
def write_metadata2(self, content):
    """Serialize *content* as pretty-printed JSON to the metadata file, or
    to stdout when no output file was requested.

    Fix: the previous implementation entered sys.stdout as a context
    manager, which closed stdout after writing (the Python 2.6 "reopen
    stdout" workaround elsewhere in this file exists because of that).
    stdout is now written to without being closed; file output still uses
    a context manager.
    """
    serialized = json.dumps(content, sort_keys=True, indent=4, separators=(',', ': '))
    if self.output_file:
        with open(self.metadata_file_path, 'w') as metadata_handle:
            metadata_handle.write(serialized)
    else:
        sys.stdout.write(serialized)
309
310
def metadata_create(self, type, compression, floor_id=1):
    """
    Build the initial in-memory metadata structure.

    @param type string 'text' or 'fasta' etc.; also used as the suggested
        output file suffix.
    @param compression string '' or '.gz'.
    @param floor_id unused here; kept for interface compatibility.

    Fix: the compression parameter was previously ignored in favour of
    self.compression. It is now honoured; the only caller (set_metadata)
    passed self.compression, so existing behaviour is unchanged.
    """
    file_name = self.db_master_file_name.rsplit('.', 1)
    self.metadata = {
        'version': CODE_VERSION,
        'name': self.db_master_file_name,
        'db_file_name': self.db_master_file_name,
        # A guess about what best base file name would be to write versions out as
        'file_name': file_name[0] + '.' + type,
        'type': type,
        'description': '',
        'processor': '',  # Processing that overrides type-matched processor.
        'compression': compression,
        'volumes': []
    }
328
329
def metadata_create_volume(self, file_create=True):
    """Append a new volume record (and optionally its empty data file);
    return the new volume id.

    Refuses to add a volume while the newest volume still has no versions,
    exiting via stop_err in that case.
    """
    volumes = self.metadata['volumes']
    if volumes and not volumes[-1]['versions']:
        stop_err("Error: Didn't create a new volume because last one is empty already.")

    new_id = len(volumes) + 1
    volumes.append({
        'floor_id': self.get_last_version() + 1,  # First version id this volume will hold.
        'id': new_id,
        'versions': []
    })
    self.volume_id = new_id
    if file_create:
        self.create_volume_file()

    return new_id
349
350
def create_volume_file(self):
    """Create an empty data file for the current volume — gzipped when the
    database uses .gz compression."""
    target = self.get_db_path()
    if self.metadata['compression'] == '.gz':
        handle = gzip.open(target, 'wb')
    else:
        handle = open(target, 'w')
    handle.close()
357
358
def metadata_create_version(self, mydate, file_name='', file_size=0, version_name=None):
    """Append a new version record to the latest volume and return it.

    @param mydate creation timestamp stored on the record.
    @param file_name original import file name (used later for output naming).
    @param file_size size of the import file in bytes.
    @param version_name label for the version; defaults to its numeric id.
    """
    new_id = self.get_last_version() + 1
    version = {
        'id': new_id,
        'created': mydate,
        'name': str(new_id) if version_name is None else version_name,
        'file_name': file_name,
        'file_size': file_size,
        # Statistics, filled in as lines are written for this version:
        'inserts': 0,
        'deletes': 0,
        'rows': 0,
        'keys': 0
    }
    self.metadata['volumes'][-1]['versions'].append(version)

    return version
378
379
def get_version(self, version_id=None):
    """Return the (volume, version) pair whose version id matches
    *version_id* (default self.version_id), or False when absent."""
    wanted = self.version_id if version_id is None else version_id

    for volume in self.metadata['volumes']:
        for version in volume['versions']:
            if version['id'] == wanted:
                return (volume, version)

    return False
390
391
def version_lookup(self, version_name):
    """Return the (volume, version) pair whose version *name* matches, or
    False when no version carries that name."""
    for volume in self.metadata['volumes']:
        matches = [v for v in volume['versions'] if v['name'] == version_name]
        if matches:
            return (volume, matches[0])

    return False
399
400
def version_dict_lookup(self, version_ids, id, timestamp=None):
    """Map a version id found in a data file to its index in *version_ids*,
    creating a fresh metadata version record the first time the id is seen.

    @param version_ids list of ids in order of first appearance (mutated here).
    @param id version id parsed from a database line.
    @param timestamp creation time for a newly created version record.
    @return int index of id within version_ids.

    Fix: the previous version bound metadata_create_version's result to an
    unused local; the call is kept purely for its side effect of registering
    the version in self.metadata.
    """
    if id not in version_ids:
        version_ids.append(id)
        self.metadata_create_version(timestamp)

    return version_ids.index(id)
407
408
409 #****************** Methods Involving Scan of Master Kipper file **********************
410
def db_scan_action (self, action):
    """
    Stream the current volume's data file once, applying *action*
    (KEYDB_EXTRACT or KEYDB_REVERT) to each line and writing accepted lines
    to self.output_file (via a temp file swapped in on completion) or stdout.
    For reverts, also prunes metadata and volume files newer than
    self.version_id.

    #Python 2.6 needs this reopened if it was previously closed.
    #sys.stdout = open("/dev/stdout", "w")
    """
    dbReader = bigFileReader(self.get_db_path())
    # Setup temp file:
    if self.output_file:
        temp_file = self.get_temp_output_file(action=action)

    # Use temporary file so that db_output_file_path switches to new content only when complete
    # NOTE(review): when writing to stdout, this `with` closes sys.stdout on exit
    # (hence the reopen workaround noted in the docstring).
    with (temp_file if self.output_file else sys.stdout) as output:
        db_key_value = dbReader.read()

        while db_key_value:
            if action == KEYDB_EXTRACT:
                okLines = self.version_extract(db_key_value)

            elif action == KEYDB_REVERT:
                okLines = self.version_revert(db_key_value)

            # okLines is a list of output lines, or False to drop this record.
            if okLines:
                output.writelines(okLines)

            db_key_value = dbReader.read()

    # Issue: metadata lock while quick update with output_file???
    if self.output_file:
        if action == KEYDB_EXTRACT:
            self.processor.postprocess_file(temp_file.name)

        # Is there a case where we fail to get to this point?
        os.rename(temp_file.name, self.output_file)

        if action == KEYDB_REVERT:
            # When reverting, clear all volumes having versions > self.version_id
            # Takes out volume structure too.
            volumes = self.metadata['volumes']
            for volptr in range(len(volumes)-1, -1, -1):
                volume = volumes[volptr]
                if volume['floor_id'] > self.version_id: #TO REVERT IS TO KILL ALL LATER VOLUMES.
                    os.remove(self.get_db_path(volume['id']))
                versions = volume['versions']
                # Drop version records newer than the target version.
                for verptr in range(len(versions)-1, -1, -1):
                    if versions[verptr]['id'] > self.version_id:
                        popped = versions.pop(verptr)
                # Remove the volume record itself if emptied (never the first volume).
                if len(versions) == 0 and volptr > 0:
                    volumes.pop(volptr)

            self.write_metadata(self.metadata)
461
462
def db_scan_line(self, db_key_value):
    """Split one raw database line into (created_vid, deleted_vid, restofline).

    created_vid is returned as a long; deleted_vid is a long when present,
    otherwise the empty string from the split is passed through.
    FUTURE: transact_code will signal how key/value should be interpreted, to
    allow for differential change storage from previous entries.
    """
    created_vid, deleted_vid, restofline = db_key_value.split(self.delim, 2)
    deleted = long(deleted_vid) if deleted_vid else deleted_vid
    return (long(created_vid), deleted, restofline)
472
473
def version_extract(self, db_key_value):
    """Return the postprocessed line(s) if this record existed at
    self.version_id (created no later, not yet deleted), else False."""
    (created_vid, deleted_vid, restofline) = self.db_scan_line(db_key_value)

    existed_then = (created_vid <= self.version_id
                    and (not deleted_vid or deleted_vid > self.version_id))
    if existed_then:
        return self.processor.postprocess_line(restofline)

    return False
481
482
def version_revert(self, db_key_value):
    """Rewrite one database line as it stood at self.version_id, or return
    False to drop it entirely (created after the target version)."""
    (created_vid, deleted_vid, restofline) = self.db_scan_line(db_key_value)

    if created_vid > self.version_id:
        # Record did not exist yet at the target version: drop it.
        return False

    if deleted_vid and deleted_vid > self.version_id:
        # Deletion happened after the target version: clear the delete mark.
        return [str(created_vid) + self.delim + self.delim + restofline]

    # Record (and any delete mark) predates the target version: keep as-is.
    return [str(created_vid) + self.delim + str(deleted_vid) + self.delim + restofline]
495
496
def check_date_input(self, options):
    """
    Convert a -u Unix time or -d date string option into self.dateTime,
    then select the latest version created at or before that moment.
    Returns False when neither time option was given; True otherwise.
    Exits via stop_err on unparseable input.
    """
    if options.unixTime != None:
        try:
            _userTime = float(options.unixTime)
            # if it is not a float, triggers exception
        except ValueError:
            stop_err("Given Unix time could not be parsed [" + options.unixTime + "]. Format should be [integer]")

    elif options.dateTime != None:

        try:
            # parse_date is a helper defined elsewhere in this file.
            _userTime = parse_date(options.dateTime)

        except ValueError:
            stop_err("Given date could not be parsed [" + options.dateTime + "]. Format should include at least the year, and any of the other more granular parts, in order: YYYY/MM/DD [H:M:S AM/PM]")

    else:
        return False

    # Normalize to an integral Unix timestamp.
    # NOTE(review): strftime("%s") is a platform-dependent (GNU libc) extension.
    _dtobject = datetime.datetime.fromtimestamp(float(_userTime)) #
    self.dateTime = long(_dtobject.strftime("%s"))


    # Now see if we can set version_id by it. We look for version_id that has created <= self.dateTime
    # NOTE(review): the break exits only the inner (version) loop, so later
    # volumes are still scanned — assumes versions ascend by creation time
    # across volumes; confirm.
    for volume in self.metadata['volumes']:
        for version in volume['versions']:
            if version['created'] <= self.dateTime:
                self.version_id = version['id']
                self.volume_id = volume['id']
            else:
                break

    return True
532
533
def check_folder(self, file_path, message="Output directory for "):
    """Normalize *file_path* and verify its parent directory exists.

    Tries the path as given, then relative to the current working
    directory; exits via stop_err if neither parent folder exists. We
    don't want to create output in a mistaken location. Returns None
    when file_path is None.
    """
    if file_path is None:
        return None

    path = os.path.normpath(file_path)
    if not os.path.isdir(os.path.dirname(path)):
        # Not an absolute path, so try default folder where script launched from:
        path = os.path.normpath(os.path.join(os.getcwd(), path))
        if not os.path.isdir(os.path.dirname(path)):
            stop_err(message + "[" + path + "] does not exist!")

    return path
550
551
def check_file_path(self, file, message="File "):
    """Normalize *file* to an existing path, retrying relative to the
    current working directory; exits via stop_err if the file is absent."""
    path = os.path.normpath(file)
    # Make sure any relative paths are converted to absolute ones.
    if not os.path.isdir(os.path.dirname(path)) or not os.path.isfile(path):
        # Not an absolute path, so try default folder where script was called:
        path = os.path.normpath(os.path.join(os.getcwd(), path))
        if not os.path.isfile(path):
            stop_err(message + "[" + path + "] doesn't exist!")

    return path
562
563
def try_import_file(self, options):
    """Validate and preprocess an import file, then create a new version
    from its comparison against the Kipper store.

    Note "-o ." parameter enables writing back to master database.
    """
    self.db_import_file_path = self.check_file_path(options.db_import_file_path, "Import data file ")

    if not self.processor.preprocess_validate_file(self.db_import_file_path):
        stop_err("Import data file isn't sorted or composed correctly!")

    # Version date is taken from the import file's modification time.
    import_modified = os.path.getmtime(self.db_import_file_path)
    original_name = os.path.basename(self.db_import_file_path)

    # Preprocessing yields a temp file of one-line key[tab]value records.
    temp = self.processor.preprocess_file(self.db_import_file_path)
    if temp:
        self.db_import_file_path = temp.name
        self.import_file(original_name, import_modified, options.version_index)
        os.remove(temp.name)
588
589
def import_file(self, file_name, import_modified, version_index = None):
    """
    Imports from an import file (or temp file if transformation done above) to
    temp Kipper version which is copied over to main database on completion.

    Import algorithm only works if the import file is already sorted in the same way as the Kipper database file

    @uses self.db_import_file_path string A file full of one line key[tab]value records.
    @uses self.output_file string A file to save results in. If empty, then stdio.

    @uses dateTime string Date time to mark created/deleted records by.
    @puses delim char Separator between key/value pairs.ake it the function.

    @param file_name name of file being imported. This is stored in version record so that output file will be the same.
    @param import_modified float mtime of the import file; becomes the version's creation time.
    @param version_index optional label for the new version; defaults to its numeric id.
    """
    delim = self.delim


    file_size = os.path.getsize(self.db_import_file_path)
    if version_index == None:
        version_index = str(self.get_last_version()+1)

    self.volume_id = self.metadata['volumes'][-1]['id'] #For get_db_path() call below.

    if self.output_file:
        temp_file = self.get_temp_output_file(action=KEYDB_IMPORT, path=self.get_db_path())

        # We want to update database here when output file is db itself.
        if os.path.isdir(self.output_file):
            self.output_file = self.get_db_path()

    version = self.metadata_create_version(import_modified, file_name, file_size, version_index)
    version_id = str(version['id'])

    # Merge the sorted db stream and sorted import stream key by key.
    # bigFileReader.turn() peeks the current line; .step()/.read() advance.
    with (temp_file if self.output_file else sys.stdout) as outputFile :
        dbReader = bigFileReader(self.get_db_path())
        importReader = bigFileReader(self.db_import_file_path)
        old_import_key=''

        while True:

            db_key_value = dbReader.turn()
            #if import_key_value
            import_key_value = importReader.turn()

            # Skip empty or whitespace lines:
            if import_key_value and len(import_key_value.lstrip()) == 0:
                import_key_value = importReader.read()
                continue

            if not db_key_value: # eof
                while import_key_value: # Insert remaining import lines:
                    (import_key, import_value) = self.get_key_value(import_key_value)
                    # Create record: [new version][no delete][key][value]
                    outputFile.write(version_id + delim + delim + import_key + delim + import_value)
                    import_key_value = importReader.read()
                    version['inserts'] += 1
                    version['rows'] += 1

                    if import_key != old_import_key:
                        version['keys'] += 1
                        old_import_key = import_key

                break # Both inputs are eof, so exit

            elif not import_key_value: # db has key that import file no longer has, so mark each subsequent db line as a delete of the key (if it isn't already)
                while db_key_value:
                    (created_vid, deleted_vid, dbKey, dbValue) = db_key_value.split(delim,3)
                    version['rows'] += 1

                    if deleted_vid:
                        # Already deleted: pass through unchanged.
                        outputFile.write(db_key_value)
                    else:
                        # Stamp this version id as the deleter.
                        outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
                        version['deletes'] += 1

                    db_key_value = dbReader.read()
                break

            else:
                # Both streams have a line: compare keys.
                (import_key, import_value) = self.get_key_value(import_key_value)
                (created_vid, deleted_vid, dbKey, dbValue) = db_key_value.split(delim,3)

                if import_key != old_import_key:
                    version['keys'] += 1
                    old_import_key = import_key

                # All cases below lead to writing a row ...
                version['rows'] += 1

                if import_key == dbKey:
                    # When the keys match, we have enough information to act on the current db_key_value content;
                    # therefore ensure on next pass that we read it.
                    dbReader.step()

                    if import_value == dbValue:
                        outputFile.write(db_key_value)

                        # All past items marked with insert will also have a delete. Step until we find one
                        # not marked as a delete... or a new key.
                        if deleted_vid: # Good to go in processing next lines in both files.
                            pass
                        else:
                            importReader.step()

                    else: # Case where value changed - so process all db_key_values until key no longer matches.

                        # Some future pass will cause import line to be written to db
                        # (when key mismatch occurs) as long as we dont advance it (prematurely).
                        if deleted_vid:
                            #preserve deletion record.
                            outputFile.write(db_key_value)

                        else:
                            # Mark record deletion
                            outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
                            version['deletes'] += 1
                        # Then advance since new key/value means new create

                else:
                    # Natural sort doesn't do text sort on numeric parts, ignores capitalization.
                    dbKeySort = natural_sort_key(dbKey)
                    import_keySort = natural_sort_key(import_key)
                    # False if dbKey less; Means db key is no longer in sync db,
                    if cmp(dbKeySort, import_keySort) == -1:

                        if deleted_vid: #Already marked as a delete
                            outputFile.write(db_key_value)

                        else: # Write dbKey as a new delete
                            outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
                            version['deletes'] += 1
                        # Advance ... there could be another db_key_value for deletion too.
                        dbReader.step()

                    else: #DB key is greater, so insert import_key,import_value in db.
                        # Write a create record
                        outputFile.write(version_id + delim + delim + import_key + delim + import_value)
                        version['inserts'] += 1
                        importReader.step() # Now compare next two candidates.

    if self.output_file:
        # Kipper won't write an empty version - since this is usually a mistake.
        # If user has just added new volume though, then slew of inserts will occur
        # even if version is identical to tail end of previous volume version.
        if version['inserts'] > 0 or version['deletes'] > 0:
            #print "Temp file:" + temp_file.name
            os.rename(temp_file.name, self.output_file)
            self.write_metadata(self.metadata)
        else:
            os.remove(temp_file.name)
740
741
def get_last_version(self):
    """Return the id of the newest version across all volumes, or 0 when
    the database has no versions yet.

    Newest volumes may be empty, so volumes are walked from the end until
    one containing versions is found.
    """
    for volume in reversed(self.metadata['volumes']):
        if volume['versions']:
            return volume['versions'][-1]['id']

    return 0
753
754
755 # May want to move this to individual data store processor since it can be sensitive to different kinds of whitespace then.
def get_key_value(self, key_value):
    """Split a record into (key, value) at the first run of whitespace.

    The value defaults to '' when the line holds only a key.
    """
    parts = key_value.split(None, 1)
    if len(parts) > 1:
        return (parts[0], parts[1])
    return (parts[0], '')
761
762
def dateISOFormat(self, atimestamp):
    """Render a Unix timestamp as an ISO-8601 local datetime string."""
    moment = datetime.datetime.fromtimestamp(atimestamp)
    return moment.isoformat()
765
766
def get_command_line(self):
    """
    Build the optparse parser and return (options, args).

    The epilog below is user-facing help text printed verbatim via
    MyParser.format_epilog, so its formatting is significant.
    """
    parser = MyParser(
        description = 'Maintains versions of a file-based database with comparison to full-copy import file updates.',
        usage = 'kipper.py [kipper database file] [options]*',
        epilog="""

All outputs go to stdout and affect no change in Kipper database unless the '-o' parameter is supplied. (The one exception to this is when the -M regenerate metadata command is provided, as described below.) Thus by default one sees what would happen if an action were taken, but must take an additional step to affect the data.

'-o .' is a special request that leads to:
 * an update of the Kipper database for --import or --revert actions
 * an update of the .md file for -M --rebuild action

As well, when -o parameter is a path, and not a specific filename, then kipper.py looks up what the appropriate output file name is according to the metadata file.

USAGE

Initialize metadata file and Kipper file.
	kipper.py [database file] -M --rebuild [type of database:text|fasta]

View metadata (json) file.
	kipper.py [database file] -m --metadata

Import key/value inserts/deletes based on import file. (Current date used).
	kipper.py [database file] -i --import [import file]
	e.g.
	kipper.py cpn60 -i sequences.fasta           # outputs new master database to stdout; doesn't rewrite it.
	kipper.py cpn60 -i sequences.fasta -o .      # rewrites cpn60 with new version added.

Extract a version of the file based on given date/time
	kipper.py [database file] -e --extract -d datetime -o [output file]

Extract a version of the file based on given version Id
	kipper.py [database file] -e --extract -n [version id] -o [output file]

List versions of dbFile key/value pairs (by date/time)
	kipper.py [database file]
	kipper.py [database file] -l --list

Have database revert to previous version. Drops future records, unmarks corresponding deletes.
	kipper.py [database file] -r --revert -d datetime -o [output file]

Return version of this code:
	kipper.py -v --version
""")

    # Data/Metadata changing actions
    parser.add_option('-M', '--rebuild', type='choice', dest='initialize', choices=['text','fasta'],
        help='(Re)generate metadata file [name of db].md . Provide the type of db [text|fasta| etc.].')

    parser.add_option('-i', '--import', type='string', dest='db_import_file_path',
        help='Import key/value inserts/deletes based on delta comparison with import file')

    parser.add_option('-e', '--extract', dest='extract', default=False, action='store_true',
        help='Extract a version of the file based on given date/time')

    parser.add_option('-r', '--revert', dest='revert', default=False, action='store_true',
        help='Have database revert to previous version (-d date/time required). Drops future records, unmarks corresponding deletes.')

    parser.add_option('-V', '--volume', dest='volume', default=False, action='store_true',
        help='Add a new volume to the metadata. New imports will be added here.')

    # Passive actions
    parser.add_option('-m', '--metadata', dest='metadata', default=False, action='store_true',
        help='View metadata file [name of db].md')

    parser.add_option('-l', '--list', dest='list', default=False, action='store_true',
        help='List versions of dbFile key/value pairs (by date/time)')

    parser.add_option('-c', '--compression', dest='compression', type='choice', choices=['.gz'],
        help='Enable compression of database. options:[.gz]')

    # Used "v" for standard code version identifier.
    parser.add_option('-v', '--version', dest='code_version', default=False, action='store_true',
        help='Return version of kipper.py code.')

    parser.add_option('-o', '--output', type='string', dest='db_output_file_path',
        help='Output to this file. Default is to stdio')

    parser.add_option('-I', '--index', type='string', dest='version_index',
        help='Provide title (index) e.g. "1.4" of version being imported/extracted.')

    # Time/version selectors shared by extract / revert / import.
    parser.add_option('-d', '--date', type='string', dest='dateTime',
        help='Provide date/time for sync, extract or revert operations. Defaults to now.')
    parser.add_option('-u', '--unixTime', type='int', dest='unixTime',
        help='Provide Unix time (integer) for sync, extract or revert operations.')
    parser.add_option('-n', '--number', type='int', dest='version_id',
        help='Provide a version id to extract or revert to.')

    return parser.parse_args()
860
861
class VDBProcessor(object):
    """Default (plain text) pre/post processor for Kipper data stores."""

    delim = '\t'
    nl = '\n'

    def preprocess_file(self, file_path):
        """Copy *file_path* to a temp file in the same folder and sort it in
        place; return the temp file object so the caller can use (and later
        remove) it by name."""
        temp = tempfile.NamedTemporaryFile(mode='w+t', delete=False,
                                           dir=os.path.dirname(file_path))
        copy(file_path, temp.name)
        temp.close()
        # Stable (-s), case-folding (-f), natural/"version" (-V) sort on the
        # first tab-delimited field.
        subprocess.call(['sort', '-sfV', '-t\t', '-k1,1', '-o', temp.name, temp.name])
        return temp

    def preprocess_validate_file(self, file_path):
        """Validation hook; currently accepts every file.

        Future ideas: md5 completeness check, newline sniffing,
        auto-uncompress of .tar.gz/.bz2, and a fast `sort --check` pass to
        verify "[key] [value]" ordering.
        """
        return True

    def postprocess_line(self, line):
        # Returned as a list so one input line can map to many output lines.
        return [line]

    def postprocess_file(self, file_path):
        # No file-level postprocessing for plain text stores.
        return False

    def sort(self, a, b):
        pass
901
902
class VDBFastaProcessor(VDBProcessor):
	"""
	Processor that converts fasta files to/from Kipper's one-line
	tab-delimited record format:
	[accession id][TAB][description][TAB][sequence]
	"""

	def preprocess_file(self, file_path):
		"""
		Converts input fasta data into one line tab-delimited record format, then sorts.

		@param file_path string Path of the fasta file to convert.
		@return NamedTemporaryFile whose .name holds the sorted records
			(enables temp file name to be used by caller).
		"""
		temp = tempfile.NamedTemporaryFile(mode='w+t', delete=False, dir=os.path.dirname(file_path))
		file_reader = bigFileReader(file_path)
		line = file_reader.read()
		record = ''  # Accumulates "[id]\t[description]\t[sequence]" for current entry.
		while line:
			line = line.strip()
			if line:
				if line[0] == '>':
					# Start of a new fasta entry: flush the previous one.
					if record:
						temp.write(record + self.nl)
					lineparse = line.split(None, 1)
					key = lineparse[0].strip()
					if len(lineparse) > 1:
						# Strip tabs from the description so it stays one field.
						description = lineparse[1].strip().replace(self.delim, ' ')
					else:
						description = ''
					record = key[1:] + self.delim + description + self.delim
				else:
					# Sequence line: append to the current record.
					record = record + line
			line = file_reader.read()

		# Flush the final entry.
		if record:
			temp.write(record + self.nl)

		temp.close()
		# BUGFIX: close the input file; previously it was left open (leak).
		file_reader.file.close()

		# Is this a consideration for natural sort in Python vs bash sort?:
		# *** WARNING *** The locale specified by the environment affects sort order.
		# Set LC_ALL=C to get the traditional sort order that uses native byte values.
		#-s stable; -f ignore case; V natural sort (versioning) ; -k column, -t tab delimiter
		subprocess.call(['sort', '-sfV', '-t\t', '-k1,1', '-o', temp.name, temp.name])

		return temp

	def postprocess_line(self, line):
		"""
		Transform Kipper fasta 1 line format key/value back into output file line(s) - an array

		@param line string containing [accession id][TAB][description][TAB][fasta sequence]
		@return string containing lines each ending with newline, except end.
		"""
		line_data = line.split('\t', 2)
		# Set up ">[accession id] [description]\n" :
		fasta_header = '>' + ' '.join(line_data[0:2]) + '\n'
		# Put fasta sequences back into multi-line; note trailing item has newline.
		sequences = self.split_len(line_data[2], 80)
		if len(sequences) and sequences[-1].strip() == '':
			sequences[-1] = ''

		return fasta_header + '\n'.join(sequences)

	def split_len(self, seq, length):
		"""Split seq into a list of chunks of at most `length` characters."""
		return [seq[i:i + length] for i in range(0, len(seq), length)]
968
969
class bigFileReader(object):
	"""
	Buffered line reader with a "turn/step" protocol for merging two files.

	turn() hands back the current line repeatedly until step() is called,
	at which point the next turn() advances to a fresh line.  This lets the
	master database and the import database feed lines into a new database
	without either side losing its place.

	Using readlines() with a size hint below the file size performs at
	least 30% better than repeated readline() calls.

	FUTURE: Adjust buffer lines dynamically based on file size/lines ratio?
	"""

	def __init__(self, filename):
		self.lines = []
		# Transparently open any .gz repository.  This is independent of
		# the Kipper metadata['compression'] feature.
		if filename.endswith('.gz'):
			self.file = gzip.open(filename, 'rb')
		else:
			self.file = open(filename, 'rb', 1)

		self.line = False	# Most recently read line, or False before first read / after EOF.
		self.take_step = True	# Whether the next turn() should advance.
		self.buffer_size = 1000	# Size hint passed to readlines() on each refill.

	def turn(self):
		"""
		Return the current line, advancing only when step() was called
		since the last turn().
		"""
		if not self.take_step:
			return self.line
		self.take_step = False
		return self.read()

	def read(self):
		"""Return the next line, refilling the buffer as needed; False at EOF."""
		if not self.lines:
			self.lines = self.file.readlines(self.buffer_size)
		if not self.lines:
			return False
		self.line = self.lines.pop(0)
		return self.line

	def readlines(self):
		"""
		Return the whole current buffer and pre-fetch the next one.

		Small efficiency: a test on self.lines after this call can control
		a loop.  The buffer is handed out as a copy so it survives the
		refill; returns False at EOF.
		"""
		self.line = False
		if not self.lines:
			self.lines = self.file.readlines(self.buffer_size)
		if not self.lines:
			return False
		buffered = self.lines[:]
		self.lines = self.file.readlines(self.buffer_size)
		return buffered

	def step(self):
		"""Arrange for the next turn() call to advance to a new line."""
		self.take_step = True
1040
1041
1042
1043 # Enables use of with ... syntax. See https://mail.python.org/pipermail/tutor/2009-November/072959.html
1044 class myGzipFile(gzip.GzipFile):
1045 def __enter__(self):
1046 if self.fileobj is None:
1047 raise ValueError("I/O operation on closed GzipFile object")
1048 return self
1049
1050 def __exit__(self, *args):
1051 self.close()
1052
1053
def natural_sort_key(s, _nsre = REGEX_NATURAL_SORT):
	"""
	Sort key for natural (version-aware) ordering: digit runs compare
	numerically, everything else compares case-insensitively.

	@param s string Value to build a key for.
	@param _nsre compiled regex Splitter capturing digit runs.
	@return list of ints and lowercased strings.
	"""
	parts = re.split(_nsre, s)
	return [int(part) if part.isdigit() else part.lower() for part in parts]
1057
1058
def generic_linux_sort(self):
	"""
	Sort the given list of strings in place using C-locale collation,
	matching GNU "sort" run with LC_ALL=C.

	BUGFIX: the original body sorted an undefined name ``yourList`` and
	always raised NameError; it now sorts the list that was passed in.

	@param self list List of strings to sort in place.
	"""
	import locale
	locale.setlocale(locale.LC_ALL, "C")
	# strxfrm maps each string to its collation form, giving the same
	# ordering as comparing pairs with locale.strcoll.
	self.sort(key=locale.strxfrm)
1063
1064
def parse_date(adate):
	"""
	Convert human-entered time into linux integer timestamp

	@param adate string Human entered date to parse into linux time

	@return integer Linux time equivalent or 0 if no date supplied
	"""
	text = adate.strip()
	if not text:
		return 0
	parsed = parser2.parse(text, fuzzy=True)
	#dateP2 = time.mktime(parsed.timetuple())
	# timegm treats the tuple as UTC, so daylight savings is handled exactly.
	return calendar.timegm(parsed.timetuple())
1080
1081
if __name__ == '__main__':

	# Build the Kipper instance and hand control to its CLI entry point.
	Kipper().__main__()
1086