Mercurial > repos > guerler > springsuite
diff planemo/bin/dynamodb_load @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/bin/dynamodb_load Fri Jul 31 00:18:57 2020 -0400 @@ -0,0 +1,110 @@ +#!/Users/guerler/spring/springsuite/planemo/bin/python3 + +import argparse +import os + +import boto +from boto.compat import json +from boto.compat import six +from boto.dynamodb.schema import Schema + + +DESCRIPTION = """Load data into one or more DynamoDB tables. + +For each table, data is read from two files: + - {table_name}.metadata for the table's name, schema and provisioned + throughput (only required if creating the table). + - {table_name}.data for the table's actual contents. + +Both files are searched for in the current directory. To read them from +somewhere else, use the --in-dir parameter. + +This program does not wipe the tables prior to loading data. However, any +items present in the data files will overwrite the table's contents. +""" + + +def _json_iterload(fd): + """Lazily load newline-separated JSON objects from a file-like object.""" + buffer = "" + eof = False + while not eof: + try: + # Add a line to the buffer + buffer += fd.next() + except StopIteration: + # We can't let that exception bubble up, otherwise the last + # object in the file will never be decoded. + eof = True + try: + # Try to decode a JSON object. + json_object = json.loads(buffer.strip()) + + # Success: clear the buffer (everything was decoded). + buffer = "" + except ValueError: + if eof and buffer.strip(): + # No more lines to load and the buffer contains something other + # than whitespace: the file is, in fact, malformed. + raise + # We couldn't decode a complete JSON object: load more lines. + continue + + yield json_object + + +def create_table(metadata_fd): + """Create a table from a metadata file-like object.""" + + +def load_table(table, in_fd): + """Load items into a table from a file-like object.""" + for i in _json_iterload(in_fd): + # Convert lists back to sets. + data = {} + for k, v in six.iteritems(i): + if isinstance(v, list): + data[k] = set(v) + else: + data[k] = v + table.new_item(attrs=data).put() + + +def dynamodb_load(tables, in_dir, create_tables): + conn = boto.connect_dynamodb() + for t in tables: + metadata_file = os.path.join(in_dir, "%s.metadata" % t) + data_file = os.path.join(in_dir, "%s.data" % t) + if create_tables: + with open(metadata_file) as meta_fd: + metadata = json.load(meta_fd) + table = conn.create_table( + name=t, + schema=Schema(metadata["schema"]), + read_units=metadata["read_units"], + write_units=metadata["write_units"], + ) + table.refresh(wait_for_active=True) + else: + table = conn.get_table(t) + + with open(data_file) as in_fd: + load_table(table, in_fd) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="dynamodb_load", + description=DESCRIPTION + ) + parser.add_argument( + "--create-tables", + action="store_true", + help="Create the tables if they don't exist already (without this flag, attempts to load data into non-existing tables fail)." + ) + parser.add_argument("--in-dir", default=".") + parser.add_argument("tables", metavar="TABLES", nargs="+") + + namespace = parser.parse_args() + + dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables)