comparison planemo/bin/dynamodb_load @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d30785e31577
1 #!/Users/guerler/spring/springsuite/planemo/bin/python3
2
3 import argparse
4 import os
5
6 import boto
7 from boto.compat import json
8 from boto.compat import six
9 from boto.dynamodb.schema import Schema
10
11
# Program description handed to argparse (``description=DESCRIPTION`` in the
# ``__main__`` block) and therefore shown in the ``--help`` output.
DESCRIPTION = """Load data into one or more DynamoDB tables.

For each table, data is read from two files:
- {table_name}.metadata for the table's name, schema and provisioned
throughput (only required if creating the table).
- {table_name}.data for the table's actual contents.

Both files are searched for in the current directory. To read them from
somewhere else, use the --in-dir parameter.

This program does not wipe the tables prior to loading data. However, any
items present in the data files will overwrite the table's contents.
"""
25
26
def _json_iterload(fd):
    """Lazily load newline-separated JSON objects from a file-like object.

    Lines are accumulated in a buffer until the buffer parses as one
    complete JSON document, so a single object may span several lines.
    Whitespace-only stretches between objects are skipped.

    Args:
        fd: An iterator yielding lines of text (e.g. an open text file).

    Yields:
        Each decoded JSON value, in the order it appears in the input.

    Raises:
        ValueError: If the input ends while the buffer still holds
            non-whitespace text that is not valid JSON.
    """
    buffer = ""
    eof = False
    while not eof:
        try:
            # Add a line to the buffer. The next() builtin is used rather
            # than the fd.next() method because the latter only exists on
            # Python 2 iterators.
            buffer += next(fd)
        except StopIteration:
            # We can't let that exception bubble up, otherwise the last
            # object in the file will never be decoded.
            eof = True

        try:
            # Try to decode a JSON object.
            json_object = json.loads(buffer.strip())
        except ValueError:
            if eof and buffer.strip():
                # No more lines to load and the buffer contains something
                # other than whitespace: the file is, in fact, malformed.
                raise
            # We couldn't decode a complete JSON object: load more lines.
            continue

        # Success: clear the buffer (everything was decoded).
        buffer = ""
        yield json_object
54
55
def create_table(metadata_fd):
    """Placeholder for creating a table from a metadata file-like object.

    This function has no implementation: ``metadata_fd`` is never read and
    ``None`` is returned. In this file, tables are actually created inline
    by ``dynamodb_load``.
    """
    return None
58
59
def load_table(table, in_fd):
    """Store every JSON item read from ``in_fd`` into ``table``.

    Each object produced by ``_json_iterload(in_fd)`` is written with
    ``table.new_item(attrs=...).put()``, one call per item.
    """
    for record in _json_iterload(in_fd):
        # JSON has no set type, so list values are turned back into sets;
        # every other value is passed through unchanged.
        attrs = {
            key: set(value) if isinstance(value, list) else value
            for key, value in six.iteritems(record)
        }
        table.new_item(attrs=attrs).put()
71
72
def dynamodb_load(tables, in_dir, create_tables):
    """Load ``{table}.data`` files from ``in_dir`` into DynamoDB tables.

    Args:
        tables: Iterable of table names to load.
        in_dir: Directory containing ``{table}.data`` and, when tables are
            to be created, ``{table}.metadata`` for each table.
        create_tables: When true, a table that does not exist yet is
            created from its metadata file (keys ``schema``,
            ``read_units``, ``write_units``) before loading; a table that
            already exists is loaded into as-is. When false, every table
            is fetched with ``get_table`` and no metadata file is read.
    """
    conn = boto.connect_dynamodb()

    # The table listing is only needed to decide what to create, so skip
    # the extra service call when creation was not requested.
    existing = set(conn.list_tables()) if create_tables else set()

    for t in tables:
        data_file = os.path.join(in_dir, "%s.data" % t)

        if create_tables and t not in existing:
            metadata_file = os.path.join(in_dir, "%s.metadata" % t)
            with open(metadata_file) as meta_fd:
                metadata = json.load(meta_fd)
            table = conn.create_table(
                name=t,
                schema=Schema(metadata["schema"]),
                read_units=metadata["read_units"],
                write_units=metadata["write_units"],
            )
            # Block until the new table is usable before writing items.
            table.refresh(wait_for_active=True)
            # Remember it so a repeated name is not created a second time.
            existing.add(t)
        else:
            table = conn.get_table(t)

        with open(data_file) as in_fd:
            load_table(table, in_fd)
93
94
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, then hand them to
    # dynamodb_load().
    cli = argparse.ArgumentParser(prog="dynamodb_load", description=DESCRIPTION)
    cli.add_argument(
        "--create-tables",
        action="store_true",
        help=(
            "Create the tables if they don't exist already (without this flag, "
            "attempts to load data into non-existing tables fail)."
        ),
    )
    cli.add_argument("--in-dir", default=".")
    cli.add_argument("tables", metavar="TABLES", nargs="+")

    args = cli.parse_args()

    dynamodb_load(args.tables, args.in_dir, args.create_tables)