comparison tiling_pyhist.py @ 0:c051e9688932 draft default tip

planemo upload for repository https://github.com/goeckslab/gleam.git commit 11356473f09dd54d86af28b74bd9ed097d07ca04
author goeckslab
date Thu, 03 Jul 2025 23:48:01 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c051e9688932
1 import argparse
2 import logging
3 import os
4 import subprocess
5 import sys
6 import tempfile
7 import zipfile
8 from concurrent.futures import ProcessPoolExecutor
9 from pathlib import Path
10 from typing import Tuple
11
12 import openslide
13 import psutil
14 from pyhist import PySlide, TileGenerator
15 from src import utility_functions
16
17 # Configure logging to stdout
18 logging.basicConfig(
19 stream=sys.stdout,
20 format="%(asctime)s - %(levelname)s - %(message)s",
21 level=logging.INFO,
22 )
23
24 # Constants
25 SEGMENT_BINARY_PATH = "/pyhist/src/graph_segmentation/segment"
26 DEFAULT_PATCH_SIZE = 256
27 DEFAULT_DOWNSCALE_FACTOR = 8
28 TILE_FORMAT = "png"
29 MEMORY_PER_WORKER = 1 # GB, estimated memory per worker process
30
31
32 def log_memory_usage() -> None:
33 """Log the current memory usage of the process in megabytes."""
34 process = psutil.Process(os.getpid())
35 mem_info = process.memory_info()
36 logging.info(
37 "Memory usage: RSS=%.2f MB, VMS=%.2f MB",
38 mem_info.rss / 1024 / 1024,
39 mem_info.vms / 1024 / 1024
40 )
41
42
43 def validate_slide(image_path: Path) -> None:
44 """Validate the input image using OpenSlide."""
45 try:
46 with openslide.OpenSlide(str(image_path)):
47 logging.info("Validated input file with OpenSlide: %s", image_path)
48 except openslide.OpenSlideError as error:
49 raise RuntimeError("Invalid input file: %s", error) from error
50
51
52 def check_segmentation_binary() -> bool:
53 """Check if the segmentation binary exists and is executable."""
54 if os.path.exists(SEGMENT_BINARY_PATH) and os.access(SEGMENT_BINARY_PATH, os.X_OK):
55 logging.info("Segmentation executable found: %s", SEGMENT_BINARY_PATH)
56 return True
57 logging.warning("Segmentation executable missing, using Otsu method")
58 return False
59
60
61 def build_pyhist_config(image_path: Path, output_dir: Path) -> dict:
62 """Build the configuration dictionary for PyHIST processing."""
63 return {
64 "svs": str(image_path),
65 "patch_size": DEFAULT_PATCH_SIZE,
66 "method": "otsu",
67 "thres": 0.1,
68 "output_downsample": DEFAULT_DOWNSCALE_FACTOR,
69 "mask_downsample": DEFAULT_DOWNSCALE_FACTOR,
70 "borders": "0000",
71 "corners": "1010",
72 "pct_bc": 1,
73 "k_const": 1000,
74 "minimum_segmentsize": 1000,
75 "save_patches": True,
76 "save_blank": False,
77 "save_nonsquare": False,
78 "save_tilecrossed_image": False,
79 "save_mask": True,
80 "save_edges": False,
81 "info": "verbose",
82 "output": str(output_dir),
83 "format": TILE_FORMAT,
84 }
85
86
87 def process_image_with_pyhist(
88 image_path: Path, output_dir: Path, original_name: str
89 ) -> Path:
90 """Process a single image with PyHIST and return the tile directory."""
91 logging.info("Processing image: %s", image_path)
92 log_memory_usage()
93
94 # Validate input
95 validate_slide(image_path)
96
97 # Check segmentation method
98 check_segmentation_binary()
99
100 # Prepare PyHIST configuration
101 config = build_pyhist_config(image_path, output_dir)
102
103 # Set logging level based on config
104 log_levels = {
105 "default": logging.INFO,
106 "verbose": logging.DEBUG,
107 "silent": logging.CRITICAL,
108 }
109 logging.getLogger().setLevel(log_levels[config["info"]])
110
111 # Process the slide
112 utility_functions.check_image(config["svs"])
113 slide = PySlide(config)
114 logging.info("Slide loaded: %s", slide)
115
116 tile_generator = TileGenerator(slide)
117 logging.info("Tile generator initialized: %s", tile_generator)
118
119 try:
120 tile_generator.execute()
121 except subprocess.CalledProcessError as error:
122 raise RuntimeError("Tile extraction failed: %s", error) from error
123
124 tile_dir = Path(slide.tile_folder)
125 tiles = list(tile_dir.glob(f"*.{TILE_FORMAT}"))
126 logging.info("Found %d tiles in %s", len(tiles), tile_dir)
127
128 utility_functions.clean(slide)
129 return tile_dir
130
131
132 def append_tiles_to_zip(
133 zip_file: zipfile.ZipFile,
134 original_name: str,
135 tile_dir: Path
136 ) -> None:
137 """Append PNG tiles from the tile directory to the ZIP file."""
138 original_base = Path(original_name).stem
139 tiles = list(tile_dir.glob(f"*.{TILE_FORMAT}"))
140
141 for tile in tiles:
142 tile_number = tile.stem.split("_")[-1]
143 arcname = f"{original_base}/{original_base}_{tile_number}.{TILE_FORMAT}"
144 zip_file.write(tile, arcname)
145
146 logging.info("Appended %d tiles from %s", len(tiles), tile_dir)
147
148
149 def process_single_image(task: Tuple[Path, str, Path]) -> Path:
150 """Process a single image and return the tile directory."""
151 image_path, original_name, output_dir = task
152 try:
153 tile_dir = process_image_with_pyhist(
154 image_path,
155 output_dir,
156 original_name
157 )
158 return tile_dir
159 except Exception as error:
160 logging.error("Error processing %s: %s", image_path, error)
161 raise
162
163
164 def get_max_workers() -> int:
165 """Determine the maximum number of worker processes based on available resources."""
166 cpu_cores = psutil.cpu_count(logical=False) # Physical CPU cores
167 available_memory = psutil.virtual_memory().available / (1024 ** 3) # in GB
168 max_workers_memory = available_memory // MEMORY_PER_WORKER
169 max_workers = min(cpu_cores, max_workers_memory)
170 return max(1, int(max_workers))
171
172
173 def parse_arguments() -> argparse.Namespace:
174 """Parse command-line arguments."""
175 parser = argparse.ArgumentParser(description="Tile extraction for Galaxy")
176 parser.add_argument(
177 "--input",
178 action="append",
179 help="Input image paths",
180 default=[]
181 )
182 parser.add_argument(
183 "--original_name",
184 action="append",
185 help="Original file names",
186 default=[]
187 )
188 parser.add_argument(
189 "--output_zip",
190 required=True,
191 help="Output ZIP file path"
192 )
193 return parser.parse_args()
194
195
196 def main() -> None:
197 """Main function to orchestrate tile extraction and ZIP creation with dynamic multiprocessing."""
198 # Removed os.chdir("/pyhist") to stay in Galaxy's working directory
199 logging.info("Working directory: %s", os.getcwd())
200
201 args = parse_arguments()
202
203 if len(args.input) != len(args.original_name):
204 raise ValueError("Mismatch between input paths and original names")
205
206 # Create a temporary directory using tempfile
207 with tempfile.TemporaryDirectory(prefix="pyhist_tiles_", dir=os.getcwd()) as temp_dir_path:
208 temp_dir = Path(temp_dir_path)
209 logging.info("Created temporary directory: %s", temp_dir)
210
211 # Prepare tasks with unique output directories
212 tasks = [
213 (Path(image_path), original_name, temp_dir / Path(original_name).stem)
214 for image_path, original_name in zip(args.input, args.original_name)
215 ]
216
217 # Determine the number of worker processes based on available resources
218 max_workers = get_max_workers()
219 logging.info("Using %d worker processes", max_workers)
220
221 # Process images in parallel
222 with ProcessPoolExecutor(max_workers=max_workers) as executor:
223 tile_dirs = list(executor.map(process_single_image, tasks))
224
225 # Create the ZIP file and append all tiles
226 with zipfile.ZipFile(args.output_zip, "w", zipfile.ZIP_DEFLATED) as zip_file:
227 for (image_path, original_name, output_dir), tile_dir in zip(tasks, tile_dirs):
228 append_tiles_to_zip(zip_file, original_name, tile_dir)
229
230 logging.info("Final ZIP size: %d bytes", Path(args.output_zip).stat().st_size)
231 # No need for shutil.rmtree as TemporaryDirectory cleans up automatically
232 logging.info("Temporary directory cleaned up")
233
234
235 if __name__ == "__main__":
236 main()