view utils.py @ 0:375c36923da1 draft default tip

planemo upload for repository https://github.com/goeckslab/gleam.git commit 1c6c1ad7a1b2bd3645aa0eafa2167784820b52e0
author goeckslab
date Tue, 09 Dec 2025 23:49:47 +0000
parents
children
line wrap: on
line source

import json
import logging
import os
import random
import sys
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional

import numpy as np
import pandas as pd
import torch

LOG = logging.getLogger(__name__)


def str2bool(val) -> bool:
    """Interpret *val* as a boolean flag.

    The value is stringified, stripped of surrounding whitespace and
    lower-cased; the result is True only for "1", "true", "yes" or "y".
    Everything else (including None and the empty string) is False.
    """
    truthy_tokens = ("1", "true", "yes", "y")
    normalized = str(val).strip().lower()
    return normalized in truthy_tokens


def load_user_hparams(hp_arg: Optional[str]) -> dict:
    """Parse --hyperparameters (inline JSON or path to .json).

    Args:
        hp_arg: Either a JSON object literal (anything starting with "{"
            after stripping whitespace) or a filesystem path to a JSON file.
            Falsy values mean "no user hyperparameters".

    Returns:
        The parsed hyperparameter dict, or an empty dict when nothing was
        supplied, the input could not be read/parsed, or the parsed value is
        not a JSON object. Failures are logged as warnings, never raised.
    """
    if not hp_arg:
        return {}
    try:
        s = hp_arg.strip()
        if s.startswith("{"):
            parsed = json.loads(s)
        else:
            # JSON is UTF-8 by specification; do not depend on the locale's
            # default encoding when reading the file.
            with open(s, "r", encoding="utf-8") as f:
                parsed = json.load(f)
    except Exception as e:
        # Deliberately best-effort: bad hyperparameters must not abort the run.
        LOG.warning(f"Could not parse --hyperparameters: {e}. Ignoring.")
        return {}
    if not isinstance(parsed, dict):
        # A file may legally contain a list/scalar; callers expect a mapping.
        LOG.warning(
            f"--hyperparameters must be a JSON object, got {type(parsed).__name__}. Ignoring."
        )
        return {}
    return parsed


def set_seeds(seed: int = 42):
    """Seed the Python, NumPy and PyTorch random number generators.

    The same *seed* is applied to ``random``, ``numpy.random`` and
    ``torch``; when CUDA is available the CUDA generators on all devices
    are seeded as well.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)


def ensure_local_tmp():
    """Create the ``/tmp`` directory if it does not already exist."""
    tmp_dir = Path("/tmp")
    tmp_dir.mkdir(parents=True, exist_ok=True)


def enable_tensor_cores_if_available():
    """Set PyTorch's float32 matmul precision to "high" when CUDA is available.

    Does nothing on machines without a usable CUDA device.
    """
    if not torch.cuda.is_available():
        return
    torch.set_float32_matmul_precision("high")


def enable_deterministic_mode(seed: Optional[int] = None):
    """
    Force deterministic algorithms where possible to reduce run-to-run variance.

    Args:
        seed: When given, all RNGs are seeded via ``set_seeds`` and
            ``PYTHONHASHSEED`` is exported (if not already set).

    Every step is best-effort: a failure is logged and the remaining
    settings are still applied. Environment variables are only set when
    absent, so values chosen by the user take precedence.
    """
    if seed is not None:
        set_seeds(seed)
        # NOTE: hash randomization is fixed at interpreter startup, so this
        # cannot change the running process; it is only inherited by
        # subprocesses started afterwards.
        os.environ.setdefault("PYTHONHASHSEED", str(int(seed)))
    # cuBLAS determinism
    os.environ.setdefault("CUBLAS_WORKSPACE_CONFIG", ":4096:8")
    try:
        torch.use_deterministic_algorithms(True)
    except Exception as e:
        LOG.warning(f"Could not enable torch deterministic algorithms: {e}")
    try:
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    except Exception as e:
        LOG.warning(f"Could not enforce deterministic cuDNN settings: {e}")
    # Disable TF32 so float32 matmuls/convolutions keep full precision.
    # Failures stay non-fatal but are no longer silently discarded.
    try:
        torch.backends.cuda.matmul.allow_tf32 = False
    except Exception as e:
        LOG.debug(f"Could not disable TF32 for CUDA matmul: {e}")
    try:
        torch.backends.cudnn.allow_tf32 = False
    except Exception as e:
        LOG.debug(f"Could not disable TF32 for cuDNN: {e}")


def load_file(path: Optional[str]) -> Optional[pd.DataFrame]:
    """Load a delimited text dataset into a DataFrame.

    Args:
        path: Location of the dataset. A falsy value (None or "") means
            "no dataset supplied".

    Returns:
        The parsed DataFrame, or None when *path* is falsy.

    Raises:
        FileNotFoundError: If *path* is given but does not exist.
    """
    if not path:
        return None
    dataset_path = Path(path)
    if not dataset_path.exists():
        raise FileNotFoundError(f"Dataset not found: {dataset_path}")
    # sep=None with the python engine lets pandas sniff the delimiter, so
    # comma- and tab-separated inputs are both accepted.
    return pd.read_csv(dataset_path, sep=None, engine="python")


def prepare_image_search_dirs(args) -> Optional[Path]:
    """Extract every image ZIP listed in ``args.images_zip`` into one temp dir.

    Args:
        args: Object exposing an ``images_zip`` attribute holding the ZIP
            paths (a falsy value means no archives were supplied).

    Returns:
        Path of the freshly created extraction directory, or None when no
        archives were supplied. All archives are extracted into the same
        root, so later archives overwrite same-named members of earlier ones.

    Raises:
        FileNotFoundError: If any listed ZIP does not exist. Checked before
            the temp directory is created, so nothing is left on disk.
        Exception: Any extraction error (e.g. ``zipfile.BadZipFile``) is
            re-raised after the partially filled directory is removed.
    """
    # Function-scope import keeps this block self-contained.
    import shutil

    if not args.images_zip:
        return None

    zip_args = list(args.images_zip)

    # Validate all inputs up front so a bad path cannot leak a temp directory.
    for zip_path in zip_args:
        if not Path(zip_path).exists():
            raise FileNotFoundError(f"Image ZIP not found: {zip_path}")

    root = Path(tempfile.mkdtemp(prefix="autogluon_images_"))
    LOG.info(f"Extracting {len(zip_args)} image ZIP(s) to {root}")

    try:
        for zip_path in zip_args:
            path = Path(zip_path)
            with zipfile.ZipFile(path, 'r') as z:
                z.extractall(root)
            LOG.info(f"Extracted {path.name}")
    except Exception:
        # Do not leave a half-extracted directory behind on failure.
        shutil.rmtree(root, ignore_errors=True)
        raise

    return root


def absolute_path_expander(df: pd.DataFrame, extracted_root: Optional[Path], image_columns: Optional[List[str]]) -> List[str]:
    """
    Resolve image paths to absolute paths. If no image_columns are provided,
    infers candidate columns whose values resolve to existing files (checking
    absolute paths first, then paths relative to the extracted_root).

    Args:
        df: Table whose image columns are rewritten IN PLACE. Values that do
            not resolve to an existing file become None.
        extracted_root: Directory to search for relative paths, or None to
            accept absolute paths only.
        image_columns: Explicit image column names; names missing from *df*
            are dropped. When empty/None, text columns are probed using
            their first 50 non-null values and a column is selected if at
            least one of those values resolves.

    Returns:
        The image columns that were processed (explicit or inferred); an
        empty list when *df* is None or empty.
    """
    if df is None or df.empty:
        return []

    image_columns = [c for c in (image_columns or []) if c in df.columns]
    root = Path(extracted_root) if extracted_root is not None else None

    def resolve(p):
        if pd.isna(p):
            return None
        text = str(p).strip()
        if not text:
            # Path("") is ".", which would otherwise match the root directory.
            return None
        orig = Path(text)
        candidates = []
        if orig.is_absolute():
            candidates.append(orig)
        if root is not None:
            # Try the path as given, then fall back to the bare file name.
            candidates.extend([root / orig, root / orig.name])
        for cand in candidates:
            # Only regular files count; a directory is never an image.
            if cand.is_file():
                return str(cand.resolve())
        return None

    def is_text_column(col):
        # Accept classic object columns as well as pandas string dtypes.
        dtype = df[col].dtype
        return pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype)

    # Infer image columns if none were provided
    if not image_columns:
        inferred = []
        for col in df.columns:
            if not is_text_column(col):
                continue
            sample = df[col].dropna().head(50)
            if sample.empty:
                continue
            if sample.apply(resolve).notna().any():
                inferred.append(col)
        image_columns = inferred
        if image_columns:
            LOG.info(f"Inferred image columns: {image_columns}")

    for col in image_columns:
        df[col] = df[col].apply(resolve)

    return image_columns


def verify_outputs(paths):
    """Check that every expected output exists and make it world-readable.

    Args:
        paths: Iterable of ``(path, description)`` pairs.

    Each existing output is logged with its size and has its mode set to
    0o644. Each missing output is logged as an error; if any are missing
    the process exits with status 1 after all entries have been checked.
    """
    any_missing = False
    for location, label in paths:
        if not os.path.exists(location):
            LOG.error(f"✗ Output {label} MISSING: {location}")
            any_missing = True
            continue
        n_bytes = os.path.getsize(location)
        LOG.info(f"✓ Output {label}: {location} ({n_bytes:,} bytes)")
        os.chmod(location, 0o644)
    if any_missing:
        LOG.error("Some outputs are missing!")
        sys.exit(1)