Mercurial > repos > iuc > data_manager_nextclade
comparison data_manager/nextclade_dm.py @ 0:4de9e77bcc9e draft
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_nextclade commit 3d6dabd066dcbe31cfa38fbfac340e253d8a984d
| author | iuc |
|---|---|
| date | Sat, 30 Jul 2022 08:09:42 +0000 |
| parents | |
| children | |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4de9e77bcc9e |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 | |
| 3 import argparse | |
| 4 import datetime | |
| 5 import json | |
| 6 import operator | |
| 7 import pathlib | |
| 8 import subprocess | |
| 9 import sys | |
| 10 from typing import List | |
| 11 | |
| 12 | |
def parse_date(d: str) -> datetime.datetime:
    """Convert a release-tag timestamp or a plain calendar date to a datetime.

    Two layouts are tried in order: ``YYYY-MM-DDTHH:MM:SSZ`` (the layout of
    the publication date in nextclade release tags) and ``YYYY-MM-DD`` (the
    short form a user may type).  The result is a naive ``datetime``.

    Raises:
        ValueError: if *d* matches neither layout.
    """
    try:
        return datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        # Not a full timestamp; fall back to the date-only layout.
        return datetime.datetime.strptime(d, "%Y-%m-%d")
| 21 | |
| 22 | |
def entry_to_tag(entry: dict) -> str:
    """Return the ``<name>_<tag>`` identifier of a dataset listing entry.

    *entry* must provide ``entry["attributes"]["name"]["value"]`` and
    ``entry["attributes"]["tag"]["value"]``; the two strings are joined with
    an underscore.
    """
    dataset_name = entry["attributes"]["name"]["value"]
    dataset_tag = entry["attributes"]["tag"]["value"]
    return "_".join((dataset_name, dataset_tag))
| 27 | |
| 28 | |
def get_database_list() -> List[dict]:
    """List every dataset release that the ``nextclade`` CLI reports.

    Runs ``nextclade dataset list --json --include-old --include-incompatible``
    and converts each JSON entry into a flat dict with the keys ``value``,
    ``database_name``, ``description``, ``date``, ``tag`` and
    ``min_nextclade_version``.  Entries are returned in the order the CLI
    printed them.

    Raises:
        subprocess.CalledProcessError: if the CLI exits with a non-zero status.
    """
    completed = subprocess.run(
        [
            "nextclade",
            "dataset",
            "list",
            "--json",
            "--include-old",
            "--include-incompatible",
        ],
        capture_output=True,
        check=True,
    )

    records = []
    for raw_entry in json.loads(completed.stdout):
        attributes = raw_entry["attributes"]
        identifier = entry_to_tag(raw_entry)
        name_info = attributes["name"]
        # The tag is itself a timestamp; the trailing "Z" is removed before
        # it is handed to fromisoformat().
        released = datetime.datetime.fromisoformat(
            attributes["tag"]["value"].replace("Z", "")
        )
        records.append(
            {
                "value": identifier,
                "database_name": name_info["value"],
                "description": name_info["valueFriendly"],
                "date": released,
                "tag": attributes["tag"]["value"],
                "min_nextclade_version": raw_entry["compatibility"]["nextcladeCli"][
                    "min"
                ],
            }
        )
    return records
| 55 | |
| 56 | |
def filter_by_date(
    existing_release_tags: List[str],
    name: str,
    releases: list,
    start_date: datetime.datetime = None,
    end_date: datetime.datetime = None,
) -> List[dict]:
    """Select the releases of one dataset that fall inside a date window.

    Args:
        existing_release_tags: ``value`` identifiers that are already
            installed; matching releases are skipped.  Any container that
            supports ``in`` works (the script passes a set).
        name: dataset name; only releases whose ``database_name`` equals it
            are considered.
        releases: release dicts carrying at least ``database_name``,
            ``value`` and ``date`` (a ``datetime``).
        start_date: inclusive lower bound, or ``None`` for no lower bound.
        end_date: inclusive upper bound, or ``None`` for no upper bound.

    Returns:
        The matching release dicts, in the order they appear in *releases*.
    """
    selected = []
    for release in releases:
        if release["database_name"] != name:
            continue
        if release["value"] in existing_release_tags:
            continue
        # Skip (rather than stop at) a release that is too old, so the
        # result does not depend on *releases* being sorted newest-first.
        if start_date is not None and release["date"] < start_date:
            continue
        if end_date is not None and release["date"] > end_date:
            continue
        selected.append(release)
    return selected
| 77 | |
| 78 | |
def download_and_unpack(name: str, release: str, output_directory: str) -> pathlib.Path:
    """Fetch one dataset release with ``nextclade dataset get``.

    The dataset is written to ``<output_directory>/<name>_<release>``, where
    every ``:`` in *release* is replaced by ``-`` in the directory name.  The
    unmodified *release* string is what gets passed to ``--tag``.

    Returns:
        The directory handed to ``--output-dir``.

    Raises:
        subprocess.CalledProcessError: if the CLI exits with a non-zero status.
    """
    directory_name = "_".join((name, release.replace(":", "-")))
    target_dir = pathlib.Path(output_directory) / directory_name
    subprocess.run(
        [
            "nextclade",
            "dataset",
            "get",
            "--name",
            name,
            "--tag",
            release,
            "--output-dir",
            str(target_dir),
        ],
        check=True,
    )
    return target_dir
| 96 | |
| 97 | |
def comma_split(args: str) -> List[str]:
    """Split a comma-separated command-line value into its items.

    Whitespace around each item is stripped and empty items are dropped, so
    an empty string yields ``[]`` rather than ``[""]`` and ``"a, b"`` yields
    ``["a", "b"]``.
    """
    stripped_items = (item.strip() for item in args.split(","))
    return [item for item in stripped_items if item]
| 100 | |
| 101 | |
if __name__ == "__main__":
    # Command-line entry point.  In --testmode the matching releases are only
    # printed; otherwise they are downloaded and the JSON file given as
    # `galaxy_config` is overwritten with the new data table rows.

    parser = argparse.ArgumentParser()
    parser.add_argument("--testmode", default=False, action="store_true")
    parser.add_argument("--latest", default=False, action="store_true")
    parser.add_argument("--start_date", type=parse_date)
    parser.add_argument("--end_date", type=parse_date)
    parser.add_argument("--known_revisions", type=comma_split)
    parser.add_argument("--datasets", type=comma_split, default=["sars-cov-2"])
    # NOTE: this positional is required (no nargs="?"), so argparse never
    # applies the default given here.
    parser.add_argument("datatable_name", default="nextclade")
    parser.add_argument("galaxy_config")
    args = parser.parse_args()

    # known-revisions is populated from the Galaxy data table by the wrapper
    existing_release_tags = set(args.known_revisions or [])

    releases_available = get_database_list()
    if args.testmode:
        # Dry run: list what matches the date window, ignoring what is
        # already installed, and download nothing.
        releases = []
        for name in args.datasets:
            releases.extend(
                filter_by_date(
                    [],
                    name,
                    releases_available,
                    start_date=args.start_date,
                    end_date=args.end_date,
                )
            )
        for release in releases:
            print(
                release["value"],
                release["description"],
                release["date"].isoformat(),
                release["min_nextclade_version"],
            )
        sys.exit(0)

    with open(args.galaxy_config) as fh:
        config = json.load(fh)

    output_directory = config.get("output_data", [{}])[0].get("extra_files_path", None)

    data_manager_dict = {"data_tables": {args.datatable_name: []}}

    releases = []
    if args.latest:
        for dataset in args.datasets:
            candidates = [
                release
                for release in releases_available
                if release["database_name"] == dataset
            ]
            if not candidates:
                continue
            # Pick the newest release by date instead of relying on the
            # order in which the releases were listed.
            newest = max(candidates, key=operator.itemgetter("date"))
            if newest["value"] not in existing_release_tags:
                # add the latest release for this dataset, but only if we don't already have it
                releases.append(newest)
    else:
        for dataset in args.datasets:
            releases.extend(
                filter_by_date(
                    existing_release_tags,
                    dataset,
                    releases_available,
                    start_date=args.start_date,
                    end_date=args.end_date,
                )
            )

    if releases and output_directory is None:
        # Fail with a clear message instead of a TypeError from pathlib.
        sys.exit("No extra_files_path in the Galaxy config: nowhere to store datasets")

    table_rows = data_manager_dict["data_tables"][args.datatable_name]
    for release in releases:
        dataset_path = download_and_unpack(
            release["database_name"], release["tag"], output_directory
        )
        table_rows.append(
            {
                "value": release["value"],
                "database_name": release["database_name"],
                "description": release["description"],
                "min_nextclade_version": release["min_nextclade_version"],
                "date": release["date"].isoformat(),  # ISO 8601 is easily sortable
                # download_and_unpack() already returns a path that includes
                # output_directory, so it must not be joined onto it again.
                "path": str(dataset_path),
            }
        )
    table_rows.sort(key=operator.itemgetter("value"), reverse=True)
    with open(args.galaxy_config, "w") as fh:
        json.dump(data_manager_dict, fh, indent=2, sort_keys=True)
