Source code for pyncette.utils

import asyncio
import logging
from functools import wraps

from .errors import LeaseLostException
from .model import Context
from .model import Decorator
from .model import TaskFunc

logger = logging.getLogger(__name__)

DEFAULT_LEASE_REMAINING_RATIO = 0.5


[docs]def with_heartbeat( lease_remaining_ratio: float = DEFAULT_LEASE_REMAINING_RATIO, cancel_on_lease_lost: bool = False, ) -> Decorator[TaskFunc]: """ Decorate the task to use automatic heartbeating in background. :param lease_remaining_ratio: Number between 0 and 1. The ratio between elapsed time and the lease duration when heartbeating will be performed. Default is 0.5. :param cancel_on_lease_lost: Whether the task should be cancelled if lease expires. Default is False. """ if lease_remaining_ratio <= 0 or lease_remaining_ratio >= 1: raise ValueError("Lease remaining ratio must be in (0, 1)") def decorator(func: TaskFunc) -> TaskFunc: @wraps(func) async def _func(context: Context) -> None: body = asyncio.ensure_future(func(context)) async def _heartbeater() -> None: delay_duration = context.task.lease_duration.total_seconds() * lease_remaining_ratio while True: await asyncio.sleep(delay_duration) try: await asyncio.shield(context.heartbeat()) except LeaseLostException: if cancel_on_lease_lost: body.cancel() # Regardless of whether we want the task body to continue # executing, it makes no sense to continue heartbeating # since the lease has already been lost. return except Exception as e: # There may be transient errors while heartbeating. In this case # ignore them until the next heartbeat interval. logger.warning(f"Heartbeating on {context.task} failed", exc_info=e) heartbeater = asyncio.create_task(_heartbeater()) try: await body finally: heartbeater.cancel() try: await heartbeater except asyncio.CancelledError: pass return _func return decorator