comparison gcp_batch_netcat.py @ 4:2ff4a39ea41b draft

planemo upload commit 1bf6938d35be8e67e317f504f43f281ce7dc06e6
author enis
date Tue, 22 Jul 2025 14:47:47 +0000
parents 0ea626b10557
children b2ce158b4f22
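summary: replaces the gcloud CLI submission path (a job spec written to job.json and shelled out to `gcloud batch jobs submit`) with direct submission through the google-cloud-batch Python client; makes --project and --nfs_address optional by deriving them from the service account key and the /galaxy/server/database/ NFS mount; adds --network/--subnet options and stdout logging.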
--- gcp_batch_netcat.py	3:0ea626b10557
+++ gcp_batch_netcat.py	4:2ff4a39ea41b
+import argparse
+import json
+import logging
+import os
+import sys
+# import time
+import uuid
+from google.cloud import batch_v1
 
-import json
-import subprocess
-import argparse
-import uuid
-import time
-import os
+# Configure logging to go to stdout instead of stderr to avoid Galaxy marking the job as failed
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    stream=sys.stdout
+)
+logger = logging.getLogger(__name__)
 
 def main():
     parser = argparse.ArgumentParser()
-    parser.add_argument('--nfs_address', required=True)
+    parser.add_argument('--nfs_address', required=False, help='NFS server address (if not provided, will be auto-detected from /galaxy/server/database/ mount)')
     parser.add_argument('--output', required=True)
-    parser.add_argument('--project', required=True)
+    parser.add_argument('--project', required=False, help='GCP Project ID (if not provided, will be extracted from service account key)')
     parser.add_argument('--region', required=True)
-    parser.add_argument('--port', default='2049')
+    parser.add_argument('--network', default='default', help='GCP Network name')
+    parser.add_argument('--subnet', default='default', help='GCP Subnet name')
     parser.add_argument('--service_account_key', required=True)
     args = parser.parse_args()
 
     # Set up authentication using the service account key
     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.service_account_key
+    logger.info(f"Authentication configured with service account: {args.service_account_key}")
 
-    # Ensure gcloud uses a writable config directory
-    os.environ['CLOUDSDK_CONFIG'] = '/tmp/gcloud-config'
+    # Extract GCP project ID from service account key if not provided
+    if args.project:
+        project_id = args.project
+        logger.info(f"Using provided project ID: {project_id}")
+    else:
+        try:
+            with open(args.service_account_key, 'r') as f:
+                service_account_data = json.load(f)
+            project_id = service_account_data.get('project_id')
+            if not project_id:
+                raise ValueError("project_id not found in service account key file")
+            logger.info(f"Extracted project ID from service account key: {project_id}")
+        except Exception as e:
+            logger.error(f"Failed to extract project ID from service account key: {e}")
+            raise
 
-    # Create the temp config directory if it doesn't exist
-    os.makedirs('/tmp/gcloud-config', exist_ok=True)
+    # Extract NFS server address if not provided
+    if args.nfs_address:
+        nfs_address = args.nfs_address
+        logger.info(f"Using provided NFS address: {nfs_address}")
+    else:
+        try:
+            # Try to detect NFS server from /galaxy/server/database/ mount
+            import subprocess
+            result = subprocess.run(['mount'], capture_output=True, text=True)
+            nfs_address = None
+
+            for line in result.stdout.split('\n'):
+                if '/galaxy/server/database' in line and ':' in line:
+                    # Look for NFS mount pattern: server:/path on /galaxy/server/database
+                    parts = line.split()
+                    for part in parts:
+                        if ':' in part and part.count(':') == 1:
+                            nfs_address = part.split(':')[0]
+                            break
+                    if nfs_address:
+                        logger.info(f"Detected NFS address from mount: {nfs_address}")
+                        break
+
+            if not nfs_address:
+                # Fallback: try to parse /proc/mounts
+                try:
+                    with open('/proc/mounts', 'r') as f:
+                        for line in f:
+                            if '/galaxy/server/database' in line and ':' in line:
+                                parts = line.split()
+                                if parts and ':' in parts[0]:
+                                    nfs_address = parts[0].split(':')[0]
+                                    logger.info(f"Detected NFS address from /proc/mounts: {nfs_address}")
+                                    break
+                except OSError:
+                    pass
+
+            if not nfs_address:
+                raise ValueError("Could not auto-detect NFS server address from /galaxy/server/database/ mount")
+
+            logger.info(f"Auto-detected NFS address from mount: {nfs_address}")
+        except Exception as e:
+            logger.error(f"Failed to auto-detect NFS address: {e}")
+            raise
+
+    # time.sleep(10000)
 
     job_name = f'netcat-job-{uuid.uuid4()}'
+    logger.info(f"Generated job name: {job_name}")
 
-    job_spec = {
-        "taskGroups": [
-            {
-                "taskSpec": {
-                    "runnables": [
-                        {
-                            "script": {
-                                "text": f"/usr/bin/nc -z -v {args.nfs_address} {args.port}"
-                            }
-                        }
-                    ],
-                    "computeResource": {
-                        "cpuMilli": 1000,
-                        "memoryMib": 1024
-                    },
-                    "environment": {
-                        "imageUri": "afgane/gcp-batch-netcat:0.1.0"
-                    }
-                },
-                "taskCount": 1,
-                "parallelism": 1
-            }
-        ],
-        "logsPolicy": {
-            "destination": "CLOUD_LOGGING"
-        }
-    }
+    # Create Batch client
+    logger.info("Creating Batch client...")
+    client = batch_v1.BatchServiceClient()
+    logger.info("Batch client created successfully")
 
-    job_spec_file = 'job.json'
-    with open(job_spec_file, 'w') as f:
-        json.dump(job_spec, f)
+    # Define the job using the Python client library objects
+    logger.info("Building job specification...")
+    runnable = batch_v1.Runnable()
+    runnable.container = batch_v1.Runnable.Container()
+    runnable.container.image_uri = "afgane/gcp-batch-netcat:0.2.0"
+    runnable.container.entrypoint = "/usr/bin/nc"
+    runnable.container.commands = ["-z", "-v", nfs_address, "2049"]
+    logger.debug(f"Container config: image={runnable.container.image_uri}, entrypoint={runnable.container.entrypoint}, commands={runnable.container.commands}")
 
-    command = [
-        'gcloud', 'batch', 'jobs', 'submit', job_name,
-        '--location', args.region,
-        '--project', args.project,
-        '--config', job_spec_file,
-        '--format=text',
-        '--verbosity=debug'
-    ]
+    task = batch_v1.TaskSpec()
+    task.runnables = [runnable]
+    task.compute_resource = batch_v1.ComputeResource()
+    task.compute_resource.cpu_milli = 1000
+    task.compute_resource.memory_mib = 1024
+    logger.debug(f"Compute resources: CPU={task.compute_resource.cpu_milli}m, Memory={task.compute_resource.memory_mib}MiB")
 
-    # Wait 4 minutes before submitting the job
-    time.sleep(240)
+    task_group = batch_v1.TaskGroup()
+    task_group.task_count = 1
+    task_group.parallelism = 1
+    task_group.task_spec = task
+    logger.debug(f"Task group: count={task_group.task_count}, parallelism={task_group.parallelism}")
 
+    # Network configuration: the Batch job should run in the same network as the NFS server
+    network_interface = batch_v1.AllocationPolicy.NetworkInterface()
+    network_interface.network = f"global/networks/{args.network}"
+    network_interface.subnetwork = f"regions/{args.region}/subnetworks/{args.subnet}"
+    logger.debug(f"Network: {network_interface.network}")
+    logger.debug(f"Subnet: {network_interface.subnetwork}")
+
+    network_policy = batch_v1.AllocationPolicy.NetworkPolicy()
+    network_policy.network_interfaces = [network_interface]
+
+    allocation_policy = batch_v1.AllocationPolicy()
+    allocation_policy.network = network_policy
+
+    job = batch_v1.Job()
+    job.task_groups = [task_group]
+    job.allocation_policy = allocation_policy
+    job.logs_policy = batch_v1.LogsPolicy()
+    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
+    logger.info("Job specification built successfully")
+
+    create_request = batch_v1.CreateJobRequest()
+    create_request.parent = f"projects/{project_id}/locations/{args.region}"
+    create_request.job_id = job_name
+    create_request.job = job
+    logger.debug(f"Create request parent: {create_request.parent}")
+    logger.debug(f"Create request job_id: {create_request.job_id}")
+
+    logger.info(f"Submitting job with name: {job_name}")
+    logger.info(f"Target project: {project_id}")
+    logger.info(f"Target Batch region: {args.region}")
+    logger.info(f"NFS target: {nfs_address}:2049")
+
+    # Proceed with job submission
     try:
-        result = subprocess.run(command, capture_output=True, text=True, check=True)
+        logger.info("Calling client.create_job()...")
+        job_response = client.create_job(request=create_request)
+        logger.info("Job submitted successfully!")
+        logger.info(f"Job name: {job_response.name}")
+        logger.info(f"Job UID: {job_response.uid}")
+
         with open(args.output, 'w') as f:
-            f.write("Job output:\n")
-            f.write(result.stdout)
-            f.write(result.stderr)
-    except subprocess.CalledProcessError as e:
+            f.write("Job submitted successfully using Python client.\n")
+            f.write(f"Job name: {job_name}\n")
+            f.write(f"Job response name: {job_response.name}\n")
+            f.write(f"Job UID: {job_response.uid}\n")
+            f.write(f"Project: {project_id}\n")
+            f.write(f"Region: {args.region}\n")
+            f.write(f"NFS Address: {nfs_address}:2049\n")
+
+    except Exception as e:
+        logger.error(f"Error submitting job: {type(e).__name__}: {e}")
+        logger.error(f"Error details: {str(e)}")
+        import traceback
+        logger.error("Traceback:", exc_info=True)
+
         with open(args.output, 'w') as f:
-            f.write("Error submitting job:\n")
-            f.write(e.stderr)
+            f.write(f"Error submitting job: {type(e).__name__}: {e}\n")
+            f.write(f"Error details: {str(e)}\n")
+            f.write(f"Job name: {job_name}\n")
+            f.write(f"Project: {project_id}\n")
+            f.write(f"Region: {args.region}\n")
+            f.write("Traceback:\n")
+            f.write(traceback.format_exc())
 
 if __name__ == '__main__':
     main()
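
Note that the revised script returns as soon as create_job() succeeds; the netcat connectivity result itself only appears in Cloud Logging. For reference, a minimal sketch (not part of this commit; the poll_job name, the 10-second interval, and the timeout are illustrative) of how the returned job could be polled to a terminal state with the same google-cloud-batch client:

import time
from google.cloud import batch_v1

def poll_job(client: batch_v1.BatchServiceClient, full_job_name: str, timeout_s: int = 600):
    """Poll a Batch job until it reaches a terminal state (SUCCEEDED or FAILED)."""
    terminal = {batch_v1.JobStatus.State.SUCCEEDED, batch_v1.JobStatus.State.FAILED}
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        # full_job_name is job_response.name, i.e. projects/<project>/locations/<region>/jobs/<job_id>
        job = client.get_job(name=full_job_name)
        if job.status.state in terminal:
            return job.status.state
        time.sleep(10)
    raise TimeoutError(f"Job {full_job_name} did not reach a terminal state within {timeout_s}s")

# usage, after job_response = client.create_job(request=create_request):
#   state = poll_job(client, job_response.name)
#   logger.info(f"netcat job finished with state: {state.name}")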