Mercurial > repos > ufz > lm_get_projects
comparison get_projects.py @ 0:713502c9a98f draft default tip
planemo upload for repository https://github.com/jw44lavo/galaxy-tools/blob/bgo/tools/lambdaminer/ commit 3f45cd07445498cefa73931cadf3d1115245ca1e
author | ufz |
---|---|
date | Mon, 17 Mar 2025 20:35:10 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:713502c9a98f |
---|---|
1 import argparse | |
2 import os | |
3 from json import load | |
4 | |
5 | |
6 import pandas as pd | |
7 import sqlalchemy as db | |
8 | |
9 | |
10 def get_arguments() -> argparse.Namespace: | |
11 """ | |
12 Parse and return the command-line arguments required for the script. | |
13 | |
14 return: argparse.Namespace: Parsed arguments containing credentials_file (str) and login (str). | |
15 """ | |
16 | |
17 parser = argparse.ArgumentParser( | |
18 description="Read the projects of the current user from the Lambda-Miner Database" | |
19 ) | |
20 | |
21 # Add argument for the credentials file | |
22 parser.add_argument( | |
23 "-c", | |
24 "--credentials-file", | |
25 dest="credentials_file", | |
26 type=str, | |
27 required=False, # Optional | |
28 help=( | |
29 "Credential file in JSON format including dialect, user, password, host, port, and " | |
30 "database. If not provided, the environment variable LAMBDAMINER_CREDENTIALS will be " | |
31 "used." | |
32 ) | |
33 ) | |
34 | |
35 # Add argument for the login name | |
36 parser.add_argument( | |
37 "-l", | |
38 "--login-name", | |
39 dest="login", | |
40 type=str, | |
41 required=True, | |
42 help="UFZ login name of the user" | |
43 ) | |
44 | |
45 # Add argument for the output file | |
46 parser.add_argument( | |
47 "-o", | |
48 "--output-file", | |
49 dest="output", | |
50 type=str, | |
51 default="projects.csv", | |
52 help="Specifiy the output file path including the file name (default: 'projects.csv')" | |
53 ) | |
54 | |
55 # Add argument for the type | |
56 parser.add_argument( | |
57 "-t", | |
58 "--type", | |
59 choices=["generic", "import", "calibration", "assignment", "validation", "export"], | |
60 default="generic", | |
61 help="Specify the workflow type (default: 'generic')" | |
62 ) | |
63 | |
64 return parser.parse_args() | |
65 | |
66 | |
67 def parse_check_args(args): | |
68 """ | |
69 Parse and validate command line arguments. | |
70 | |
71 Following actions are performed: | |
72 - Check of the existence of the specified directory in the output path. | |
73 - Assignment the correct credentials file to the arguments. | |
74 | |
75 :param args: command line arguments. | |
76 :type args: argparse.Namespace | |
77 :raises FileNotFoundError: If the specified directory in the output path does not exist. | |
78 """ | |
79 | |
80 # Extract the directory part of the specified output path | |
81 dir_path = os.path.dirname(args.output) or "." | |
82 | |
83 # Check if the directory exists and raise error if not | |
84 if not os.path.isdir(dir_path): | |
85 raise FileNotFoundError(f"Error: The directory does not exist: {dir_path}") | |
86 | |
87 # Get environment variable LAMBDAMINER_CREDENTIALS | |
88 envar_credentials = os.getenv("LAMBDAMINER_CREDENTIALS") | |
89 | |
90 # Use the provided argument or fallback to the environment variable | |
91 args.credentials_file = args.credentials_file or envar_credentials | |
92 | |
93 assert args.credentials_file is not None, "Error: No credentials specified" | |
94 | |
95 | |
96 def get_engine(credentials_path: str, echo: bool = False) -> db.engine.Engine: | |
97 """ | |
98 Create and return a SQLAlchemy engine based on the supplied credentials. | |
99 | |
100 The engine is created using the data from the supplied credentials file, | |
101 which should be in JSON format and include the following keys: | |
102 dialect, user, password, host, port, database | |
103 | |
104 :param credentials_path: The path to the credentials file. | |
105 :type credentials_path: str | |
106 :return: The SQLAlchemy engine object. | |
107 :rtype: sqlalchemy.engine.Engine | |
108 """ | |
109 | |
110 with open(credentials_path) as file: | |
111 credentials = load(file) | |
112 | |
113 dialect = credentials["dialect"] | |
114 username = credentials["user"] | |
115 password = credentials["password"] | |
116 host = credentials["host"] | |
117 port = credentials["port"] | |
118 database_name = credentials["database"] | |
119 | |
120 database_url = f"{dialect}://{username}:{password}@{host}:{port}/{database_name}" | |
121 | |
122 return db.create_engine(database_url, echo=echo) | |
123 | |
124 | |
125 def get_user_id(connection, metadata, login: str) -> int: | |
126 """ | |
127 Retrieve the user_id for a given login. | |
128 | |
129 :param connection: The database connection. | |
130 :param metadata: The database metadata containing table definitions. | |
131 :param login: The login username to search for. | |
132 :return: The user_id if found, otherwise None. | |
133 """ | |
134 | |
135 # Access the 'ufz_user' table from metadata. | |
136 User = metadata.tables["ufz_user"] | |
137 | |
138 # Construct a query to select the user_id where the login matches. | |
139 query = db.select(User.c.user_id).where(User.c.login == login) | |
140 | |
141 # Execute the query and fetch the scalar result. | |
142 result = connection.execute(query).scalar() | |
143 | |
144 # Return the user_id as an integer if found, otherwise return None. | |
145 return int(result) if result else None | |
146 | |
147 | |
148 def get_projects_with_sample_count(connection, metadata, user_id): | |
149 """ | |
150 Retrieve projects and their sample counts for a given user_id. | |
151 | |
152 The query will return a pandas DataFrame with the columns: | |
153 project_id : The id of the project. | |
154 name : The name of the project. | |
155 sample_count : The number of samples associated with the project. | |
156 | |
157 :param connection: The database connection. | |
158 :param metadata: The database metadata containing table definitions. | |
159 :param user_id: The user_id to search for. | |
160 :return: A pandas DataFrame with the projects and their sample counts. | |
161 """ | |
162 | |
163 User = metadata.tables["ufz_user"] | |
164 User_Project = metadata.tables["ufz_user_project"] | |
165 Project = metadata.tables["project"] | |
166 Sample = metadata.tables["sample"] | |
167 | |
168 # Construct the query: | |
169 # 1. Select project_id, name and the count of sample_id as sample_count. | |
170 # 2. Join the tables ufz_user_project, ufz_user, project and sample. | |
171 # - Join ufz_user_project with project on project_id. | |
172 # - Join ufz_user with ufz_user_project on user_id. | |
173 # - Join sample with project on project_id. | |
174 # - Use an outer join to include projects without samples. | |
175 # 3. Filter the results to only include the given user_id. | |
176 # 4. Group the results by project_id and name. | |
177 | |
178 query = ( | |
179 db.select( | |
180 Project.c.project_id.label("Project ID"), | |
181 Project.c.name.label("Project Name"), | |
182 db.func.count(Sample.c.sample_id).label("Sample Count") | |
183 ) | |
184 .join(User_Project, User_Project.c.project_id == Project.c.project_id) | |
185 .join(User, User.c.user_id == User_Project.c.user_id) | |
186 .join(Sample, Sample.c.project == Project.c.project_id, isouter=True) | |
187 .where(User.c.user_id == user_id) | |
188 .group_by(Project.c.project_id, Project.c.name) | |
189 ) | |
190 | |
191 # Execute the query, fetch the results, and return the DataFrame. | |
192 return pd.DataFrame(connection.execute(query).fetchall()) | |
193 | |
194 | |
195 def main(): | |
196 | |
197 # Parse command-line arguments | |
198 args = get_arguments() | |
199 | |
200 # Parse and check the specified command line arguments | |
201 parse_check_args(args) | |
202 | |
203 try: | |
204 # Load credentials and create the database engine | |
205 engine = get_engine(args.credentials_file) | |
206 | |
207 # Reflect metadata and connect to the database | |
208 metadata = db.MetaData() | |
209 metadata.reflect(bind=engine, only=["ufz_user", "ufz_user_project", "project", "sample"]) | |
210 | |
211 with engine.connect() as conn: | |
212 # Get user ID | |
213 user_id = get_user_id(conn, metadata, args.login) | |
214 | |
215 if not user_id: | |
216 raise ValueError( | |
217 "No Lambda-Miner user found with the login name \"{}\". " | |
218 "Please find the description on how to register for the Lambda-Miner at " | |
219 "https://lambda-miner-project.pages.ufz.de/lambda-miner-workflows/getting-started/." | |
220 .format(args.login) | |
221 ) | |
222 | |
223 # Get projects with sample counts | |
224 projects = get_projects_with_sample_count(conn, metadata, user_id) | |
225 | |
226 # Write projects as a CSV file to the specified output | |
227 with open(args.output, "w") as f: | |
228 f.write(projects.to_csv(index=False)) | |
229 | |
230 # Display the result | |
231 if projects.empty: | |
232 raise ValueError( | |
233 "No projects found for the user \"{}\". " | |
234 "Please create a project before going on." | |
235 .format(args.login) | |
236 ) | |
237 else: | |
238 print(projects) | |
239 | |
240 except FileNotFoundError: | |
241 raise FileNotFoundError(f"Credentials file not found at \"{args.credentials_file}\".") | |
242 | |
243 except db.exc.SQLAlchemyError as e: | |
244 raise RuntimeError(f"Database error occurred: {e}") | |
245 | |
246 except Exception as e: | |
247 raise RuntimeError(f"An unexpected error occurred: {e}") | |
248 | |
249 | |
250 if __name__ == "__main__": | |
251 main() |