comparison gcp_batch_netcat.py @ 5:b2ce158b4f22 draft

planemo upload commit ece227052d14d755b0d0b07a827152b2e98fb94b
author enis
date Thu, 24 Jul 2025 21:41:18 +0000
parents 2ff4a39ea41b
children d25792770df8
comparison
equal deleted inserted replaced
4:2ff4a39ea41b 5:b2ce158b4f22
1 import argparse 1 import argparse
2 import json 2 import json
3 import logging 3 import logging
4 import os 4 import os
5 import sys 5 import sys
6 # import time
7 import uuid 6 import uuid
8 from google.cloud import batch_v1 7 from google.cloud import batch_v1
9 8
10 # Configure logging to go to stdout instead of stderr to avoid Galaxy marking job as failed 9 # Configure logging to go to stdout instead of stderr to avoid Galaxy marking job as failed
11 import sys
logging.basicConfig(
    # Log to stdout rather than stderr so Galaxy does not mark the job as failed.
    stream=sys.stdout,
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
16
# Mount point whose backing NFS export is probed by the 'nfs' test type.
_GALAXY_DATABASE_MOUNT = '/galaxy/server/database'


def determine_test_target(args):
    """Determine the target host and port based on test type.

    Args:
        args: Parsed command-line namespace. Always reads ``test_type``.
            For ``custom`` it also reads ``custom_host``/``custom_port``, and
            for ``nfs`` it reads ``nfs_address``.

    Returns:
        A ``(host, port)`` tuple to probe.

    Raises:
        ValueError: If ``test_type`` is ``custom`` without ``custom_host``,
            if the NFS server cannot be auto-detected, or if ``test_type``
            is not supported.
    """
    if args.test_type == 'custom':
        if not args.custom_host:
            raise ValueError("custom_host is required when test_type is 'custom'")
        return args.custom_host, args.custom_port

    if args.test_type == 'nfs':
        if args.nfs_address:
            logger.info(f"Using provided NFS address: {args.nfs_address}")
            return args.nfs_address, 2049
        try:
            nfs_address = _detect_nfs_server()
        except Exception as e:
            logger.error(f"Failed to auto-detect NFS address: {e}")
            raise
        return nfs_address, 2049

    if args.test_type == 'galaxy_web':
        return _detect_galaxy_web_target()

    if args.test_type == 'k8s_dns':
        # Probe the in-cluster Kubernetes API service name (DNS + TCP).
        return 'kubernetes.default.svc.cluster.local', 443

    if args.test_type == 'google_dns':
        # Probe a public resolver to check external connectivity.
        return '8.8.8.8', 53

    raise ValueError(f"Unsupported test type: {args.test_type}")


def _nfs_host_from_source(source):
    """Return the server part of a ``server:/path`` mount source, or None."""
    if source.startswith('['):
        # Bracketed IPv6 literal, e.g. "[fd00::1]:/export".
        host, sep, _ = source[1:].partition(']')
    else:
        host, sep, _ = source.partition(':')
    return host if sep and host else None


def _find_nfs_server(lines, mount_point_field):
    """Return the server backing the Galaxy database mount from mount-table lines, or None.

    ``mount_point_field`` is the whitespace-split index of the mount point:
    2 for ``mount`` output (``src on dir type ...``) and 1 for
    ``/proc/mounts`` (``src dir fstype ...``).
    """
    prefix = _GALAXY_DATABASE_MOUNT + '/'
    for line in lines:
        fields = line.split()
        if len(fields) <= mount_point_field:
            continue
        mount_point = fields[mount_point_field]
        # Match the mount point itself (or a sub-mount), not the path appearing
        # anywhere in the line, so unrelated option strings are ignored.
        if mount_point != _GALAXY_DATABASE_MOUNT and not mount_point.startswith(prefix):
            continue
        host = _nfs_host_from_source(fields[0])
        if host:
            return host
    return None


def _detect_nfs_server():
    """Auto-detect the NFS server for the Galaxy database dir via `mount`, then /proc/mounts.

    Raises:
        ValueError: If neither source yields a server address.
    """
    import subprocess

    try:
        result = subprocess.run(['mount'], capture_output=True, text=True, timeout=30)
    except (OSError, subprocess.SubprocessError) as e:
        # A missing or hanging `mount` binary must not block the /proc/mounts fallback.
        logger.warning(f"Could not run 'mount': {e}")
    else:
        nfs_address = _find_nfs_server(result.stdout.splitlines(), 2)
        if nfs_address:
            logger.info(f"Detected NFS address from mount: {nfs_address}")
            return nfs_address

    try:
        with open('/proc/mounts', 'r') as f:
            nfs_address = _find_nfs_server(f, 1)
    except OSError as e:
        logger.warning(f"Could not read /proc/mounts: {e}")
        nfs_address = None
    if nfs_address:
        logger.info(f"Detected NFS address from /proc/mounts: {nfs_address}")
        return nfs_address

    raise ValueError("Could not auto-detect NFS server address from /galaxy/server/database/ mount")


def _first_service_port(spec, default=80):
    """Return the first truthy ``port`` listed in a Service spec, else ``default``."""
    return next((p['port'] for p in spec.get('ports', []) if p.get('port')), default)


def _detect_galaxy_web_target():
    """Find a Galaxy web/nginx Service via kubectl, falling back to 'galaxy-web':80."""
    import subprocess

    try:
        result = subprocess.run(['kubectl', 'get', 'svc', '-o', 'json'],
                                capture_output=True, text=True, timeout=60)
        if result.returncode == 0:
            services = json.loads(result.stdout)
            for item in services.get('items', []):
                name = item.get('metadata', {}).get('name', '').lower()
                if 'galaxy' not in name or not ('web' in name or 'nginx' in name):
                    continue
                spec = item.get('spec', {})
                port = _first_service_port(spec)
                if spec.get('type') == 'LoadBalancer':
                    ingress = item.get('status', {}).get('loadBalancer', {}).get('ingress', [])
                    ip = ingress[0].get('ip') if ingress else None
                    if ip:
                        logger.info(f"Found Galaxy web service LoadBalancer: {ip}:{port}")
                        return ip, port
                # Fall back to the ClusterIP ('None' marks a headless service).
                cluster_ip = spec.get('clusterIP')
                if cluster_ip and cluster_ip != 'None':
                    logger.info(f"Found Galaxy web service ClusterIP: {cluster_ip}:{port}")
                    return cluster_ip, port
    except Exception as e:
        # Best effort: kubectl may be absent, time out, or emit invalid JSON.
        logger.warning(f"Could not auto-detect Galaxy web service: {e}")

    fallback_host = 'galaxy-web'
    logger.info(f"Trying common Galaxy service name: {fallback_host}")
    return fallback_host, 80
18 124
19 def main(): 125 def main():
20 parser = argparse.ArgumentParser() 126 parser = argparse.ArgumentParser()
21 parser.add_argument('--nfs_address', required=False, help='NFS server address (if not provided, will be auto-detected from /galaxy/server/database/ mount)') 127 parser.add_argument('--nfs_address', required=False, help='NFS server address (if not provided, will be auto-detected from /galaxy/server/database/ mount)')
22 parser.add_argument('--output', required=True) 128 parser.add_argument('--output', required=True)
23 parser.add_argument('--project', required=False, help='GCP Project ID (if not provided, will be extracted from service account key)') 129 parser.add_argument('--project', required=False, help='GCP Project ID (if not provided, will be extracted from service account key)')
24 parser.add_argument('--region', required=True) 130 parser.add_argument('--region', required=True)
25 parser.add_argument('--network', default='default', help='GCP Network name') 131 parser.add_argument('--network', default='default', help='GCP Network name')
26 parser.add_argument('--subnet', default='default', help='GCP Subnet name') 132 parser.add_argument('--subnet', default='default', help='GCP Subnet name')
27 parser.add_argument('--service_account_key', required=True) 133 parser.add_argument('--service_account_key', required=True)
134 parser.add_argument('--test_type', default='nfs', choices=['nfs', 'galaxy_web', 'k8s_dns', 'google_dns', 'custom'],
135 help='Type of connectivity test to perform')
136 parser.add_argument('--custom_host', required=False, help='Custom host to test (required if test_type is custom)')
137 parser.add_argument('--custom_port', type=int, default=80, help='Custom port to test (default: 80)')
28 args = parser.parse_args() 138 args = parser.parse_args()
29 139
30 # Set up authentication using the service account key 140 # Set up authentication using the service account key
31 os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.service_account_key 141 os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.service_account_key
32 logger.info(f"Authentication configured with service account: {args.service_account_key}") 142 logger.info(f"Authentication configured with service account: {args.service_account_key}")
45 logger.info(f"Extracted project ID from service account key: {project_id}") 155 logger.info(f"Extracted project ID from service account key: {project_id}")
46 except Exception as e: 156 except Exception as e:
47 logger.error(f"Failed to extract project ID from service account key: {e}") 157 logger.error(f"Failed to extract project ID from service account key: {e}")
48 raise 158 raise
49 159
50 # Extract NFS server address if not provided 160 # Determine target host and port based on test type
51 if args.nfs_address: 161 try:
52 nfs_address = args.nfs_address 162 target_host, target_port = determine_test_target(args)
53 logger.info(f"Using provided NFS address: {nfs_address}") 163 logger.info(f"Target determined: {target_host}:{target_port}")
54 else: 164 except Exception as e:
55 try: 165 logger.error(f"Failed to determine target: {e}")
56 # Try to detect NFS server from /galaxy/server/database/ mount 166 raise
57 import subprocess
58 result = subprocess.run(['mount'], capture_output=True, text=True)
59 nfs_address = None
60
61 for line in result.stdout.split('\n'):
62 if '/galaxy/server/database' in line and ':' in line:
63 # Look for NFS mount pattern: server:/path on /galaxy/server/database
64 parts = line.split()
65 for part in parts:
66 if ':' in part and part.count(':') == 1:
67 nfs_address = part.split(':')[0]
68 break
69 if nfs_address:
70 logger.info(f"Detected NFS address from mount: {nfs_address}")
71 break
72
73 if not nfs_address:
74 # Fallback: try to parse /proc/mounts
75 try:
76 with open('/proc/mounts', 'r') as f:
77 for line in f:
78 if '/galaxy/server/database' in line and ':' in line:
79 parts = line.split()
80 if len(parts) > 0 and ':' in parts[0]:
81 nfs_address = parts[0].split(':')[0]
82 logger.info(f"Detected NFS address from /proc/mounts: {nfs_address}")
83 break
84 except:
85 pass
86
87 if not nfs_address:
88 raise ValueError("Could not auto-detect NFS server address from /galaxy/server/database/ mount")
89
90 logger.info(f"Auto-detected NFS address from mount: {nfs_address}")
91 except Exception as e:
92 logger.error(f"Failed to auto-detect NFS address: {e}")
93 raise
94
95 # time.sleep(10000)
96 167
97 job_name = f'netcat-job-{uuid.uuid4()}' 168 job_name = f'netcat-job-{uuid.uuid4()}'
98 logger.info(f"Generated job name: {job_name}") 169 logger.info(f"Generated job name: {job_name}")
99 170
100 # Create Batch client 171 # Create Batch client
105 # Define the job using the Python client library objects 176 # Define the job using the Python client library objects
106 logger.info("Building job specification...") 177 logger.info("Building job specification...")
107 runnable = batch_v1.Runnable() 178 runnable = batch_v1.Runnable()
108 runnable.container = batch_v1.Runnable.Container() 179 runnable.container = batch_v1.Runnable.Container()
109 runnable.container.image_uri = "afgane/gcp-batch-netcat:0.2.0" 180 runnable.container.image_uri = "afgane/gcp-batch-netcat:0.2.0"
110 runnable.container.entrypoint = "/usr/bin/nc" 181
111 runnable.container.commands = ["-z", "-v", nfs_address, "2049"] 182 # Create a comprehensive test script
112 logger.debug(f"Container config: image={runnable.container.image_uri}, entrypoint={runnable.container.entrypoint}, commands={runnable.container.commands}") 183 test_script = f'''#!/bin/bash
184 set -e
185 echo "=== GCP Batch Connectivity Test ==="
186 echo "Test Type: {args.test_type}"
187 echo "Target: {target_host}:{target_port}"
188 echo "Timestamp: $(date)"
189 echo "Container hostname: $(hostname)"
190 echo ""
191
192 # Basic network info
193 echo "=== Network Information ==="
194 echo "Container IP addresses:"
195 hostname -I
196 echo "Default route:"
197 ip route | grep default || echo "No default route found"
198 echo ""
199
200 # DNS configuration
201 echo "=== DNS Configuration ==="
202 echo "DNS servers:"
203 cat /etc/resolv.conf | grep nameserver || echo "No nameservers found"
204 echo ""
205
206 # Test DNS resolution of target
207 echo "=== DNS Resolution Test ==="
208 echo "Resolving {target_host}:"
209 nslookup {target_host} || {{
210 echo "DNS resolution failed for {target_host}"
211 echo "Trying with Google DNS (8.8.8.8):"
212 nslookup {target_host} 8.8.8.8 || echo "DNS resolution failed even with Google DNS"
213 }}
214 echo ""
215
216 # Basic connectivity test
217 echo "=== Primary Connectivity Test ==="
218 echo "Testing connection to {target_host}:{target_port}..."
219 timeout 30 nc -z -v -w 10 {target_host} {target_port}
220 nc_result=$?
221 echo "Netcat result: $nc_result"
222 echo ""
223
224 # Additional connectivity tests
225 echo "=== Additional Connectivity Tests ==="
226 echo "Testing Google DNS (8.8.8.8:53):"
227 timeout 10 nc -z -v -w 5 8.8.8.8 53 && echo "✓ External DNS reachable" || echo "✗ External DNS unreachable"
228
229 echo "Testing Kubernetes API (if accessible):"
230 timeout 10 nc -z -v -w 5 kubernetes.default.svc.cluster.local 443 2>/dev/null && echo "✓ Kubernetes API reachable" || echo "✗ Kubernetes API unreachable"
231
232 echo ""
233 echo "=== Network Troubleshooting ==="
234 echo "Route table:"
235 ip route
236 echo ""
237 echo "ARP table:"
238 arp -a 2>/dev/null || echo "ARP command not available"
239 echo ""
240
241 echo "=== Final Result ==="
242 if [ $nc_result -eq 0 ]; then
243 echo "✓ SUCCESS: Connection to {target_host}:{target_port} successful"
244 exit 0
245 else
246 echo "✗ FAILED: Connection to {target_host}:{target_port} failed"
247 echo "This suggests a network connectivity issue between GCP Batch and the target service."
248 echo "Common causes:"
249 echo "- Firewall rules blocking traffic"
250 echo "- Service not accessible from external networks"
251 echo "- Target service only accepting internal cluster traffic"
252 exit 1
253 fi
254 '''
255
256 runnable.container.entrypoint = "/bin/bash"
257 runnable.container.commands = ["-c", test_script]
258 logger.debug(f"Container config: image={runnable.container.image_uri}, entrypoint={runnable.container.entrypoint}")
113 259
114 task = batch_v1.TaskSpec() 260 task = batch_v1.TaskSpec()
115 task.runnables = [runnable] 261 task.runnables = [runnable]
116 task.compute_resource = batch_v1.ComputeResource() 262 task.compute_resource = batch_v1.ComputeResource()
117 task.compute_resource.cpu_milli = 1000 263 task.compute_resource.cpu_milli = 1000
152 logger.debug(f"Create request job_id: {create_request.job_id}") 298 logger.debug(f"Create request job_id: {create_request.job_id}")
153 299
154 logger.info(f"Submitting job with name: {job_name}") 300 logger.info(f"Submitting job with name: {job_name}")
155 logger.info(f"Target project: {project_id}") 301 logger.info(f"Target project: {project_id}")
156 logger.info(f"Target Batch region: {args.region}") 302 logger.info(f"Target Batch region: {args.region}")
157 logger.info(f"NFS target: {nfs_address}:2049") 303 logger.info(f"Test target: {target_host}:{target_port}")
158 304
159 # Proceed with job submission 305 # Proceed with job submission
160 try: 306 try:
161 logger.info("Calling client.create_job()...") 307 logger.info("Calling client.create_job()...")
162 job_response = client.create_job(request=create_request) 308 job_response = client.create_job(request=create_request)
169 f.write(f"Job name: {job_name}\n") 315 f.write(f"Job name: {job_name}\n")
170 f.write(f"Job response name: {job_response.name}\n") 316 f.write(f"Job response name: {job_response.name}\n")
171 f.write(f"Job UID: {job_response.uid}\n") 317 f.write(f"Job UID: {job_response.uid}\n")
172 f.write(f"Project: {project_id}\n") 318 f.write(f"Project: {project_id}\n")
173 f.write(f"Region: {args.region}\n") 319 f.write(f"Region: {args.region}\n")
174 f.write(f"NFS Address: {nfs_address}:2049\n") 320 f.write(f"Test Type: {args.test_type}\n")
321 f.write(f"Target: {target_host}:{target_port}\n")
322 f.write(f"\nTo view job logs, run:\n")
323 f.write(f"gcloud logging read 'resource.type=gce_instance AND resource.labels.instance_id={job_name}' --project={project_id}\n")
175 324
176 except Exception as e: 325 except Exception as e:
177 logger.error(f"Error submitting job: {type(e).__name__}: {e}") 326 logger.error(f"Error submitting job: {type(e).__name__}: {e}")
178 logger.error(f"Error details: {str(e)}") 327 logger.error(f"Error details: {str(e)}")
179 import traceback 328 import traceback
183 f.write(f"Error submitting job: {type(e).__name__}: {e}\n") 332 f.write(f"Error submitting job: {type(e).__name__}: {e}\n")
184 f.write(f"Error details: {str(e)}\n") 333 f.write(f"Error details: {str(e)}\n")
185 f.write(f"Job name: {job_name}\n") 334 f.write(f"Job name: {job_name}\n")
186 f.write(f"Project: {project_id}\n") 335 f.write(f"Project: {project_id}\n")
187 f.write(f"Region: {args.region}\n") 336 f.write(f"Region: {args.region}\n")
337 f.write(f"Test Type: {args.test_type}\n")
338 f.write(f"Target: {target_host}:{target_port}\n")
188 f.write(f"Traceback:\n") 339 f.write(f"Traceback:\n")
189 f.write(traceback.format_exc()) 340 f.write(traceback.format_exc())
190 341
if __name__ == '__main__':
    # Run the CLI only when executed as a script, never on import.
    main()