Mercurial > repos > iuc > data_manager_gtdbtk_database_installer
view data_manager/gtdbtk_database_installer.py @ 8:750d902de22c draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_gtdbtk_database_installer commit 1019bf0fda897582e2bbdc773aebb3e08e285aae
author | iuc |
---|---|
date | Mon, 21 Oct 2024 15:49:53 +0000 |
parents | 3b1d503c6260 |
children |
line wrap: on
line source
#!/usr/bin/env python
"""Galaxy data manager for installing GTDB-Tk reference data.

Downloads either the full GTDB-Tk database or the archaeal/bacterial
taxonomy metadata tables for a given GTDB release, extracts them into the
Galaxy-provided target directory and writes the resulting data-table JSON
back to the data manager output file.
"""

import argparse
import gzip
import json
import os
import shutil
import sys
import tarfile
from datetime import date
from urllib.parse import urlparse
from urllib.request import HTTPError, Request, urlopen

# rather provide the urls based on the release, less error potential for the admins !
urls = {
    "202": {
        "full": "https://data.gtdb.ecogenomic.org/releases/release202/202.0/auxillary_files/gtdbtk_r202_data.tar.gz",
        "meta_ar": "https://data.gtdb.ecogenomic.org/releases/release202/202.0/ar122_metadata_r202.tar.gz",
        "meta_bac": "https://data.gtdb.ecogenomic.org/releases/release202/202.0/bac120_metadata_r202.tar.gz",
    },
    "207": {
        "full": "https://data.gtdb.ecogenomic.org/releases/release207/207.0/auxillary_files/gtdbtk_r207_data.tar.gz",
        "meta_ar": "https://data.gtdb.ecogenomic.org/releases/release207/207.0/ar53_metadata_r207.tar.gz",
        "meta_bac": "https://data.gtdb.ecogenomic.org/releases/release207/207.0/bac120_metadata_r207.tar.gz",
    },
    "214": {
        "full": "https://data.gtdb.ecogenomic.org/releases/release214/214.0/auxillary_files/gtdbtk_r214_data.tar.gz",
        "meta_ar": "https://data.gtdb.ecogenomic.org/releases/release214/214.1/ar53_metadata_r214.tsv.gz",
        "meta_bac": "https://data.gtdb.ecogenomic.org/releases/release214/214.1/bac120_metadata_r214.tsv.gz",
    },
    "220": {
        "full": "https://data.gtdb.ecogenomic.org/releases/release220/220.0/auxillary_files/gtdbtk_package/full_package/gtdbtk_r220_data.tar.gz",
        "meta_ar": "https://data.gtdb.ecogenomic.org/releases/release220/220.0/ar53_metadata_r220.tsv.gz",
        "meta_bac": "https://data.gtdb.ecogenomic.org/releases/release220/220.0/bac120_metadata_r220.tsv.gz",
    },
}


def is_urlfile(url):
    """Return True if ``url`` answers with an HTTP status < 400.

    FIX: the response is now closed via a context manager instead of
    leaking the connection.  Non-HTTP failures (DNS errors, timeouts,
    i.e. URLError) still propagate to the caller, as before.
    """
    try:
        with urlopen(url) as response:
            return response.getcode() < 400
    except HTTPError:
        return False


def extract_tar_iteratively(tarball, target_directory):
    """Extract a .tar, .tar.gz, or .tar.bz2 archive iteratively.

    Members are processed one at a time and regular files are streamed to
    disk in 1 MB chunks, keeping memory usage low even for the very large
    GTDB-Tk archives.  Symbolic and hard links present in the archive are
    recreated.

    Args:
        tarball (str): Path to the tar archive to extract.
        target_directory (str): Destination directory; populated in place.

    Raises:
        OSError: If a file or directory cannot be created or written.
        tarfile.TarError: If the archive cannot be opened or read.
        ValueError: If a member path would escape ``target_directory``
            (path traversal protection, FIX).
    """
    base = os.path.abspath(target_directory)
    with tarfile.open(tarball, "r:*") as fh:
        for member in fh:
            # Full path to where the member should be extracted
            member_path = os.path.join(target_directory, member.name)
            # FIX: refuse members that would land outside the target
            # directory (e.g. "../evil" entries in a crafted archive).
            dest = os.path.abspath(member_path)
            if dest != base and not dest.startswith(base + os.sep):
                raise ValueError(
                    f"Refusing to extract {member.name!r} outside of {target_directory!r}"
                )
            if member.isdir():
                # If it's a directory, ensure it exists
                os.makedirs(member_path, exist_ok=True)
            elif member.isfile():
                # FIX: some archives list files before their parent
                # directory members - make sure the parent exists first.
                os.makedirs(os.path.dirname(member_path), exist_ok=True)
                # Extract in chunks to avoid memory spikes (1 MB each)
                with fh.extractfile(member) as source, open(
                    member_path, "wb"
                ) as target:
                    shutil.copyfileobj(source, target, length=1024 * 1024)
            elif member.issym() or member.islnk():
                # Handle symlinks or hard links; ensure the parent exists
                os.makedirs(os.path.dirname(member_path), exist_ok=True)
                if member.issym():
                    os.symlink(member.linkname, member_path)
                else:
                    os.link(member.linkname, member_path)


def url_download(url, target_directory, meta):
    """Download ``url`` into ``target_directory`` and unpack it.

    For metadata downloads (``meta=True``) the file may be a ``.tar.gz``
    or a plain ``.gz``.  For the database download the tarball's single
    top-level folder (e.g. ``release220``) is flattened into
    ``target_directory``.

    Returns:
        str: Directory holding the extracted content, or - in the
        database test case where the download is not a tarball - the
        path of the downloaded file itself.
    """
    url_parts = urlparse(url)
    tarball = os.path.abspath(
        os.path.join(target_directory, os.path.basename(url_parts.path))
    )

    # download the url, streaming in 64 KB chunks
    src = None
    try:
        req = Request(url)
        src = urlopen(req)
        with open(tarball, "wb") as dst:
            while True:
                chunk = src.read(2**16)  # Read in 64 KB chunks
                if not chunk:
                    break
                dst.write(chunk)
    except Exception as e:
        sys.exit(str(e))
    finally:
        if src is not None:
            src.close()

    # extract the metadata
    if meta:
        # extract the content of *.tar.gz into the target dir
        if tarfile.is_tarfile(tarball):
            extract_tar_iteratively(tarball, target_directory)
            os.remove(tarball)
            return target_directory  # return path to output folder
        # extract the content of *.gz into the target dir
        # FIX: was `".gz" in tarball` (substring anywhere in the path) and
        # `tarball.strip(".gz")` - str.strip() removes *any* leading or
        # trailing '.', 'g', 'z' characters, mangling names such as
        # "log.gz" -> "lo".  Use a proper suffix test and slice instead.
        elif tarball.endswith(".gz"):
            unzipped_file = tarball[: -len(".gz")]
            with gzip.open(tarball, "rb") as f_in:
                with open(unzipped_file, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
            os.remove(tarball)
            return os.path.dirname(unzipped_file)
        else:
            sys.exit(
                "No correct input format for metadata file, must be .tar.gz or .gz"
            )
    # handle the DB
    else:
        # extract the content of the folder in the tar.gz into the target dir
        if tarfile.is_tarfile(tarball):
            extract_tar_iteratively(tarball, target_directory)
            os.remove(tarball)
        else:
            # handle the test case for the DB
            return tarball

        # The tarball extraction will create a directory named
        # something like release202 in the target_directory, so
        # we need to move the items in that directory to the
        # target directory.
        subdir = next(os.walk(target_directory))[1][0]
        subdir_path = os.path.join(target_directory, subdir)
        for item in os.listdir(subdir_path):
            shutil.move(os.path.join(subdir_path, item), target_directory)
        os.rmdir(subdir_path)

        return target_directory


def create_data_manager_entry(database_name, release, file_path):
    """Build one Galaxy data-table row (value/name/path/version) for the
    given database name and GTDB release, stamped with today's date."""
    time = date.today().strftime("%Y-%m-%d")
    data_manager_entry = {}
    data_manager_entry["value"] = (
        f"{database_name.replace(' ', '_').lower()}_release_{release}_downloaded_{time}"
    )
    data_manager_entry["name"] = f"{database_name} - release {release} ({time})"
    data_manager_entry["path"] = file_path
    data_manager_entry["version"] = release
    return data_manager_entry


def download(release, meta, test, out_file):
    """Entry point: download the DB or metadata tables for ``release`` and
    rewrite ``out_file`` with the resulting data-table JSON."""
    with open(out_file) as fh:
        params = json.load(fh)

    target_directory = params["output_data"][0]["extra_files_path"]
    # FIX: exist_ok - the extra files path may already have been created
    os.makedirs(target_directory, exist_ok=True)

    if test:
        # switch the DB to use the test case
        urls[release][
            "full"
        ] = "https://zenodo.org/records/13734217/files/release220-test.tar.gz"

        # make use of the test to check if all urls exists
        for _version, items in urls.items():
            for url in items.values():
                assert is_urlfile(url)

    data_manager_json = {"data_tables": {}}

    # download taxonomy metadata tables
    if meta:
        url = urls[release]["meta_ar"]
        url_download(url, target_directory, meta)
        url = urls[release]["meta_bac"]
        file_path = url_download(url, target_directory, meta)
        data_manager_json["data_tables"]["gtdbtk_database_metadata_versioned"] = [
            create_data_manager_entry("Metadata Tables", release, file_path)
        ]
    # download the full DB
    else:
        url = urls[release]["full"]
        file_path = url_download(url, target_directory, meta)
        data_manager_json["data_tables"]["gtdbtk_database_versioned"] = [
            create_data_manager_entry("Full Database", release, file_path)
        ]

    # store in dedicated metadata table
    with open(out_file, "w") as fh:
        json.dump(data_manager_json, fh, sort_keys=True)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # NOTE(review): --version is parsed but never used below; kept for
    # backward compatibility with the tool wrapper.
    parser.add_argument("--version", dest="version", help="DB version")
    parser.add_argument(
        "--release", dest="release", help="Release of the GTDB-Tk database version"
    )
    parser.add_argument("--out_file", dest="out_file", help="JSON output file")
    parser.add_argument(
        "--meta",
        dest="meta",
        action="store_true",
        help="Store meta data flag",
    )
    parser.add_argument(
        "--test",
        dest="test",
        action="store_true",
        help="Run test",
    )
    args = parser.parse_args()

    download(
        args.release,
        args.meta,
        args.test,
        args.out_file,
    )