from __future__ import annotations
import datetime
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING
from typing import Any
from typing import AsyncIterator
from typing import Awaitable
from typing import Callable
from typing import NewType
from typing import Protocol
from typing import TypeVar
# Opaque handle types. NewType gives each a distinct static type while the
# runtime value stays whatever object was passed in.
Lease = NewType("Lease", object)
ContinuationToken = NewType("ContinuationToken", object)

# Generic shape of a decorator that hands back the same type it was given.
T = TypeVar("T")
Decorator = Callable[[T], T]
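# The callable types below are declared as Protocols with __call__ rather than
# as Callable aliases; see https://github.com/python/mypy/issues/708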
class Heartbeater(Protocol):
    """Callable protocol for a heartbeat callback.

    Implementations take no arguments and return an awaitable that
    resolves to ``None``.
    """

    def __call__(self) -> Awaitable[None]:
        """Heartbeats on the message"""
class Context:
    """Task execution context. This class can have dynamic attributes.

    Only the attributes annotated below are declared statically; any other
    attribute may be set on an instance at runtime.
    """

    app_context: pyncette.PyncetteContext
    task: pyncette.task.Task
    scheduled_at: datetime.datetime
    _lease: Lease | None
    heartbeat: Heartbeater
    args: dict[str, Any]

    # These stubs exist for static type checkers only, so that reading or
    # writing an undeclared (dynamic) attribute type-checks. At runtime
    # TYPE_CHECKING is False and the default attribute machinery is used.
    if TYPE_CHECKING:

        def __getattr__(self, name: str) -> Any:
            ...

        def __setattr__(self, name: str, value: Any) -> Any:
            ...
class TaskFunc(Protocol):
    """Callable protocol for a task body.

    Implementations receive the execution ``Context`` and return an
    awaitable that resolves to ``None``.
    """

    def __call__(self, context: Context) -> Awaitable[None]:
        """Executes the task"""
class PartitionSelector(Protocol):
    """Callable protocol mapping a task id to a partition number.

    Implementations receive the partition count and a task id, and return
    an ``int``.
    """

    def __call__(self, partition_count: int, task_id: str) -> int:
        """Gets the partition number for a given task id"""
class NextFunc(Protocol):
    """Callable protocol for the continuation handed to a middleware.

    Implementations take no arguments and return an awaitable that
    resolves to ``None``.
    """

    def __call__(self) -> Awaitable[None]:
        """Enter the next middleware or the task body"""
class MiddlewareFunc(Protocol):
    """Callable protocol for a middleware.

    Implementations receive the execution ``Context`` and a ``NextFunc``
    continuation, and return an awaitable that resolves to ``None``.
    """

    # NOTE: ``next`` shadows the builtin, but it is part of the public
    # (keyword-visible) signature and is therefore left unchanged.
    def __call__(self, context: Context, next: NextFunc) -> Awaitable[None]:
        """Executes the middleware"""
class FixtureFunc(Protocol):
    """Callable protocol for a fixture.

    Implementations receive the application context and return an
    asynchronous iterator.
    """

    def __call__(self, app_context: pyncette.PyncetteContext) -> AsyncIterator[Any]:
        """Executes the fixture"""
class ResultType(Enum):
    """Status returned by polling the task"""

    MISSING = 0
    PENDING = 1
    READY = 2
    LOCKED = 3
    LEASE_MISMATCH = 4
class ExecutionMode(Enum):
    """The execution mode for a Pyncette task."""

    AT_LEAST_ONCE = 0
    AT_MOST_ONCE = 1
class FailureMode(Enum):
    """What should happen when a task fails."""

    NONE = 0
    UNLOCK = 1
    COMMIT = 2
@dataclass
class PollResponse:
    """The result of a task poll"""

    # Status of the poll.
    result: ResultType
    scheduled_at: datetime.datetime
    # A lease handle, or None when the response carries no lease.
    lease: Lease | None
@dataclass
class QueryResponse:
    """The result of a task query"""

    # Pairs of (task, lease), one per returned task.
    tasks: list[tuple[pyncette.task.Task, Lease]]
    # A continuation token, or None when the response carries none.
    continuation_token: ContinuationToken | None
if TYPE_CHECKING:
    # Needed only to resolve the annotations in this module, so nothing is
    # imported at runtime (presumably to avoid a circular import; confirm).
    import pyncette
    import pyncette.task