Source code for common.db_tools.db_tools.queues.hyperband
from collections import deque
from typing import Any, Optional
from pottery import RedisDeque, Redlock
from redis import Redis
from schemas.requests.common import HyperbandRequest
from .base import _EmptyContextManager, _logger
[docs]class HyperbandQueue:
def __init__(self, pool: Optional[BlockingIOError]) -> None:
self._deque: Any
self._lock: Any
if pool is not None:
key = "shift:hyperband_queue"
redis_instance = Redis(connection_pool=pool)
self._deque = RedisDeque(redis=redis_instance, key=key)
self._lock = Redlock(key=key + "-lock", masters={redis_instance})
self._serialize = lambda x: x.json()
self._deserialize = lambda x: HyperbandRequest.parse_raw(str(x))
else:
self._deque, self._lock = deque(), _EmptyContextManager()
self._serialize, self._deserialize = lambda x: x, lambda x: x
[docs] def empty(self) -> bool:
"""Checks whether the queue is empty.
Returns:
bool: True if the queue is empty, False otherwise.
"""
with self._lock:
return len(self._deque) == 0
[docs] def put(self, value: HyperbandRequest):
"""Puts the request into queue.
Args:
value (Hyperband Request): Request to put into queue.
"""
with self._lock:
self._deque.append(self._serialize(value))
_logger.info(
"Hyperband request %s - inserted into hyperband queue",
value.id,
)
[docs] def get(self) -> HyperbandRequest:
"""Returns the first element from the queue. Before this call, it should be checked with another method that the queue is not empty. Note that it is not safe to call this method from multiple threads/processes as the element could have already been returned to another thread/process.
Returns:
HyperbandStatus: First element from the queue.
"""
with self._lock:
return_value = self._deserialize(self._deque.popleft())
_logger.info(
"Hyperband request %s - popped from the hyperband queue",
return_value.id,
)
return return_value
def __len__(self) -> int:
return len(self._deque)