Mercurial > repos > goeckslab > tiling_pyhist
comparison tiling_pyhist.py @ 0:c051e9688932 draft default tip
planemo upload for repository https://github.com/goeckslab/gleam.git commit 11356473f09dd54d86af28b74bd9ed097d07ca04
author | goeckslab |
---|---|
date | Thu, 03 Jul 2025 23:48:01 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c051e9688932 |
---|---|
1 import argparse | |
2 import logging | |
3 import os | |
4 import subprocess | |
5 import sys | |
6 import tempfile | |
7 import zipfile | |
8 from concurrent.futures import ProcessPoolExecutor | |
9 from pathlib import Path | |
10 from typing import Tuple | |
11 | |
12 import openslide | |
13 import psutil | |
14 from pyhist import PySlide, TileGenerator | |
15 from src import utility_functions | |
16 | |
# Constants
SEGMENT_BINARY_PATH = "/pyhist/src/graph_segmentation/segment"  # segmentation executable probed at startup
DEFAULT_PATCH_SIZE = 256  # passed to PyHIST as "patch_size"
DEFAULT_DOWNSCALE_FACTOR = 8  # used for both output and mask downsampling
TILE_FORMAT = "png"  # tile file extension and PyHIST "format" option
MEMORY_PER_WORKER = 1  # GB, estimated memory per worker process

# Send every log record to stdout, prefixed with a timestamp and the level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stdout,
)
30 | |
31 | |
def log_memory_usage() -> None:
    """Log this process's resident (RSS) and virtual (VMS) memory sizes in MB."""
    bytes_per_mb = 1024 * 1024
    usage = psutil.Process(os.getpid()).memory_info()
    rss_mb = usage.rss / bytes_per_mb
    vms_mb = usage.vms / bytes_per_mb
    logging.info("Memory usage: RSS=%.2f MB, VMS=%.2f MB", rss_mb, vms_mb)
41 | |
42 | |
def validate_slide(image_path: Path) -> None:
    """Check that OpenSlide can open ``image_path``.

    The slide is opened and immediately closed; no image data is read.

    Args:
        image_path: Path to the slide file to check.

    Raises:
        RuntimeError: If OpenSlide raises ``OpenSlideError`` for the file.
            The original exception is chained as ``__cause__``.
    """
    try:
        with openslide.OpenSlide(str(image_path)):
            logging.info("Validated input file with OpenSlide: %s", image_path)
    except openslide.OpenSlideError as error:
        # Build the message eagerly: exception constructors do not apply
        # %-style arguments, they would just be stored as a tuple in ``args``.
        raise RuntimeError(f"Invalid input file: {error}") from error
50 | |
51 | |
def check_segmentation_binary() -> bool:
    """Report whether the graph-segmentation executable is usable.

    Returns:
        True if ``SEGMENT_BINARY_PATH`` exists and this process may execute
        it (an info message is logged); otherwise False (a warning is logged).
    """
    binary = SEGMENT_BINARY_PATH
    usable = os.path.exists(binary) and os.access(binary, os.X_OK)
    if not usable:
        logging.warning("Segmentation executable missing, using Otsu method")
        return False
    logging.info("Segmentation executable found: %s", binary)
    return True
59 | |
60 | |
def build_pyhist_config(image_path: Path, output_dir: Path) -> dict:
    """Assemble the PyHIST option dictionary for tiling one slide.

    Args:
        image_path: Slide to tile; stored as a string under ``"svs"``.
        output_dir: Destination directory; stored as a string under ``"output"``.

    Returns:
        A new dict of PyHIST options. Only the two paths vary between calls;
        every other option is fixed (Otsu thresholding, square tiles only,
        mask saved, verbose output).
    """
    return dict(
        svs=str(image_path),
        patch_size=DEFAULT_PATCH_SIZE,
        method="otsu",
        thres=0.1,
        output_downsample=DEFAULT_DOWNSCALE_FACTOR,
        mask_downsample=DEFAULT_DOWNSCALE_FACTOR,
        borders="0000",
        corners="1010",
        pct_bc=1,
        k_const=1000,
        minimum_segmentsize=1000,
        save_patches=True,
        save_blank=False,
        save_nonsquare=False,
        save_tilecrossed_image=False,
        save_mask=True,
        save_edges=False,
        info="verbose",
        output=str(output_dir),
        format=TILE_FORMAT,
    )
85 | |
86 | |
def process_image_with_pyhist(
    image_path: Path, output_dir: Path, original_name: str
) -> Path:
    """Tile one slide with PyHIST and return the directory holding its tiles.

    Args:
        image_path: Path to the slide file to tile.
        output_dir: Directory handed to PyHIST as its ``output`` location.
        original_name: User-facing file name of the slide. Not used here;
            kept so the call signature stays stable for callers.

    Returns:
        ``slide.tile_folder`` of the processed slide, as a ``Path``.

    Raises:
        RuntimeError: If slide validation fails, or if tile extraction raises
            ``subprocess.CalledProcessError`` (chained as ``__cause__``).
    """
    logging.info("Processing image: %s", image_path)
    log_memory_usage()

    validate_slide(image_path)

    # Only logs which segmentation backend is available; its result is not
    # consulted, and the config below always selects "otsu".
    check_segmentation_binary()

    config = build_pyhist_config(image_path, output_dir)

    # Map PyHIST's "info" verbosity onto the root logger level of this process.
    log_levels = {
        "default": logging.INFO,
        "verbose": logging.DEBUG,
        "silent": logging.CRITICAL,
    }
    logging.getLogger().setLevel(log_levels[config["info"]])

    utility_functions.check_image(config["svs"])
    slide = PySlide(config)
    logging.info("Slide loaded: %s", slide)

    tile_generator = TileGenerator(slide)
    logging.info("Tile generator initialized: %s", tile_generator)

    try:
        tile_generator.execute()
    except subprocess.CalledProcessError as error:
        # Format eagerly: RuntimeError does not %-format extra arguments.
        raise RuntimeError(f"Tile extraction failed: {error}") from error

    tile_dir = Path(slide.tile_folder)
    tiles = list(tile_dir.glob(f"*.{TILE_FORMAT}"))
    logging.info("Found %d tiles in %s", len(tiles), tile_dir)

    utility_functions.clean(slide)
    return tile_dir
130 | |
131 | |
def append_tiles_to_zip(
    zip_file: zipfile.ZipFile,
    original_name: str,
    tile_dir: Path
) -> None:
    """Write every tile image found in ``tile_dir`` into ``zip_file``.

    Each tile is stored as ``<base>/<base>_<n>.<TILE_FORMAT>``. Here ``<base>``
    is the stem of ``original_name``, and ``<n>`` is the text after the last
    underscore in the tile's own file stem.

    Args:
        zip_file: Archive opened for writing.
        original_name: Original file name of the slide; its stem names the
            folder and file prefix inside the archive.
        tile_dir: Directory containing the tiles to add.
    """
    original_base = Path(original_name).stem
    # Sort so the archive's entry order is reproducible; glob yields entries
    # in arbitrary filesystem order.
    tiles = sorted(tile_dir.glob(f"*.{TILE_FORMAT}"))

    for tile in tiles:
        tile_number = tile.stem.split("_")[-1]
        arcname = f"{original_base}/{original_base}_{tile_number}.{TILE_FORMAT}"
        zip_file.write(tile, arcname)

    logging.info("Appended %d tiles from %s", len(tiles), tile_dir)
147 | |
148 | |
def process_single_image(task: Tuple[Path, str, Path]) -> Path:
    """Worker entry point: tile the single slide described by ``task``.

    Args:
        task: ``(image_path, original_name, output_dir)``. It is packed into a
            single tuple so it can be passed through ``executor.map``.

    Returns:
        The tile directory returned by ``process_image_with_pyhist``.

    Raises:
        Exception: Any failure is logged together with the image path and
            re-raised unchanged.
    """
    image_path, original_name, output_dir = task
    try:
        return process_image_with_pyhist(
            image_path=image_path,
            output_dir=output_dir,
            original_name=original_name,
        )
    except Exception as error:
        logging.error("Error processing %s: %s", image_path, error)
        raise
162 | |
163 | |
def get_max_workers() -> int:
    """Return how many worker processes to run, based on CPU cores and free memory.

    The result is the smaller of two values, and never less than 1:

    * the physical core count;
    * the number of ``MEMORY_PER_WORKER``-GB slices in currently available
      memory.
    """
    # psutil.cpu_count(logical=False) returns None when the physical core
    # count cannot be determined, which would make min() raise TypeError.
    # Fall back to the logical CPU count, then to a single core.
    cpu_cores = psutil.cpu_count(logical=False) or os.cpu_count() or 1
    available_memory_gb = psutil.virtual_memory().available / (1024 ** 3)
    max_workers_memory = int(available_memory_gb // MEMORY_PER_WORKER)
    return max(1, min(cpu_cores, max_workers_memory))
171 | |
172 | |
def parse_arguments() -> argparse.Namespace:
    """Read the command line.

    ``--input`` and ``--original_name`` may each be repeated; their values
    are collected into lists, which are empty when the flag is absent.
    ``--output_zip`` is mandatory.
    """
    parser = argparse.ArgumentParser(description="Tile extraction for Galaxy")
    repeatable_flags = (
        ("--input", "Input image paths"),
        ("--original_name", "Original file names"),
    )
    for flag, description in repeatable_flags:
        parser.add_argument(flag, action="append", default=[], help=description)
    parser.add_argument("--output_zip", required=True, help="Output ZIP file path")
    namespace = parser.parse_args()
    return namespace
194 | |
195 | |
def main() -> None:
    """Tile every input slide in parallel and bundle all tiles into one ZIP.

    Workflow:

    1. Validate that ``--input`` and ``--original_name`` were given the same
       number of times.
    2. Create a temporary scratch directory inside the current working
       directory.
    3. Tile each slide with PyHIST in a process pool sized by
       ``get_max_workers``.
    4. Write all tiles to ``--output_zip``.

    The scratch directory is removed on exit.

    Raises:
        ValueError: If the input paths and original names differ in count.
    """
    # Deliberately no chdir: scratch space lives in the caller's working directory.
    logging.info("Working directory: %s", os.getcwd())

    args = parse_arguments()

    if len(args.input) != len(args.original_name):
        raise ValueError("Mismatch between input paths and original names")

    with tempfile.TemporaryDirectory(prefix="pyhist_tiles_", dir=os.getcwd()) as temp_dir_path:
        temp_dir = Path(temp_dir_path)
        logging.info("Created temporary directory: %s", temp_dir)

        # Prefix each scratch directory with the task index so that inputs
        # whose original names share a stem (e.g. "a.svs" and "a.tiff") never
        # have concurrent workers writing into the same directory.
        tasks = [
            (
                Path(image_path),
                original_name,
                temp_dir / f"{index}_{Path(original_name).stem}",
            )
            for index, (image_path, original_name) in enumerate(
                zip(args.input, args.original_name)
            )
        ]

        max_workers = get_max_workers()
        logging.info("Using %d worker processes", max_workers)

        # executor.map yields results in input order, so tile_dirs lines up with tasks.
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            tile_dirs = list(executor.map(process_single_image, tasks))

        # NOTE(review): inputs sharing an original stem still map to the same
        # folder name inside the ZIP, because entries are named from
        # original_name. Consider disambiguating those names.
        with zipfile.ZipFile(args.output_zip, "w", zipfile.ZIP_DEFLATED) as zip_file:
            for (_, original_name, _), tile_dir in zip(tasks, tile_dirs):
                append_tiles_to_zip(zip_file, original_name, tile_dir)

        logging.info("Final ZIP size: %d bytes", Path(args.output_zip).stat().st_size)

    # Leaving the TemporaryDirectory context deletes it and everything inside.
    logging.info("Temporary directory cleaned up")
233 | |
234 | |
if __name__ == "__main__":
    # Run the command-line workflow only when executed as a script, not on import.
    main()