Mercurial > repos > guerler > springsuite
comparison planemo/bin/dynamodb_load @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d30785e31577 |
---|---|
1 #!/Users/guerler/spring/springsuite/planemo/bin/python3 | |
2 | |
3 import argparse | |
4 import os | |
5 | |
6 import boto | |
7 from boto.compat import json | |
8 from boto.compat import six | |
9 from boto.dynamodb.schema import Schema | |
10 | |
11 | |
12 DESCRIPTION = """Load data into one or more DynamoDB tables. | |
13 | |
14 For each table, data is read from two files: | |
15 - {table_name}.metadata for the table's name, schema and provisioned | |
16 throughput (only required if creating the table). | |
17 - {table_name}.data for the table's actual contents. | |
18 | |
19 Both files are searched for in the current directory. To read them from | |
20 somewhere else, use the --in-dir parameter. | |
21 | |
22 This program does not wipe the tables prior to loading data. However, any | |
23 items present in the data files will overwrite the table's contents. | |
24 """ | |
25 | |
26 | |
27 def _json_iterload(fd): | |
28 """Lazily load newline-separated JSON objects from a file-like object.""" | |
29 buffer = "" | |
30 eof = False | |
31 while not eof: | |
32 try: | |
33 # Add a line to the buffer | |
34 buffer += fd.next() | |
35 except StopIteration: | |
36 # We can't let that exception bubble up, otherwise the last | |
37 # object in the file will never be decoded. | |
38 eof = True | |
39 try: | |
40 # Try to decode a JSON object. | |
41 json_object = json.loads(buffer.strip()) | |
42 | |
43 # Success: clear the buffer (everything was decoded). | |
44 buffer = "" | |
45 except ValueError: | |
46 if eof and buffer.strip(): | |
47 # No more lines to load and the buffer contains something other | |
48 # than whitespace: the file is, in fact, malformed. | |
49 raise | |
50 # We couldn't decode a complete JSON object: load more lines. | |
51 continue | |
52 | |
53 yield json_object | |
54 | |
55 | |
56 def create_table(metadata_fd): | |
57 """Create a table from a metadata file-like object.""" | |
58 | |
59 | |
60 def load_table(table, in_fd): | |
61 """Load items into a table from a file-like object.""" | |
62 for i in _json_iterload(in_fd): | |
63 # Convert lists back to sets. | |
64 data = {} | |
65 for k, v in six.iteritems(i): | |
66 if isinstance(v, list): | |
67 data[k] = set(v) | |
68 else: | |
69 data[k] = v | |
70 table.new_item(attrs=data).put() | |
71 | |
72 | |
73 def dynamodb_load(tables, in_dir, create_tables): | |
74 conn = boto.connect_dynamodb() | |
75 for t in tables: | |
76 metadata_file = os.path.join(in_dir, "%s.metadata" % t) | |
77 data_file = os.path.join(in_dir, "%s.data" % t) | |
78 if create_tables: | |
79 with open(metadata_file) as meta_fd: | |
80 metadata = json.load(meta_fd) | |
81 table = conn.create_table( | |
82 name=t, | |
83 schema=Schema(metadata["schema"]), | |
84 read_units=metadata["read_units"], | |
85 write_units=metadata["write_units"], | |
86 ) | |
87 table.refresh(wait_for_active=True) | |
88 else: | |
89 table = conn.get_table(t) | |
90 | |
91 with open(data_file) as in_fd: | |
92 load_table(table, in_fd) | |
93 | |
94 | |
95 if __name__ == "__main__": | |
96 parser = argparse.ArgumentParser( | |
97 prog="dynamodb_load", | |
98 description=DESCRIPTION | |
99 ) | |
100 parser.add_argument( | |
101 "--create-tables", | |
102 action="store_true", | |
103 help="Create the tables if they don't exist already (without this flag, attempts to load data into non-existing tables fail)." | |
104 ) | |
105 parser.add_argument("--in-dir", default=".") | |
106 parser.add_argument("tables", metavar="TABLES", nargs="+") | |
107 | |
108 namespace = parser.parse_args() | |
109 | |
110 dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables) |