Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/galaxy/tool_util/verify/wait.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Abstraction for waiting on API conditions to become true.""" import time DEFAULT_POLLING_BACKOFF = 0 DEFAULT_POLLING_DELTA = 0.25 TIMEOUT_MESSAGE_TEMPLATE = "Timed out after {} seconds waiting on {}." def wait_on(function, desc, timeout, delta=DEFAULT_POLLING_DELTA, polling_backoff=DEFAULT_POLLING_BACKOFF, sleep_=None): """Wait for function to return non-None value. Grow the polling interval (initially ``delta`` defaulting to 0.25 seconds) incrementally by the supplied ``polling_backoff`` (defaulting to 0). Throw a TimeoutAssertionError if the supplied timeout is reached without supplied function ever returning a non-None value. """ sleep = sleep_ or time.sleep total_wait = 0 while True: if total_wait > timeout: raise TimeoutAssertionError(TIMEOUT_MESSAGE_TEMPLATE.format(total_wait, desc)) value = function() if value is not None: return value total_wait += delta sleep(delta) delta += polling_backoff class TimeoutAssertionError(AssertionError): """Derivative of AssertionError indicating wait_on exceeded max time.""" def __init__(self, message): super().__init__(message)