comparison: gcp_batch_netcat.py @ 4:2ff4a39ea41b (draft)
commit message: planemo upload commit 1bf6938d35be8e67e317f504f43f281ce7dc06e6
author:   enis
date:     Tue, 22 Jul 2025 14:47:47 +0000
parents:  0ea626b10557
children: b2ce158b4f22
--- gcp_batch_netcat.py  3:0ea626b10557
+++ gcp_batch_netcat.py  4:2ff4a39ea41b
@@ -1,8 +1,18 @@
+import argparse
+import json
+import logging
+import os
+import sys
+# import time
+import uuid
+from google.cloud import batch_v1
 
-import json
-import subprocess
-import argparse
-import uuid
-import time
-import os
+# Configure logging to go to stdout instead of stderr to avoid Galaxy marking job as failed
+import sys
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    stream=sys.stdout
+)
+logger = logging.getLogger(__name__)
 
@@ -9,9 +19,10 @@
 def main():
     parser = argparse.ArgumentParser()
-    parser.add_argument('--nfs_address', required=True)
+    parser.add_argument('--nfs_address', required=False, help='NFS server address (if not provided, will be auto-detected from /galaxy/server/database/ mount)')
     parser.add_argument('--output', required=True)
-    parser.add_argument('--project', required=True)
+    parser.add_argument('--project', required=False, help='GCP Project ID (if not provided, will be extracted from service account key)')
     parser.add_argument('--region', required=True)
-    parser.add_argument('--port', default='2049')
+    parser.add_argument('--network', default='default', help='GCP Network name')
+    parser.add_argument('--subnet', default='default', help='GCP Subnet name')
     parser.add_argument('--service_account_key', required=True)
     args = parser.parse_args()
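
Note: with --nfs_address and --project now optional, and --network/--subnet defaulting to 'default', the minimal invocation needs only --output, --region, and --service_account_key. A standalone sketch of how the new argument surface resolves (the flag values here are placeholders, not taken from the repo):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--nfs_address', required=False)
    parser.add_argument('--output', required=True)
    parser.add_argument('--project', required=False)
    parser.add_argument('--region', required=True)
    parser.add_argument('--network', default='default')
    parser.add_argument('--subnet', default='default')
    parser.add_argument('--service_account_key', required=True)

    # Minimal invocation: omitted optional flags fall back to None / 'default'
    args = parser.parse_args(['--output', 'out.txt',
                              '--region', 'us-central1',
                              '--service_account_key', '/tmp/key.json'])
    assert args.nfs_address is None and args.project is None
    assert args.network == 'default' and args.subnet == 'default'
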
@@ -18,6 +29,20 @@
 
     # Set up authentication using the service account key
     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.service_account_key
+    logger.info(f"Authentication configured with service account: {args.service_account_key}")
 
-    # Ensure gcloud uses a writable config directory
-    os.environ['CLOUDSDK_CONFIG'] = '/tmp/gcloud-config'
+    # Extract GCP project ID from service account key if not provided
+    if args.project:
+        project_id = args.project
+        logger.info(f"Using provided project ID: {project_id}")
+    else:
+        try:
+            with open(args.service_account_key, 'r') as f:
+                service_account_data = json.load(f)
+            project_id = service_account_data.get('project_id')
+            if not project_id:
+                raise ValueError("project_id not found in service account key file")
+            logger.info(f"Extracted project ID from service account key: {project_id}")
+        except Exception as e:
+            logger.error(f"Failed to extract project ID from service account key: {e}")
+            raise
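
Note: the project-ID fallback leans on the project_id field that GCP writes into every service account key file. A minimal sketch of that lookup against a synthetic key (the path and values are illustrative only):

    import json

    # Synthetic stand-in for the relevant subset of a service account key
    key = {
        "type": "service_account",
        "project_id": "my-gcp-project",
        "client_email": "galaxy@my-gcp-project.iam.gserviceaccount.com",
    }
    with open('/tmp/fake-key.json', 'w') as f:
        json.dump(key, f)

    with open('/tmp/fake-key.json') as f:
        assert json.load(f).get('project_id') == 'my-gcp-project'
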
@@ -24,3 +49,47 @@
 
-    # Create the temp config directory if it doesn't exist
-    os.makedirs('/tmp/gcloud-config', exist_ok=True)
+    # Extract NFS server address if not provided
+    if args.nfs_address:
+        nfs_address = args.nfs_address
+        logger.info(f"Using provided NFS address: {nfs_address}")
+    else:
+        try:
+            # Try to detect NFS server from /galaxy/server/database/ mount
+            import subprocess
+            result = subprocess.run(['mount'], capture_output=True, text=True)
+            nfs_address = None
+
+            for line in result.stdout.split('\n'):
+                if '/galaxy/server/database' in line and ':' in line:
+                    # Look for NFS mount pattern: server:/path on /galaxy/server/database
+                    parts = line.split()
+                    for part in parts:
+                        if ':' in part and part.count(':') == 1:
+                            nfs_address = part.split(':')[0]
+                            break
+                    if nfs_address:
+                        logger.info(f"Detected NFS address from mount: {nfs_address}")
+                        break
+
+            if not nfs_address:
+                # Fallback: try to parse /proc/mounts
+                try:
+                    with open('/proc/mounts', 'r') as f:
+                        for line in f:
+                            if '/galaxy/server/database' in line and ':' in line:
+                                parts = line.split()
+                                if len(parts) > 0 and ':' in parts[0]:
+                                    nfs_address = parts[0].split(':')[0]
+                                    logger.info(f"Detected NFS address from /proc/mounts: {nfs_address}")
+                                    break
+                except:
+                    pass
+
+            if not nfs_address:
+                raise ValueError("Could not auto-detect NFS server address from /galaxy/server/database/ mount")
+
+            logger.info(f"Auto-detected NFS address from mount: {nfs_address}")
+        except Exception as e:
+            logger.error(f"Failed to auto-detect NFS address: {e}")
+            raise
+
+    # time.sleep(10000)
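
Note: the auto-detection keys on mount-table rows of the form server:/export /galaxy/server/database nfs ..., taking whatever precedes the single colon as the server. A standalone sketch of the same parse against a synthetic mount line (the address and paths are made up):

    # One line in the format printed by `mount` / found in /proc/mounts
    line = "10.0.0.5:/export/galaxy /galaxy/server/database nfs rw,vers=4.1 0 0"

    nfs_address = None
    for part in line.split():
        if ':' in part and part.count(':') == 1:  # matches the server:/path token
            nfs_address = part.split(':')[0]
            break
    assert nfs_address == "10.0.0.5"

One caveat worth noting: an IPv6 server address contains more than one colon, so this pattern would skip it.
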
@@ -27,35 +96,18 @@
 
     job_name = f'netcat-job-{uuid.uuid4()}'
+    logger.info(f"Generated job name: {job_name}")
 
-    job_spec = {
-        "taskGroups": [
-            {
-                "taskSpec": {
-                    "runnables": [
-                        {
-                            "script": {
-                                "text": f"/usr/bin/nc -z -v {args.nfs_address} {args.port}"
-                            }
-                        }
-                    ],
-                    "computeResource": {
-                        "cpuMilli": 1000,
-                        "memoryMib": 1024
-                    },
-                    "environment": {
-                        "imageUri": "afgane/gcp-batch-netcat:0.1.0"
-                    }
-                },
-                "taskCount": 1,
-                "parallelism": 1
-            }
-        ],
-        "logsPolicy": {
-            "destination": "CLOUD_LOGGING"
-        }
-    }
+    # Create Batch client
+    logger.info("Creating Batch client...")
+    client = batch_v1.BatchServiceClient()
+    logger.info("Batch client created successfully")
 
-    job_spec_file = 'job.json'
-    with open(job_spec_file, 'w') as f:
-        json.dump(job_spec, f)
+    # Define the job using the Python client library objects
+    logger.info("Building job specification...")
+    runnable = batch_v1.Runnable()
+    runnable.container = batch_v1.Runnable.Container()
+    runnable.container.image_uri = "afgane/gcp-batch-netcat:0.2.0"
+    runnable.container.entrypoint = "/usr/bin/nc"
+    runnable.container.commands = ["-z", "-v", nfs_address, "2049"]
+    logger.debug(f"Container config: image={runnable.container.image_uri}, entrypoint={runnable.container.entrypoint}, commands={runnable.container.commands}")
 
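
Note: the attribute-by-attribute construction above mirrors the proto layout; the same Runnable can also be built in one expression, since the proto-plus message classes in google-cloud-batch accept keyword arguments. A stylistic sketch only, with the same behavior (the NFS host here is a placeholder):

    from google.cloud import batch_v1

    runnable = batch_v1.Runnable(
        container=batch_v1.Runnable.Container(
            image_uri="afgane/gcp-batch-netcat:0.2.0",
            entrypoint="/usr/bin/nc",
            commands=["-z", "-v", "10.0.0.5", "2049"],  # placeholder NFS host
        )
    )
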
@@ -62,12 +114,13 @@
-    command = [
-        'gcloud', 'batch', 'jobs', 'submit', job_name,
-        '--location', args.region,
-        '--project', args.project,
-        '--config', job_spec_file,
-        '--format=text',
-        '--verbosity=debug'
-    ]
+    task = batch_v1.TaskSpec()
+    task.runnables = [runnable]
+    task.compute_resource = batch_v1.ComputeResource()
+    task.compute_resource.cpu_milli = 1000
+    task.compute_resource.memory_mib = 1024
+    logger.debug(f"Compute resources: CPU={task.compute_resource.cpu_milli}m, Memory={task.compute_resource.memory_mib}MiB")
 
-    # Wait 4 minutes before submitting the job
-    time.sleep(240)
+    task_group = batch_v1.TaskGroup()
+    task_group.task_count = 1
+    task_group.parallelism = 1
+    task_group.task_spec = task
+    logger.debug(f"Task group: count={task_group.task_count}, parallelism={task_group.parallelism}")
 
@@ -74,13 +127,66 @@
+    # Network configuration: Batch job should run in the same network as the NFS server
+    network_interface = batch_v1.AllocationPolicy.NetworkInterface()
+    network_interface.network = f"global/networks/{args.network}"
+    network_interface.subnetwork = f"regions/{args.region}/subnetworks/{args.subnet}"
+    logger.debug(f"Network: {network_interface.network}")
+    logger.debug(f"Subnet: {network_interface.subnetwork}")
+
+    network_policy = batch_v1.AllocationPolicy.NetworkPolicy()
+    network_policy.network_interfaces = [network_interface]
+
+    allocation_policy = batch_v1.AllocationPolicy()
+    allocation_policy.network = network_policy
+
+    job = batch_v1.Job()
+    job.task_groups = [task_group]
+    job.allocation_policy = allocation_policy
+    job.logs_policy = batch_v1.LogsPolicy()
+    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
+    logger.info("Job specification built successfully")
+
+    create_request = batch_v1.CreateJobRequest()
+    create_request.parent = f"projects/{project_id}/locations/{args.region}"
+    create_request.job_id = job_name
+    create_request.job = job
+    logger.debug(f"Create request parent: {create_request.parent}")
+    logger.debug(f"Create request job_id: {create_request.job_id}")
+
+    logger.info(f"Submitting job with name: {job_name}")
+    logger.info(f"Target project: {project_id}")
+    logger.info(f"Target Batch region: {args.region}")
+    logger.info(f"NFS target: {nfs_address}:2049")
+
+    # Proceed with job submission
     try:
-        result = subprocess.run(command, capture_output=True, text=True, check=True)
+        logger.info("Calling client.create_job()...")
+        job_response = client.create_job(request=create_request)
+        logger.info("Job submitted successfully!")
+        logger.info(f"Job name: {job_response.name}")
+        logger.info(f"Job UID: {job_response.uid}")
+
         with open(args.output, 'w') as f:
-            f.write("Job output:\n")
-            f.write(result.stdout)
-            f.write(result.stderr)
-    except subprocess.CalledProcessError as e:
+            f.write("Job submitted successfully using Python client.\n")
+            f.write(f"Job name: {job_name}\n")
+            f.write(f"Job response name: {job_response.name}\n")
+            f.write(f"Job UID: {job_response.uid}\n")
+            f.write(f"Project: {project_id}\n")
+            f.write(f"Region: {args.region}\n")
+            f.write(f"NFS Address: {nfs_address}:2049\n")
+
+    except Exception as e:
+        logger.error(f"Error submitting job: {type(e).__name__}: {e}")
+        logger.error(f"Error details: {str(e)}")
+        import traceback
+        logger.error("Traceback:", exc_info=True)
+
         with open(args.output, 'w') as f:
-            f.write("Error submitting job:\n")
-            f.write(e.stderr)
+            f.write(f"Error submitting job: {type(e).__name__}: {e}\n")
+            f.write(f"Error details: {str(e)}\n")
+            f.write(f"Job name: {job_name}\n")
+            f.write(f"Project: {project_id}\n")
+            f.write(f"Region: {args.region}\n")
+            f.write(f"Traceback:\n")
+            f.write(traceback.format_exc())
 
 if __name__ == '__main__':
     main()
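
Note: the script submits the Batch job and exits without waiting for the connectivity check to run. If a caller needs the result, a polling helper along these lines should work with the same client (the terminal states and poll interval are assumptions based on the public JobStatus.State enum):

    import time
    from google.cloud import batch_v1

    def wait_for_job(client: batch_v1.BatchServiceClient, job_name: str,
                     poll_seconds: int = 15) -> batch_v1.JobStatus.State:
        """Poll a Batch job until it reaches a terminal state."""
        terminal = {
            batch_v1.JobStatus.State.SUCCEEDED,
            batch_v1.JobStatus.State.FAILED,
        }
        while True:
            # job_name is the fully qualified name returned by create_job(),
            # e.g. projects/<project>/locations/<region>/jobs/<job_id>
            job = client.get_job(name=job_name)
            if job.status.state in terminal:
                return job.status.state
            time.sleep(poll_seconds)

    # Usage sketch: state = wait_for_job(client, job_response.name)
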