0
|
1 #!/usr/bin/env python
|
4
|
2
|
0
|
3
|
|
4 import argparse
|
|
5 import datetime
|
|
6 import os
|
|
7 import subprocess
|
|
8 import sys
|
|
9
|
5
|
10 import dateutil.parser
|
|
11
|
|
12 import psycopg2
|
|
13
|
0
|
14 from six.moves import configparser
|
|
15
|
5
|
16 from sqlalchemy import MetaData
|
0
|
17 from sqlalchemy import create_engine
|
|
18 from sqlalchemy.engine.url import make_url
|
|
19
|
|
# Shortcut used for timestamping (returns naive UTC datetimes).
now = datetime.datetime.utcnow
# Module-level SQLAlchemy MetaData; NOTE(review): the class builds its own
# MetaData in __init__, so this instance appears unused beyond import time.
metadata = MetaData()

# Sentinel written to numeric columns when an input value is missing/NA.
DEFAULT_MISSING_NUMERIC_VALUE = -9.000000
|
|
24
|
|
25
|
|
def check_execution_errors(rc, fstderr, fstdout):
    """Exit the process with captured output if a subprocess failed.

    :param rc: subprocess return code; 0 means success and the call is a no-op.
    :param fstderr: path of the file the subprocess's stderr was captured in.
    :param fstdout: path of the file the subprocess's stdout was captured in.

    Raises SystemExit carrying both captured streams when rc is non-zero.
    """
    if rc != 0:
        # Use context managers so the handles are closed even if read() raises
        # (the original opened/closed by hand and could leak on error).
        with open(fstdout, 'rb') as fh:
            out_msg = fh.read()
        with open(fstderr, 'rb') as fh:
            err_msg = fh.read()
        msg = '%s\n%s\n' % (str(out_msg), str(err_msg))
        sys.exit(msg)
|
|
36
|
|
37
|
|
def get_config_settings(config_file, section='defaults'):
    """Return a dict of the key/value pairs in a section of config_file.

    Keys from the 'defaults' section are upper-cased (they name well-known
    settings like DB_NAME); keys from any other section are returned as-is.
    """
    config_parser = configparser.ConfigParser()
    config_parser.read(config_file)
    upcase = section == 'defaults'
    return {(key.upper() if upcase else key): value
            for key, value in config_parser.items(section)}
|
|
50
|
|
51
|
|
def get_response_buffers():
    """Open capture files for a subprocess's stderr/stdout in the CWD.

    Returns (stderr_path, stderr_handle, stdout_path, stdout_handle).  Both
    handles are opened in binary write mode; the caller must close them.
    """
    cwd = os.getcwd()
    fstderr = os.path.join(cwd, 'stderr.txt')
    fstdout = os.path.join(cwd, 'stdout.txt')
    return fstderr, open(fstderr, 'wb'), fstdout, open(fstdout, 'wb')
|
|
58
|
|
59
|
|
def get_sql_param_val_str(column_val, default):
    """Build an SQL equality fragment ("= 'value'") for a WHERE clause.

    Null-like column values ("", "NA", "NULL") are replaced with default
    before formatting.
    """
    val = default if set_to_null(column_val) else column_val
    return "= '%s'" % val
|
|
66
|
|
67
|
|
def get_value_from_config(config_defaults, value):
    """Look up a setting in the config dict, returning None when absent."""
    return config_defaults.get(value)
|
|
70
|
|
71
|
|
def get_year_from_now():
    """Return today's date one year out.

    Used as the default public_after_date for the sample table.  The default
    behavior is for the public column to be True and public_after_date NULL,
    making the sample "public".  The user can instead set public to False and
    optionally supply a date in the Affymetrix 96 well plate metadata file;
    when public is False and no date is given, the default is one year from
    the time the row is inserted.
    """
    today = datetime.date.today()
    try:
        # Same calendar day next year.
        return today.replace(year=today.year + 1)
    except Exception:
        # Feb 29 has no counterpart next year: add the current year's length.
        this_jan = datetime.date(today.year, 1, 1)
        next_jan = datetime.date(today.year + 1, 1, 1)
        return today + (next_jan - this_jan)
|
|
92
|
|
93
|
|
def handle_column_value(val, get_sql_param=True, default=''):
    """Normalize a raw column value read from an input file.

    Null-like values ("", "NA", "NULL") become default.  When get_sql_param
    is True, a 2-tuple (value, "= '...'" SQL fragment) is returned for use in
    existence-check SELECTs; otherwise just the value.

    Regarding the default value: a NULL indicates an unknown value and should
    not be confused with an empty string, but this application has no concept
    of unknown, so most columns are non-nullable and default to ''.
    """
    param = handle_null(val)
    if param is None:
        param = default
    if get_sql_param:
        # The fragment is built from the raw val so nulls map to default.
        return param, get_sql_param_val_str(val, default)
    return param
|
|
109
|
|
110
|
|
def handle_null(val):
    """Map null-like markers ("", "NA", "NULL") to None; pass others through."""
    return None if set_to_null(val) else val
|
|
115
|
|
116
|
|
def run_command(cmd):
    """Run cmd through the shell, capturing output to files; exit on failure.

    NOTE(review): shell=True executes cmd via the shell; callers build cmd
    from config-file values, which are assumed trusted — confirm.
    """
    fstderr, fherr, fstdout, fhout = get_response_buffers()
    proc = subprocess.Popen(args=cmd, stderr=fherr, stdout=fhout, shell=True)
    rc = proc.wait()
    # Check results.
    fherr.close()
    fhout.close()
    check_execution_errors(rc, fstderr, fstdout)
|
|
125
|
|
126
|
|
def set_to_null(val):
    """Return True when val is one of the null markers used in the input files."""
    return val in ("", "NA", "NULL")
|
|
131
|
|
132
|
|
def split_line(line, sep="\t"):
    """Split line on sep, stripping R-style double quotes from each field."""
    return [field.strip('"') for field in line.split(sep)]
|
|
140
|
|
141
|
|
def string_as_bool(string):
    """Return True for truthy strings ('true', 'yes', 'on', '1'), else False."""
    return str(string).lower() in ('true', 'yes', 'on', '1')
|
|
147
|
|
148
|
|
class StagDatabaseUpdater(object):
    """Loads tabular sample-metadata files into the stag PostgreSQL database."""

    def __init__(self):
        # Parsed CLI arguments and the psycopg2 connection, set below.
        self.args = None
        self.conn = None
        self.parse_args()
        # One year out: the default public_after_date for non-public samples.
        self.year_from_now = get_year_from_now()
        # Filled in by get_config_settings() from the config file.
        self.db_name = None
        self.db_storage_dir = None
        self.get_config_settings()
        # Log destination (the tool's output dataset).
        self.outfh = open(self.args.output, "w")
        self.connect_db()
        self.engine = create_engine(self.args.database_connection_string)
        self.metadata = MetaData(self.engine)
        # Per-table primary-key lists, appended in input-file row order and
        # later dereferenced by row index when inserting into the sample table.
        self.affy_ids = []
        self.allele_ids = []
        self.colony_ids = []
        self.experiment_ids = []
        self.genotype_ids = []
        self.person_ids = []
        self.phenotype_ids = []
        self.reef_ids = []
        self.taxonomy_ids = []
|
|
171
|
|
    def connect_db(self):
        """Open a psycopg2 connection from the SQLAlchemy-style URL argument."""
        url = make_url(self.args.database_connection_string)
        self.log('Attempting to connect to the database...')
        # Translate the URL into psycopg2 connect() keyword arguments.
        args = url.translate_connect_args(username='user')
        args.update(url.query)
        assert url.get_dialect().name == 'postgresql', 'This script can only be used with PostgreSQL.'
        self.conn = psycopg2.connect(**args)
        self.log("Successfully connected to the database...")
|
|
180
|
|
181 def convert_date_string_for_database(self, date_string):
|
|
182 # The value of date_string is %y/%m/%d with
|
|
183 # the year being 2 digits (yikes!).
|
|
184 fixed_century = "20%s" % date_string
|
|
185 fixed_date = fixed_century.replace("/", "-")
|
|
186 # Convert the string to a format required for
|
|
187 # inserting into the database.
|
|
188 database_format = dateutil.parser.parse(fixed_date)
|
|
189 return str(database_format)
|
|
190
|
|
    def flush(self):
        # Commit the current transaction on the shared connection.
        self.conn.commit()
|
|
193
|
|
    def export_database(self):
        """Back up the database with pg_dump before making any changes."""
        # Export the database to the configured storage location.
        if not os.path.isdir(self.db_storage_dir):
            os.makedirs(self.db_storage_dir)
        db_storage_path = os.path.join(self.db_storage_dir, "exported_%s_db" % self.db_name)
        # NOTE(review): cmd is run through a shell (see run_command); db_name
        # and the storage path come from the config file and are assumed trusted.
        cmd = "pg_dump %s -f %s" % (self.db_name, db_storage_path)
        run_command(cmd)
|
|
201
|
|
    def get_config_settings(self):
        """Read DB_NAME and the dated DB_STORAGE_DIR from the config file.

        Note: this method name shadows the module-level get_config_settings()
        function, which it intentionally calls (resolved as a global here).
        """
        config_defaults = get_config_settings(self.args.config_file)
        self.db_name = get_value_from_config(config_defaults, 'DB_NAME')
        base_storage_dir = get_value_from_config(config_defaults, 'DB_STORAGE_DIR')
        # Use the date to name the storage directory to
        # enable storing a file per day (multiple runs
        # per day will overwrite the existing file).
        date_str = datetime.datetime.now().strftime("%Y_%m_%d")
        self.db_storage_dir = os.path.join(base_storage_dir, date_str)
|
0
|
211
|
|
212 def get_next_sample_id(self):
|
|
213 cmd = "SELECT sample_id FROM sample ORDER by id DESC;"
|
|
214 cur = self.conn.cursor()
|
|
215 cur.execute(cmd)
|
|
216 try:
|
|
217 last_sample_id = cur.fetchone()[0]
|
|
218 # The value of last_sample_id will be something like A10171.
|
|
219 last_sample_id_num = int(last_sample_id.lstrip("A"))
|
|
220 next_sample_id_num = last_sample_id_num + 1
|
|
221 next_sample_id = "A%d" % next_sample_id_num
|
|
222 except Exception:
|
|
223 next_sample_id = "A10000"
|
|
224 return next_sample_id
|
|
225
|
|
    def log(self, msg):
        # Append a line to the output dataset; flushed at shutdown or on error.
        self.outfh.write("%s\n" % msg)
|
|
228
|
|
    def update_allele_table(self, file_path):
        """Insert new rows into the allele table from allele.tabular.

        Appends one entry to self.allele_ids per entry in self.affy_ids
        ("" for failed samples) for later use by update_sample_table.
        """
        self.log("Updating the allele table...")
        # Columns in the experiment file are:
        # affy_id allele
        allele_table_inserts = 0
        # The allele.tabular file contains a subset of the number of samples
        # to be inserted. This is because those samples that failed will not
        # be included in the file. Failed samples will have an affy_id value
        # of NA in self.affy_ids, which was generated when the genotype.tabular
        # file was processed, so we'll use that list to build the correct list
        # of self.allele_ids for later use when inserting into the sample table.
        # NOTE(review): fh is never explicitly closed; relies on GC/exit.
        fh = open(file_path, "r")
        # Skip the header
        fh.readline()
        for id_index, affy_id in enumerate(self.affy_ids):
            if set_to_null(affy_id):
                # This is a failed sample, so no allele strings will be
                # inserted, and we'll set the allele_id to the default
                # empty string.
                self.allele_ids.append("")
                continue
            # See if we need to add a row to the table. The affy_id value
            # should not exist in the sample table.
            cmd = "SELECT allele_id FROM sample WHERE affy_id = '%s';" % affy_id
            cur = self.conn.cursor()
            cur.execute(cmd)
            try:
                allele_id = cur.fetchone()[0]
            except Exception:
                # Insert a row into the allele table.  A file line is consumed
                # only when an insert occurs, keeping the file position in
                # sync with the non-failed, not-yet-inserted affy_ids.
                line = fh.readline()
                line = line.rstrip()
                items = split_line(line)
                allele = items[1]
                cmd = "INSERT INTO allele VALUES (nextval('allele_id_seq'), %s, %s, %s) RETURNING id;"
                args = ['NOW()', 'NOW()', allele]
                cur = self.update(cmd, args)
                self.flush()
                allele_id = cur.fetchone()[0]
                allele_table_inserts += 1
            self.allele_ids.append(allele_id)
        self.log("Inserted %d rows into the allele table..." % allele_table_inserts)
|
|
271
|
|
    def update_colony_table(self, file_path):
        """Insert new rows into the colony table, one per input line.

        Appends each line's colony id to self.colony_ids for later use
        when inserting into the sample table.
        """
        self.log("Updating the colony table...")
        # Columns in the colony file are:
        # latitude longitude depth geographic_origin
        # The geographic_origin value is used for deciding into which table
        # to insert the latitude and longitude values. If the geographic_origin
        # is "colony", the values will be inserted into the colony table.
        colony_table_inserts = 0
        with open(file_path) as fh:
            for i, line in enumerate(fh):
                if i == 0:
                    # Skip header
                    continue
                # Keep track of foreign keys since we skip the header line.
                id_index = i - 1
                line = line.rstrip()
                items = split_line(line)
                geographic_origin = items[3]
                if set_to_null(geographic_origin):
                    geographic_origin = "reef"
                else:
                    geographic_origin = geographic_origin.lower()
                if geographic_origin == "colony":
                    # NOTE(review): "%6f" is a width spec; "%.6f" (precision)
                    # may have been intended, though both print 6 decimals by
                    # default — confirm before changing.
                    latitude = "%6f" % float(items[0])
                    longitude = "%6f" % float(items[1])
                else:
                    # Lat/long belong to the reef, not the colony: store the
                    # missing-value sentinel here.
                    latitude = DEFAULT_MISSING_NUMERIC_VALUE
                    longitude = DEFAULT_MISSING_NUMERIC_VALUE
                depth = handle_column_value(items[2], get_sql_param=False, default=-9.0)
                reef_id = self.reef_ids[id_index]
                # See if we need to add a row to the table.
                cmd = "SELECT id FROM colony WHERE latitude = %s " % latitude
                cmd += "AND longitude = %s AND depth = %s " % (longitude, depth)
                cmd += "AND reef_id = %s;" % reef_id
                cur = self.conn.cursor()
                cur.execute(cmd)
                try:
                    colony_id = cur.fetchone()[0]
                except Exception:
                    # Insert a row into the colony table.
                    cmd = "INSERT INTO colony VALUES (nextval('colony_id_seq'), %s, %s, %s, %s, %s, %s) RETURNING id;"
                    args = ['NOW()', 'NOW()', latitude, longitude, depth, reef_id]
                    cur = self.update(cmd, args)
                    self.flush()
                    colony_id = cur.fetchone()[0]
                    colony_table_inserts += 1
                self.colony_ids.append(colony_id)
        self.log("Inserted %d rows into the colony table..." % colony_table_inserts)
|
|
320
|
|
    def update_experiment_table(self, file_path):
        """Insert new rows into the experiment table, one per input line.

        Appends each line's experiment id to self.experiment_ids for later
        use when inserting into the sample table.
        """
        self.log("Updating the experiment table...")
        # Columns in the experiment file are:
        # seq_facility array_version result_folder_name plate_barcode
        experiment_table_inserts = 0
        with open(file_path) as fh:
            for i, line in enumerate(fh):
                if i == 0:
                    # Skip header
                    continue
                # Keep track of foreign keys since we skip the header line.
                line = line.rstrip()
                items = split_line(line)
                # Each call returns (normalized value, "= '...'" SQL fragment)
                # used in the existence-check SELECT below.
                seq_facility, seq_facility_param_val_str = handle_column_value(items[0])
                array_version, array_version_param_val_str = handle_column_value(items[1])
                result_folder_name, result_folder_name_param_val_str = handle_column_value(items[2])
                plate_barcode, plate_barcode_param_val_str = handle_column_value(items[3])
                # See if we need to add a row to the table.
                cmd = "SELECT id FROM experiment WHERE seq_facility %s " % seq_facility_param_val_str
                cmd += "AND array_version %s " % array_version_param_val_str
                cmd += "AND result_folder_name %s " % result_folder_name_param_val_str
                cmd += "AND plate_barcode %s;" % plate_barcode_param_val_str
                cur = self.conn.cursor()
                cur.execute(cmd)
                try:
                    experiment_id = cur.fetchone()[0]
                except Exception:
                    # Insert a row into the experiment table.
                    cmd = "INSERT INTO experiment VALUES (nextval('experiment_id_seq'), %s, %s, %s, %s, %s, %s) RETURNING id;"
                    args = ['NOW()', 'NOW()', seq_facility, array_version, result_folder_name, plate_barcode]
                    cur = self.update(cmd, args)
                    self.flush()
                    experiment_id = cur.fetchone()[0]
                    experiment_table_inserts += 1
                self.experiment_ids.append(experiment_id)
        self.log("Inserted %d rows into the experiment table..." % experiment_table_inserts)
|
|
357
|
|
358 def update_genotype_table(self, file_path):
|
|
359 self.log("Updating the genotype table...")
|
|
360 # Columns in the genotype file are:
|
|
361 # affy_id coral_mlg_clonal_id user_specimen_id db_match genetic_coral_species_call
|
|
362 # coral_mlg_rep_sample_id
|
|
363 genotype_table_inserts = 0
|
|
364 with open(file_path) as fh:
|
|
365 for i, line in enumerate(fh):
|
|
366 if i == 0:
|
|
367 # Skip header
|
|
368 continue
|
|
369 line = line.rstrip()
|
|
370 items = split_line(line)
|
|
371 # Keep an in-memory list of affy_ids for use
|
|
372 # when updating the allele table.
|
|
373 self.affy_ids.append(items[0])
|
|
374 coral_mlg_clonal_id = items[1]
|
|
375 # The value of db_match will be "no_match" if
|
|
376 # a new row should be inserted into the table.
|
|
377 db_match = items[3].lower()
|
|
378 genetic_coral_species_call = handle_column_value(items[4], get_sql_param=False)
|
|
379 coral_mlg_rep_sample_id = handle_column_value(items[5], get_sql_param=False)
|
|
380 if db_match == "failed":
|
|
381 # Handle the special case of a failed sample.
|
|
382 cmd = "SELECT id FROM genotype WHERE coral_mlg_clonal_id = 'failed'"
|
|
383 else:
|
|
384 cmd = "SELECT id FROM genotype WHERE coral_mlg_clonal_id = '%s'" % coral_mlg_clonal_id
|
|
385 # See if we need to add a row to the table.
|
|
386 cur = self.conn.cursor()
|
|
387 cur.execute(cmd)
|
|
388 try:
|
|
389 genotype_id = cur.fetchone()[0]
|
|
390 if db_match == "failed":
|
|
391 val = db_match
|
|
392 else:
|
|
393 val = "match"
|
|
394 self.log("Found genotype row with id %d, value of db_match: %s, should be %s." % (genotype_id, db_match, val))
|
|
395 except Exception:
|
|
396 # Insert a row into the genotype table.
|
|
397 cmd = "INSERT INTO genotype VALUES (nextval('genotype_id_seq'), NOW(), NOW(), "
|
|
398 cmd += "'%s', '%s', '%s') RETURNING id;"
|
|
399 cmd = cmd % (coral_mlg_clonal_id, coral_mlg_rep_sample_id, genetic_coral_species_call)
|
|
400 args = [coral_mlg_clonal_id, coral_mlg_rep_sample_id, genetic_coral_species_call]
|
|
401 cur = self.update(cmd, args)
|
|
402 self.flush()
|
|
403 genotype_id = cur.fetchone()[0]
|
|
404 if db_match == "failed":
|
|
405 val = db_match
|
|
406 else:
|
|
407 val = "no_match"
|
|
408 self.log("Inserted genotype row with id %d, value of db_match: %s, should be %s." % (genotype_id, db_match, val))
|
|
409 genotype_table_inserts += 1
|
|
410 self.genotype_ids.append(genotype_id)
|
|
411 self.log("Inserted %d rows into the genotype table..." % genotype_table_inserts)
|
|
412
|
|
    def update_person_table(self, file_path):
        """Insert new rows into the person table, keyed on email address.

        Appends each line's person id to self.person_ids for later use when
        inserting into the sample table.
        """
        self.log("Updating the person table...")
        # Columns in the person file are:
        # last_name first_name organization email
        person_table_inserts = 0
        with open(file_path) as fh:
            for i, line in enumerate(fh):
                if i == 0:
                    # Skip header
                    continue
                line = line.rstrip()
                items = split_line(line)
                last_name = items[0]
                first_name = items[1]
                organization = items[2]
                email = items[3]
                # See if we need to add a row to the table.
                cmd = "SELECT id FROM person WHERE email = '%s';" % email
                cur = self.conn.cursor()
                cur.execute(cmd)
                try:
                    person_id = cur.fetchone()[0]
                except Exception:
                    # Insert a row into the person table.
                    cmd = "INSERT INTO person VALUES (nextval('person_id_seq'), NOW(), NOW(), "
                    cmd += "%s, %s, %s, %s) RETURNING id;"
                    args = [last_name, first_name, organization, email]
                    cur = self.update(cmd, args)
                    self.flush()
                    person_id = cur.fetchone()[0]
                    person_table_inserts += 1
                self.person_ids.append(person_id)
        self.log("Inserted %d rows into the person table..." % person_table_inserts)
|
|
446
|
|
    def update_phenotype_table(self, file_path):
        """Insert new rows into the phenotype table, one per input line.

        Appends each line's phenotype id to self.phenotype_ids for later use
        when inserting into the sample table.
        """
        self.log("Updating the phenotype table...")
        # Columns in the phenotype file are:
        # disease_resist bleach_resist mortality tle spawning sperm_motility healing_time
        phenotype_table_inserts = 0
        with open(file_path) as fh:
            for i, line in enumerate(fh):
                if i == 0:
                    # Skip header
                    continue
                line = line.rstrip()
                items = split_line(line)
                # Numeric columns default to the -9 missing-value sentinel;
                # spawning defaults to the empty string.
                disease_resist, disease_resist_param_val_str = handle_column_value(items[0], default=-9)
                bleach_resist, bleach_resist_param_val_str = handle_column_value(items[1], default=-9)
                mortality, mortality_param_val_str = handle_column_value(items[2], default=-9)
                tle, tle_param_val_str = handle_column_value(items[3], default=-9)
                spawning, spawning_param_val_str = handle_column_value(items[4])
                sperm_motility, sperm_motility_param_val_str = handle_column_value(items[5], default=-9.0)
                healing_time, healing_time_param_val_str = handle_column_value(items[6], default=-9.0)
                # See if we need to add a row to the phenotype table.
                cmd = " SELECT id FROM phenotype WHERE disease_resist %s "
                cmd += "AND bleach_resist %s AND mortality %s AND tle %s "
                cmd += "AND spawning %s AND sperm_motility %s AND healing_time %s;"
                cmd = cmd % (disease_resist_param_val_str, bleach_resist_param_val_str,
                             mortality_param_val_str, tle_param_val_str, spawning_param_val_str,
                             sperm_motility_param_val_str, healing_time_param_val_str)
                cur = self.conn.cursor()
                cur.execute(cmd)
                try:
                    phenotype_id = cur.fetchone()[0]
                except Exception:
                    # Insert a row into the phenotype table.
                    cmd = "INSERT INTO phenotype VALUES (nextval('phenotype_id_seq'), NOW(), NOW(), "
                    cmd += "%s, %s, %s, %s, %s, %s, %s) RETURNING id;"
                    args = [disease_resist, bleach_resist, mortality, tle, spawning, sperm_motility, healing_time]
                    cur = self.update(cmd, args)
                    self.flush()
                    phenotype_id = cur.fetchone()[0]
                    phenotype_table_inserts += 1
                self.phenotype_ids.append(phenotype_id)
        self.log("Inserted %d rows into the phenotype table..." % phenotype_table_inserts)
|
|
488
|
|
    def update_reef_table(self, file_path):
        """Insert new rows into the reef table, one per input line.

        Appends each line's reef id to self.reef_ids, which is consumed by
        update_colony_table (and update_sample_table) by row index.
        """
        self.log("Updating the reef table...")
        # Columns in the reef file are:
        # name region latitude longitude geographic_origin
        # The geographic_origin value is used for deciding into which table
        # to insert the latitude and longitude values. If the geographic_origin
        # is "reef", the values will be inserted into the reef table.
        reef_table_inserts = 0
        with open(file_path) as fh:
            for i, line in enumerate(fh):
                if i == 0:
                    # Skip header
                    continue
                line = line.rstrip()
                items = split_line(line)
                name = items[0]
                region = items[1]
                geographic_origin = items[4]
                if set_to_null(geographic_origin):
                    geographic_origin = "reef"
                else:
                    geographic_origin = geographic_origin.lower()
                if geographic_origin == "reef":
                    latitude = "%6f" % float(items[2])
                    longitude = "%6f" % float(items[3])
                else:
                    # Lat/long belong to the colony in this case; store the
                    # missing-value sentinel here.
                    latitude = DEFAULT_MISSING_NUMERIC_VALUE
                    longitude = DEFAULT_MISSING_NUMERIC_VALUE
                # See if we need to add a row to the reef table.  The name is
                # dollar-quoted ($$...$$) so reef names containing single
                # quotes do not break the statement.
                cmd = "SELECT id FROM reef WHERE name = $$%s$$ AND region = '%s' " % (name, region)
                cmd += "AND latitude = %s AND longitude = %s " % (latitude, longitude)
                cmd += "AND geographic_origin = '%s';" % geographic_origin
                cur = self.conn.cursor()
                cur.execute(cmd)
                try:
                    reef_id = cur.fetchone()[0]
                except Exception:
                    # Insert a row into the reef table.
                    cmd = "INSERT INTO reef VALUES (nextval('reef_id_seq'), %s, %s, %s, %s, %s, %s, %s) RETURNING id;"
                    args = ['NOW()', 'NOW()', name, region, latitude, longitude, geographic_origin]
                    cur = self.update(cmd, args)
                    self.flush()
                    reef_id = cur.fetchone()[0]
                    reef_table_inserts += 1
                self.reef_ids.append(reef_id)
        self.log("Inserted %d rows into the reef table..." % reef_table_inserts)
|
|
535
|
|
    def update_sample_table(self, file_path):
        """Insert one sample row per input line, linking all prior tables.

        Must run last: it dereferences the per-table id lists built by the
        other update_* methods using the input row index.
        """
        self.log("Updating the sample table...")
        # Columns in the sample file are:
        # affy_id colony_location collection_date user_specimen_id registry_id
        # depth dna_extraction_method dna_concentration public public_after_date
        # percent_missing_data_coral percent_missing_data_sym percent_acerv_coral percent_reference_sym percent_apalm_coral
        # percent_alternative_sym percent_heterozygous_coral percent_heterozygous_sym field_call, bcoral_genet_id
        sample_table_inserts = 0
        with open(file_path) as fh:
            for i, line in enumerate(fh):
                if i == 0:
                    # Skip header
                    continue
                line = line.rstrip()
                # Keep track of foreign keys since we skip the header line.
                id_index = i - 1
                items = split_line(line)
                sample_id = self.get_next_sample_id()
                allele_id = self.allele_ids[id_index]
                genotype_id = self.genotype_ids[id_index]
                phenotype_id = self.phenotype_ids[id_index]
                experiment_id = self.experiment_ids[id_index]
                colony_id = self.colony_ids[id_index]
                colony_location = handle_column_value(items[1], get_sql_param=False)
                taxonomy_id = self.taxonomy_ids[id_index]
                collector_id = self.person_ids[id_index]
                collection_date = items[2]
                user_specimen_id = items[3]
                # Failed samples get a synthesized affy_id of "<sample>_<specimen>".
                affy_id = handle_column_value(items[0], get_sql_param=False, default="%s_%s" % (sample_id, user_specimen_id))
                registry_id = handle_column_value(items[4], get_sql_param=False, default=-9)
                depth = handle_column_value(items[5], get_sql_param=False, default=-9.0)
                dna_extraction_method = handle_column_value(items[6], get_sql_param=False)
                dna_concentration = handle_column_value(items[7], get_sql_param=False)
                public = items[8]
                if string_as_bool(public):
                    # Public samples carry no embargo date.
                    public_after_date = ''
                else:
                    if set_to_null(items[9]):
                        # No date supplied: default to one year from now.
                        public_after_date = self.year_from_now
                    else:
                        public_after_date = items[9]
                percent_missing_data_coral = handle_column_value(items[10], get_sql_param=False)
                percent_missing_data_sym = handle_column_value(items[11], get_sql_param=False)
                percent_acerv_coral = handle_column_value(items[12], get_sql_param=False)
                percent_reference_sym = handle_column_value(items[13], get_sql_param=False)
                percent_apalm_coral = handle_column_value(items[14], get_sql_param=False)
                percent_alternative_sym = handle_column_value(items[15], get_sql_param=False)
                percent_heterozygous_coral = handle_column_value(items[16], get_sql_param=False)
                percent_heterozygous_sym = handle_column_value(items[17], get_sql_param=False)
                field_call = handle_column_value(items[18], get_sql_param=False)
                bcoral_genet_id = handle_column_value(items[19], get_sql_param=False)
                # Insert a row into the sample table (no existence check here:
                # every input line produces a new sample).
                cmd = "INSERT INTO sample VALUES (nextval('sample_id_seq'), %s, %s, %s, %s, %s, %s, %s, "
                cmd += "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
                cmd += "%s, %s, %s, %s) RETURNING id;"
                args = ['NOW()', 'NOW()', affy_id, sample_id, allele_id, genotype_id, phenotype_id,
                        experiment_id, colony_id, colony_location, taxonomy_id, collector_id,
                        collection_date, user_specimen_id, registry_id, depth,
                        dna_extraction_method, dna_concentration, public, public_after_date,
                        percent_missing_data_coral, percent_missing_data_sym, percent_acerv_coral,
                        percent_reference_sym, percent_apalm_coral, percent_alternative_sym,
                        percent_heterozygous_coral, percent_heterozygous_sym, field_call, bcoral_genet_id]
                cur = self.update(cmd, args)
                self.flush()
                # NOTE(review): this rebinds sample_id from the "A..." string
                # to the table's integer primary key; the value is unused
                # afterwards — confirm intent.
                sample_id = cur.fetchone()[0]
                sample_table_inserts += 1
        self.log("Inserted %d rows into the sample table..." % sample_table_inserts)
|
|
603
|
|
    def update_taxonomy_table(self, file_path):
        """Insert new rows into the taxonomy table, keyed on genus + species.

        Appends each line's taxonomy id to self.taxonomy_ids for later use
        when inserting into the sample table.
        """
        self.log("Updating the taxonomy table...")
        # Columns in the taxonomy file are:
        # genetic_coral_species_call affy_id genus_name species_name"
        taxonomy_table_inserts = 0
        with open(file_path) as fh:
            for i, line in enumerate(fh):
                if i == 0:
                    # Skip header
                    continue
                line = line.rstrip()
                items = split_line(line)
                genus_name = handle_column_value(items[2], get_sql_param=False, default='unknown')
                species_name = handle_column_value(items[3], get_sql_param=False, default='unknown')
                # See if we need to add a row to the taxonomy table.
                cmd = "SELECT id FROM taxonomy WHERE species_name = '%s' AND genus_name = '%s';" % (species_name, genus_name)
                cur = self.conn.cursor()
                cur.execute(cmd)
                try:
                    taxonomy_id = cur.fetchone()[0]
                except Exception:
                    # Insert a row into the taxonomy table.
                    cmd = "INSERT INTO taxonomy VALUES (nextval('taxonomy_id_seq'), %s, %s, %s, %s) RETURNING id;"
                    args = ['NOW()', 'NOW()', species_name, genus_name]
                    cur = self.update(cmd, args)
                    self.flush()
                    taxonomy_id = cur.fetchone()[0]
                    taxonomy_table_inserts += 1
                self.taxonomy_ids.append(taxonomy_id)
        self.log("Inserted %d rows into the taxonomy table..." % taxonomy_table_inserts)
|
|
634
|
|
635 def parse_args(self):
|
|
636 parser = argparse.ArgumentParser()
|
|
637 parser.add_argument('--config_file', dest='config_file', help='usd_config.ini'),
|
|
638 parser.add_argument('--database_connection_string', dest='database_connection_string', help='Postgres database connection string'),
|
|
639 parser.add_argument('--input_dir', dest='input_dir', help='Input datasets for database insertion')
|
|
640 parser.add_argument('--output', dest='output', help='Output dataset'),
|
|
641 self.args = parser.parse_args()
|
|
642
|
|
    def run(self):
        """Back up the database, locate the input files, and load all tables."""
        self.export_database()
        input_dir = self.args.input_dir
        for file_name in os.listdir(input_dir):
            # Tables must be loaded in such a way that foreign keys
            # are properly handled. The sample table must be loaded
            # last.
            # NOTE(review): if any expected file is absent from input_dir, the
            # corresponding *_file name stays unbound and the update_* call
            # below raises NameError — all nine files are assumed present.
            if file_name.startswith("allele"):
                allele_file = os.path.join(input_dir, file_name)
            if file_name.startswith("colony"):
                colony_file = os.path.join(input_dir, file_name)
            if file_name.startswith("experiment"):
                experiment_file = os.path.join(input_dir, file_name)
            # NOTE(review): the chain switches from independent ifs to
            # if/elif here; behavior is the same because the prefixes are
            # mutually exclusive.
            if file_name.startswith("genotype"):
                genotype_file = os.path.join(input_dir, file_name)
            elif file_name.startswith("person"):
                person_file = os.path.join(input_dir, file_name)
            elif file_name.startswith("phenotype"):
                phenotype_file = os.path.join(input_dir, file_name)
            elif file_name.startswith("reef"):
                reef_file = os.path.join(input_dir, file_name)
            elif file_name.startswith("sample"):
                sample_file = os.path.join(input_dir, file_name)
            elif file_name.startswith("taxonomy"):
                taxonomy_file = os.path.join(input_dir, file_name)
        # Now tables can be loaded in the appropriate order.
        self.update_experiment_table(experiment_file)
        self.update_genotype_table(genotype_file)
        self.update_allele_table(allele_file)
        self.update_person_table(person_file)
        self.update_phenotype_table(phenotype_file)
        self.update_reef_table(reef_file)
        self.update_colony_table(colony_file)
        self.update_taxonomy_table(taxonomy_file)
        self.update_sample_table(sample_file)
|
|
678
|
|
    def shutdown(self):
        """Flush and close the output file and the database connection."""
        self.log("Shutting down...")
        self.outfh.flush()
        self.outfh.close()
        self.conn.close()
|
|
684
|
|
685 def update(self, cmd, args):
|
|
686 for i, arg in enumerate(args):
|
|
687 args[i] = handle_null(arg)
|
|
688 try:
|
|
689 cur = self.conn.cursor()
|
|
690 cur.execute(cmd, tuple(args))
|
|
691 except Exception as e:
|
|
692 msg = "Caught exception executing SQL:\n%s\nException:\n%s\n" % (cmd.format(args), e)
|
|
693 sys.stderr.write(msg)
|
|
694 self.outfh.flush()
|
|
695 self.outfh.close()
|
|
696 self.conn.close()
|
|
697 sys.exit(1)
|
|
698 return cur
|
|
699
|
|
700
|
|
if __name__ == '__main__':
    # Construct the updater (parses args, reads config, connects to the DB),
    # load every input file in dependency order, then close all resources.
    sdu = StagDatabaseUpdater()
    sdu.run()
    sdu.shutdown()
|