Mercurial > repos > shellac > sam_consensus_v3
view env/bin/sdbadmin @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
#!/Users/cmdms/OneDrive-UOB/Development/Projects/2021/sam-consensus-v3/env/bin/python3 # Copyright (c) 2009 Chris Moyer http://kopertop.blogspot.com/ # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, dis- # tribute, sublicense, and/or sell copies of the Software, and to permit # persons to whom the Software is furnished to do so, subject to the fol- # lowing conditions: # # The above copyright notice and this permission notice shall be included # in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # # Tools to dump and recover an SDB domain # VERSION = "%prog version 1.0" import boto import time from boto import sdb from boto.compat import json def choice_input(options, default=None, title=None): """ Choice input """ if title == None: title = "Please choose" print title objects = [] for n, obj in enumerate(options): print "%s: %s" % (n, obj) objects.append(obj) choice = int(raw_input(">>> ")) try: choice = objects[choice] except: choice = default return choice def confirm(message="Are you sure?"): choice = raw_input("%s [yN] " % message) return choice and len(choice) > 0 and choice[0].lower() == "y" def dump_db(domain, file_name, use_json=False, sort_attributes=False): """ Dump SDB domain to file """ f = open(file_name, "w") if use_json: for item in domain: data = {"name": item.name, "attributes": item} print >> f, json.dumps(data, sort_keys=sort_attributes) else: doc = domain.to_xml(f) def empty_db(domain): """ Remove all entries from domain """ for item in domain: item.delete() def load_db(domain, file, use_json=False): """ Load a domain from a file, this doesn't overwrite any existing data in the file so if you want to do a full recovery and restore you need to call empty_db before calling this :param domain: The SDB Domain object to load to :param file: The File to load the DB from """ if use_json: for line in file.readlines(): if line: data = json.loads(line) item = domain.new_item(data['name']) item.update(data['attributes']) item.save() else: domain.from_xml(file) def check_valid_region(conn, region): if conn is None: print 'Invalid region (%s)' % region sys.exit(1) def create_db(domain_name, region_name): """Create a new DB :param domain: Name of the domain to create :type domain: str """ sdb = boto.sdb.connect_to_region(region_name) check_valid_region(sdb, region_name) return sdb.create_domain(domain_name) if __name__ == "__main__": from optparse import OptionParser parser = OptionParser(version=VERSION, usage="Usage: %prog [--dump|--load|--empty|--list|-l] [options]") # Commands parser.add_option("--dump", help="Dump domain to file", dest="dump", default=False, action="store_true") parser.add_option("--load", help="Load domain contents from file", dest="load", default=False, action="store_true") parser.add_option("--empty", help="Empty all contents of domain", dest="empty", default=False, action="store_true") parser.add_option("-l", "--list", help="List All domains", dest="list", default=False, action="store_true") parser.add_option("-c", "--create", help="Create domain", dest="create", default=False, action="store_true") parser.add_option("-a", "--all-domains", help="Operate on all domains", action="store_true", default=False, dest="all_domains") if json: parser.add_option("-j", "--use-json", help="Load/Store as JSON instead of XML", action="store_true", default=False, dest="json") parser.add_option("-s", "--sort-attibutes", help="Sort the element attributes", action="store_true", default=False, dest="sort_attributes") parser.add_option("-d", "--domain", help="Do functions on domain (may be more then one)", action="append", dest="domains") parser.add_option("-f", "--file", help="Input/Output file we're operating on", dest="file_name") parser.add_option("-r", "--region", help="Region (e.g. us-east-1[default] or eu-west-1)", default="us-east-1", dest="region_name") (options, args) = parser.parse_args() if options.create: for domain_name in options.domains: create_db(domain_name, options.region_name) exit() sdb = boto.sdb.connect_to_region(options.region_name) check_valid_region(sdb, options.region_name) if options.list: for db in sdb.get_all_domains(): print db exit() if not options.dump and not options.load and not options.empty: parser.print_help() exit() # # Setup # if options.domains: domains = [] for domain_name in options.domains: domains.append(sdb.get_domain(domain_name)) elif options.all_domains: domains = sdb.get_all_domains() else: domains = [choice_input(options=sdb.get_all_domains(), title="No domain specified, please choose one")] # # Execute the commands # stime = time.time() if options.empty: if confirm("WARNING!!! Are you sure you want to empty the following domains?: %s" % domains): stime = time.time() for domain in domains: print "--------> Emptying %s <--------" % domain.name empty_db(domain) else: print "Canceling operations" exit() if options.dump: for domain in domains: print "--------> Dumping %s <---------" % domain.name if options.file_name: file_name = options.file_name else: file_name = "%s.db" % domain.name dump_db(domain, file_name, options.json, options.sort_attributes) if options.load: for domain in domains: print "---------> Loading %s <----------" % domain.name if options.file_name: file_name = options.file_name else: file_name = "%s.db" % domain.name load_db(domain, open(file_name, "rb"), options.json) total_time = round(time.time() - stime, 2) print "--------> Finished in %s <--------" % total_time