Contents¶
Overview¶
A reliable distributed scheduler with pluggable storage backends for Async Python.
- Free software: MIT license
Installation¶
Minimal installation (just SQLite persistence):
pip install pyncette
Full installation (all the backends and Prometheus metrics exporter):
pip install pyncette[all]
You can also install the in-development version with:
pip install https://github.com/tibordp/pyncette/archive/master.zip
Documentation¶
Usage example¶
Simple in-memory scheduler (does not persist state):
from pyncette import Pyncette, Context

app = Pyncette()

@app.task(schedule='* * * * *')
async def foo(context: Context):
    print('This will run every minute')

if __name__ == '__main__':
    app.main()
Persistent distributed cron using Redis (coordinates execution with parallel instances and survives restarts):
from pyncette import Pyncette, Context
from pyncette.redis import redis_repository

app = Pyncette(repository_factory=redis_repository, redis_url='redis://localhost')

@app.task(schedule='* * * * * */10')
async def foo(context: Context):
    print('This will run every 10 seconds')

if __name__ == '__main__':
    app.main()
See the examples directory for more examples of usage.
Use cases¶
Pyncette is designed for reliable (at-least-once or at-most-once) execution of recurring tasks (think cronjobs) whose lifecycles are managed dynamically, but it can work effectively for non-recurring tasks too.
Example use cases:
- You want to perform a database backup every day at noon
- You want a report to be generated daily for your 10M users at the time of their choosing
- You want currency conversion rates to be refreshed every 10 seconds
- You want to allow your users to schedule non-recurring emails to be sent at an arbitrary time in the future
Pyncette might not be a good fit if:
- You want your tasks to be scheduled to run (ideally) once as soon as possible. It is doable, but you will be better served by a general purpose reliable queue like RabbitMQ or Amazon SQS.
- You need tasks to execute at sub-second intervals with low jitter. Pyncette coordinates execution on a per task-instance basis and this coordination can add overhead and jitter.
Supported backends¶
Pyncette comes with an implementation for the following backends (used for persistence and coordination) out-of-the-box:
- SQLite (included)
- Redis (pip install pyncette[redis])
- PostgreSQL (pip install pyncette[postgres])
- MySQL 8.0+ (pip install pyncette[mysql])
- Amazon DynamoDB (pip install pyncette[dynamodb])
Pyncette imposes few requirements on the underlying datastores, so it can be extended to support other databases or custom storage formats / integrations with existing systems. For best results, the backend needs to provide:
- Some sort of serialization mechanism, e.g. traditional transactions, atomic stored procedures or compare-and-swap
- Efficient range queries over a secondary index, which can be eventually consistent
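As a rough sketch of the integration surface, a custom backend plugs in as a repository factory: an async context manager that yields an implementation of the Repository abstract base class (documented in the API reference below). The connection helper and URL parameter here are hypothetical, and the exact factory contract should be checked against the built-in backends:

import contextlib
from typing import Any, AsyncIterator

from pyncette.repository import Repository


class MyCustomRepository(Repository):
    # A skeleton only: implement the abstract methods listed under
    # pyncette.repository below (poll_task, commit_task, extend_lease,
    # register_task, poll_dynamic_task, ...)
    def __init__(self, connection: Any):
        self._connection = connection


@contextlib.asynccontextmanager
async def my_custom_repository(**kwargs: Any) -> AsyncIterator[MyCustomRepository]:
    # connect_to_my_datastore is a hypothetical helper for your datastore
    connection = await connect_to_my_datastore(kwargs["my_datastore_url"])
    try:
        yield MyCustomRepository(connection)
    finally:
        await connection.close()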
Development¶
To run integration tests you will need Redis, PostgreSQL, MySQL and Localstack (for DynamoDB) running locally.
To run all the tests, run:
tox
Alternatively, there is a Docker Compose environment that will set up all the backends so that integration tests can run seamlessly:
docker-compose up -d
docker-compose run --rm shell
tox
To run just the unit tests (excluding integration tests):
tox -e py310 # or your Python version of choice
Note, to combine the coverage data from all the tox environments, run:

Windows:
set PYTEST_ADDOPTS=--cov-append
tox

Other:
PYTEST_ADDOPTS=--cov-append tox
Installation¶
At the command line:
pip install pyncette
For installing with Redis persistence:
pip install pyncette[redis]
For installing with MySQL persistence:
pip install pyncette[mysql]
For installing with Amazon DynamoDB persistence:
pip install pyncette[dynamodb]
For installing with PostgreSQL persistence:
pip install pyncette[postgres]
For installing with Prometheus metrics exporter:
pip install pyncette[prometheus]
For a full installation with all the extras:
pip install pyncette[all]
Usage¶
The core unit of execution in Pyncette is a Task. Each task is a Python coroutine that specifies what needs to be executed.
import datetime

from pyncette import Pyncette, Context

app = Pyncette()

@app.task(interval=datetime.timedelta(seconds=2))
async def successful_task(context: Context) -> None:
    print("This will execute every 2 seconds")

if __name__ == "__main__":
    app.main()
Running the main loop¶
The usual use case is that Pyncette runs as its own process, so the standard way to start the main loop is with the main() method of the Pyncette instance. This sets up logging to standard output and a signal handler allowing for graceful shutdown (the first SIGINT initiates the graceful shutdown and the second one terminates the process).
If Pyncette is run alongside other code or needs customization, create() can be used to initialize the runtime environment, and the main loop can then be run with run():
import asyncio

from pyncette import Pyncette

app = Pyncette()

...

async with app.create() as app_context:
    await app_context.run()
Specifying the schedule¶
There are two ways a schedule can be specified. One is with the cron-like syntax (which uses croniter under the hood to support the calculation):
@app.task(schedule="* * * * *")
async def every_minute(context: Context):
    ...

@app.task(schedule="* * * * * */10")
async def every_10_seconds(context: Context):
    ...

@app.task(schedule="20 4 * * * *")
async def every_day_at_4_20_am(context: Context):
    ...
The other way is with an interval:
@app.task(interval=datetime.timedelta(seconds=12))
async def every_12_seconds(context: Context):
    ...
Customizing tasks¶
Pyncette supports multiple different execution modes which provide different levels of reliability guarantees, depending on the nature of the task.
The default task configuration:
- When the task is scheduled for execution, it is locked for 60 seconds
- If the task execution succeeds, the next execution is scheduled and the task is unlocked
- If the task execution fails (exception is raised), the lock is not released, so it will be retried after the lease expires.
- If the task execution exceeds the lease duration, it will be executed again (so there could be two executions at the same time)
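These defaults can be tuned per task. For example, the lock duration is controlled by the lease_duration parameter (60 seconds by default, per the Task constructor in the API reference below); a sketch:

import datetime

@app.task(
    interval=datetime.timedelta(minutes=10),
    # Allow up to 5 minutes before the lock is considered expired
    lease_duration=datetime.timedelta(minutes=5),
)
async def long_running_task(context: Context):
    ...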
Best-effort tasks¶
If the task is run in best-effort mode, locking will not be employed, and the next execution will be scheduled immediately once it becomes ready:
from pyncette import ExecutionMode
@app.task(interval=datetime.timedelta(seconds=10), execution_mode=ExecutionMode.AT_MOST_ONCE)
async def every_10_seconds(context: Context):
    print("Ping")
Caution
If best-effort mode is used, there is no way to retry a failed execution, and exceptions thrown by the task will only be logged.
Failure behavior¶
Failure behavior can be specified with the failure_mode parameter:
from pyncette import FailureMode

@app.task(interval=datetime.timedelta(seconds=10), failure_mode=FailureMode.UNLOCK)
async def every_10_seconds(context: Context):
    print("Ping")
- FailureMode.NONE: the task stays locked until the lease expires. This is the default.
- FailureMode.UNLOCK: the task is immediately unlocked if an exception is thrown, so it will be retried on the next tick.
- FailureMode.COMMIT: the exception is treated as a success and the next execution is scheduled despite the exception.
Timezone support¶
Pyncette is timezone-aware; the timezone for a task can be specified with the timezone parameter:
@app.task(schedule="0 12 * * *", timezone="Europe/Dublin")
async def task1(context: Context):
    print("Hello from Dublin!")

@app.task(schedule="0 12 * * *", timezone="UTC+12")
async def task2(context: Context):
    print("Hello from Камча́тка!")
The accepted values are all that dateutil.tz.gettz() accepts.
Disabling a task¶
Tasks can be disabled by passing enabled=False in the parameters. This can be used, for example, to conditionally enable tasks only on certain instances.
@app.task(schedule="* * * * *", enabled=False)
async def task1(context: Context):
    print("This will never run.")
Tasks can also be disabled in the initialization code:
from pyncette import Pyncette, Context

app = Pyncette()

@app.task(schedule="* * * * *")
async def task1(context: Context):
    print("This will never run.")

async with app.create() as app_context:
    task1.enabled = False
    await app_context.run()
Task parameters¶
The task() decorator accepts an arbitrary number of additional parameters, which are available through the context parameter:
import datetime

# If we use multiple decorators on the same coroutine, we must explicitly provide the name
@app.task(name="task1", interval=datetime.timedelta(seconds=10), username="abra")
@app.task(name="task2", interval=datetime.timedelta(seconds=20), username="kadabra")
@app.task(name="task3", interval=datetime.timedelta(seconds=30), username="alakazam")
async def task(context: Context):
    print(context.args["username"])
This allows for parametrized tasks with multiple decorators, which is an essential feature needed to support Dynamic tasks.
Note
There is a restriction that all the values of the parameters must be JSON-serializable, since they are persisted in storage when dynamic tasks are used.
Middlewares¶
If you have common logic that should execute around every task invocation, middlewares can be used. Good examples of middlewares are ones used for logging and metrics.
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

app = Pyncette()

@app.middleware
async def retry(context: Context, next: Callable[[], Awaitable[None]]):
    # Example only, prefer to rely on Pyncette to drive task retry logic
    for _ in range(5):
        try:
            await next()
            return
        except Exception:
            pass
    raise Exception(f"Task {context.task.name} failed too many times.")

@app.middleware
async def logging_middleware(context: Context, next: Callable[[], Awaitable[None]]):
    logger.info(f"Task {context.task.name} started")
    try:
        await next()
    except Exception as e:
        logger.error(f"Task {context.task.name} failed", e)
        raise

@app.middleware
async def db_transaction(context: Context, next: Callable[[], Awaitable[None]]):
    context.db.begin_transaction()
    try:
        await next()
    except Exception:
        context.db.rollback()
        raise
    else:
        context.db.commit()
Middlewares execute in the order they are defined.
Fixtures¶
Fixtures provide a convenient way of injecting dependencies into tasks and specifying set-up and tear-down code. They can be thought of as application-level middlewares. For example, let’s say we want to inject the database and a logfile as dependencies to all our tasks:
app = Pyncette()

@app.fixture()
async def db(app_context: PyncetteContext):
    db = await database.connect(...)
    try:
        yield db
    finally:
        await db.close()

@app.fixture(name="super_log_file")
async def logfile(app_context: PyncetteContext):
    with open("log.txt", "a") as file:
        yield file

@app.task(interval=datetime.timedelta(seconds=2))
async def successful_task(context: Context) -> None:
    context.super_log_file.write("Querying the database")
    results = await context.db.query(...)
    ...
The lifetime of a fixture is that of a Pyncette application, i.e. the setup code for all fixtures runs before the first tick and the tear-down code runs after the graceful shutdown is initiated and all the pending tasks have finished. Like middlewares, fixtures execute in the order they are defined (and in reverse order on shutdown).
Persistence¶
By default Pyncette runs without persistence. This means that the schedule is maintained in-memory and there is no coordination between multiple instances of the app.
Enabling persistence allows the application to recover from restarts, as well as to run multiple instances of an app concurrently without duplicate executions of tasks.
See Backends for instructions on how to configure persistence for a database of your choice.
Heartbeating¶
If you have tasks with unpredictable run times, it can be hard to come up with an appropriate lease duration in advance. If set too short, the lease will expire, leading to duplicate task execution; if set too long, there can be insufficient protection against unhealthy workers.
A way to mitigate this is to use heartbeating. Heartbeating periodically extends the lease on the task for as long as the task is still running. Pyncette supports two approaches to heartbeating:
- Cooperative heartbeating: your task periodically calls context.heartbeat() to extend the lease
- Automatic heartbeating: your task is decorated with with_heartbeat() and it heartbeats automatically in the background for as long as the task is executing
Beware that automatic heartbeating can potentially be dangerous if, for example, your task is stuck in an infinite loop or an I/O operation without a proper timeout. In this case the lease can be kept alive indefinitely while the task makes no progress. Cooperative heartbeating may be more verbose, but offers a greater degree of control.
If context.heartbeat() is called when the lease is already lost, the call will raise LeaseLostException, allowing you to bail out early, since another instance is likely already processing the same task.
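A minimal cooperative sketch (load_batches and process are hypothetical helpers standing in for your own work loop):

from pyncette.errors import LeaseLostException

@app.task(interval=datetime.timedelta(minutes=30))
async def batch_job(context: Context):
    for batch in load_batches():
        process(batch)
        try:
            # Extend the lease after each unit of work
            await context.heartbeat()
        except LeaseLostException:
            # Another instance has likely taken over this task; bail out early
            return

Automatic heartbeating uses the with_heartbeat() decorator: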
import asyncio

from pyncette.utils import with_heartbeat

@app.task(schedule='* * * * * */10')
@with_heartbeat()
async def foo(context: Context):
    # The task will be kept alive by the heartbeat
    await asyncio.sleep(3600)

if __name__ == '__main__':
    app.main()
Dynamic tasks¶
Pyncette supports a use case where the tasks are not necessarily known in advance: dynamic tasks can be registered at runtime with schedule_task().
import asyncio

@app.dynamic_task()
async def hello(context: Context) -> None:
    print(f"Hello {context.args['username']}")

async with app.create() as app_context:
    await asyncio.gather(
        app_context.schedule_task(hello, "bill_task", schedule="0 * * * *", username="bill"),
        app_context.schedule_task(hello, "steve_task", schedule="20 * * * *", username="steve"),
        app_context.schedule_task(hello, "john_task", schedule="40 * * * *", username="john"),
    )
    await app_context.run()
When persistence is used, the schedules and task parameters are persisted alongside the execution data, which allows the tasks to be registered and unregistered at will.
An example use case is a web application where every user can have something happen on their chosen schedule. Polling is efficient, since the concrete instances of a dynamic task are only loaded from storage if they are already due, instead of being polled all the time.
Task instances can be removed with unschedule_task():
...

async with app.create() as app_context:
    await app_context.schedule_task(hello, "bill_task", schedule="0 * * * *", username="bill")
    await app_context.unschedule_task(hello, "bill_task")
    await app_context.run()
Note
If the number of dynamic tasks is large, it is a good idea to limit the batch size:
app = Pyncette(
    repository_factory=redis_repository,
    redis_url='redis://localhost',
    batch_size=10
)
This ensures that only the specified number of dynamic tasks is scheduled for execution during a single tick, and it allows multiple instances of the same app to load-balance effectively.
Once-off dynamic tasks¶
Dynamic tasks can also be scheduled to execute only once at a specific date.
from datetime import datetime, timedelta

@app.dynamic_task()
async def task(context: Context) -> None:
    print(f"Hello {context.task.name}!")

async with app.create() as app_context:
    await app_context.schedule_task(task, "y2k38", execute_at=datetime(2038, 1, 19, 3, 14, 7))
    await app_context.schedule_task(task, "tomorrow", execute_at=datetime.now() + timedelta(days=1))
    # This will execute once immediately, since it is already overdue
    await app_context.schedule_task(task, "overdue", execute_at=datetime.now() - timedelta(days=1))
    await app_context.run()
Once-off tasks have the same reliability guarantees as recurring tasks (controlled by the execution_mode and failure_mode parameters), but on success they will not be scheduled again.
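For example, a best-effort once-off task might look like this (a sketch, assuming execution_mode is accepted by the dynamic_task() decorator just as it is by task()):

from pyncette import ExecutionMode

@app.dynamic_task(execution_mode=ExecutionMode.AT_MOST_ONCE)
async def send_welcome_email(context: Context) -> None:
    ...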
Performance¶
Tasks are executed in parallel. If you have a lot of long-running tasks, you can set concurrency_limit in the Pyncette constructor, which ensures that at most that many tasks are executing at any given time. If there are no free slots in the semaphore, this serves as back-pressure and ensures that no additional tasks are polled until some of the currently executing ones finish, enabling the pending tasks to be scheduled on other instances of your app. Setting concurrency_limit to 1 is equivalent to serializing the execution of all the tasks.
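For example, to cap the app at 10 concurrently executing tasks:

app = Pyncette(concurrency_limit=10)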
Depending on the backend used, having a dynamic task with a very large number of instances can lead to diminished performance. See Partitioned dynamic tasks for a way to address this issue.
Backends¶
By default Pyncette runs without persistence. This means that the schedule is maintained in-memory and there is no coordination between multiple instances of the app.
Enabling persistence allows the application to recover from restarts, as well as to run multiple instances of an app concurrently without duplicate executions of tasks.
SQLite¶
SQLite is the default persistence engine and is included in the base Python package.
from pyncette import Pyncette, Context

app = Pyncette(sqlite_database="pyncette.db")

@app.task(schedule='* * * * * */10')
async def foo(context: Context):
    print('This will run every 10 seconds')

if __name__ == '__main__':
    app.main()
Redis¶
Redis can be enabled by passing redis_repository() as the repository_factory parameter to the Pyncette constructor.
from pyncette import Pyncette, Context
from pyncette.redis import redis_repository
app = Pyncette(repository_factory=redis_repository, redis_url='redis://localhost')
Optionally, the tasks can be namespaced if the Redis server is shared among different Pyncette apps:
app = Pyncette(repository_factory=redis_repository, redis_url='redis://localhost', redis_namespace='my_super_app')
PostgreSQL¶
PostgreSQL can be enabled by passing postgres_repository() as the repository_factory parameter to the Pyncette constructor.
from pyncette import Pyncette, Context
from pyncette.postgres import postgres_repository
app = Pyncette(
    repository_factory=postgres_repository,
    postgres_url='postgres://postgres@localhost/pyncette',
    postgres_table_name='pyncette_tasks'
)
The table will be automatically initialized on startup if it does not exist, unless postgres_skip_table_create is set to True.
MySQL¶
MySQL can be configured by passing mysql_repository() as the repository_factory parameter to the Pyncette constructor.
The MySQL backend requires MySQL version 8.0+.
from pyncette import Pyncette, Context
from pyncette.mysql import mysql_repository

app = Pyncette(
    repository_factory=mysql_repository,
    mysql_host="localhost",
    mysql_database="pyncette",
    mysql_user="pyncette",
    mysql_password="password",
    mysql_table_name='pyncette_tasks'
)
The table will be automatically initialized on startup if it does not exist, unless mysql_skip_table_create is set to True.
Caution
The MySQL backend currently does not work with Python 3.10 due to an issue with an upstream library.
Amazon DynamoDB¶
The Amazon DynamoDB backend can be configured with dynamodb_repository().
from pyncette import Pyncette, Context
from pyncette.dynamodb import dynamodb_repository
app = Pyncette(
    repository_factory=dynamodb_repository,
    dynamodb_region_name="eu-west-1",
    dynamodb_table_name="pyncette",
)
The DynamoDB repository will use ambient credentials, such as environment variables, ~/.aws/config, or the EC2 metadata service (e.g. when running on EC2 or on a Kubernetes cluster with kiam/kube2iam).
For convenience, an appropriate DynamoDB table will be automatically created on startup if it does not exist. The created table uses the on-demand pricing model. If you would like to customize this behavior, you can manually create the table beforehand and pass dynamodb_skip_table_create=True in parameters.
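For example, when the table is managed externally:

app = Pyncette(
    repository_factory=dynamodb_repository,
    dynamodb_region_name="eu-west-1",
    dynamodb_table_name="pyncette",
    dynamodb_skip_table_create=True,
)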
The expected table schema should look something like this:
{
    "AttributeDefinitions": [
        { "AttributeName": "partition_id", "AttributeType": "S" },
        { "AttributeName": "ready_at", "AttributeType": "S" },
        { "AttributeName": "task_id", "AttributeType": "S" }
    ],
    "KeySchema": [
        { "AttributeName": "partition_id", "KeyType": "HASH" },
        { "AttributeName": "task_id", "KeyType": "RANGE" }
    ],
    "LocalSecondaryIndexes": [
        {
            "IndexName": "ready_at",
            "KeySchema": [
                { "AttributeName": "partition_id", "KeyType": "HASH" },
                { "AttributeName": "ready_at", "KeyType": "RANGE" }
            ],
            "Projection": {
                "ProjectionType": "ALL"
            }
        }
    ]
}
Advanced usage¶
Partitioned dynamic tasks¶
Certain backends, like Redis and Amazon DynamoDB, have a natural partitioning to them. Generally, when using dynamic tasks, the task name is used as a partition key. For example, in DynamoDB, each dynamic task instance is associated with one row/document, but they all share the same partition id.
Similarly for Redis, each task instance record is stored in its own key, but the index that sets them in order of next execution is stored in a single key, so a single large task will not benefit from a clustered Redis setup.
If there is a very large number of dynamic task instances associated with a single task or they are polled very frequently, this can lead to hot partitions and degraded performance. There can also be limits as to how many task instances can even be stored in a single partition. For DynamoDB, the limit is 10GB.
Pyncette supports transparent partitioning of tasks through the partitioned_task() decorator.
import asyncio

from pyncette import Pyncette, Context

app = Pyncette()

@app.partitioned_task(partition_count=32)
async def hello(context: Context) -> None:
    print(f"Hello {context.args['username']}")

async with app.create() as app_context:
    await asyncio.gather(
        app_context.schedule_task(hello, "bill_task", schedule="0 * * * *", username="bill"),
        app_context.schedule_task(hello, "steve_task", schedule="20 * * * *", username="steve"),
        app_context.schedule_task(hello, "john_task", schedule="40 * * * *", username="john"),
    )
    await app_context.run()
This splits the dynamic task into 32 partitions and the task instances are automatically assigned to them based on the hash of the task instance name.
The default partition selector uses the SHA1 hash of the instance name, but a custom selector can be provided:
def custom_partition_selector(partition_count: int, task_id: str) -> int:
    return hash(task_id) % partition_count  # Do not use this, as hash() is not stable

@app.partitioned_task(
    partition_count=32,
    partition_selector=custom_partition_selector
)
async def hello(context: Context) -> None:
    print(f"Hello {context.args['username']}")
Choosing the partition count¶
Care must be taken when selecting a partition count, as it is not easy to change later, once tasks have already been scheduled. Changing the partition count will generally map task instances to a different partition, preventing them from running and also making it impossible to unschedule them through unschedule_task().
There is also a tradeoff: the time complexity of a single Pyncette poll grows linearly with the total number of tasks (or their partitions). Setting the number of partitions too high can lead to diminished performance due to the polling overhead.
It is possible to configure Pyncette to only poll certain partitions using the enabled_partitions parameter. This allows the tasks to be scheduled and unscheduled by any application instance, while only the selected partitions are polled. You may use this if you have a large number of instances for a given task, in order to spread the load evenly among them.
@app.partitioned_task(
    partition_count=8,
    # Partitions 4, 5, 6 and 7 will not be polled
    enabled_partitions=[0, 1, 2, 3]
)
async def hello(context: Context) -> None:
    print(f"Hello {context.args['username']}")
pyncette package¶
Submodules¶
pyncette.dynamodb module¶
class pyncette.dynamodb.DynamoDBRepository(dynamo_resource: Any, skip_table_create: bool, partition_prefix: str, **kwargs)
Bases: pyncette.repository.Repository
DynamoDB-backed store for Pyncette task execution data
- commit_task(utc_now: datetime.datetime, task: Task, lease: Lease) → None
  Commits the task, which signals a successful run.
- extend_lease(utc_now: datetime.datetime, task: Task, lease: Lease) → Lease | None
  Extends the lease on the task. Returns the new lease if the lease was still valid.
- poll_dynamic_task(utc_now: datetime.datetime, task: Task, continuation_token: ContinuationToken | None = None) → QueryResponse
  Queries the dynamic tasks for execution.
- poll_task(utc_now: datetime.datetime, task: Task, lease: Lease | None = None) → PollResponse
  Polls the task to determine whether it is ready for execution.
- register_task(utc_now: datetime.datetime, task: Task) → None
  Registers a dynamic task.

pyncette.dynamodb.dynamodb_repository(*, dynamodb_endpoint: str | None = None, dynamodb_region_name: str | None = None, dynamodb_skip_table_create: bool = False, dynamodb_partition_prefix: str = '', **kwargs) → AsyncIterator[DynamoDBRepository]
Factory context manager that initializes the connection to DynamoDB.
pyncette.errors module¶
exception pyncette.errors.LeaseLostException(task: Task)
Bases: pyncette.errors.PyncetteException
Signals that the lease on the task was lost
pyncette.mysql module¶
class pyncette.mysql.MySQLRepository(pool: aiomysql.pool.Pool, **kwargs)
Bases: pyncette.repository.Repository
- commit_task(utc_now: datetime.datetime, task: Task, lease: Lease) → None
  Commits the task, which signals a successful run.
- extend_lease(utc_now: datetime.datetime, task: Task, lease: Lease) → Lease | None
  Extends the lease on the task. Returns the new lease if the lease was still valid.
- poll_dynamic_task(utc_now: datetime.datetime, task: Task, continuation_token: ContinuationToken | None = None) → QueryResponse
  Queries the dynamic tasks for execution.
- poll_task(utc_now: datetime.datetime, task: Task, lease: Lease | None = None) → PollResponse
  Polls the task to determine whether it is ready for execution.
- register_task(utc_now: datetime.datetime, task: Task) → None
  Registers a dynamic task.
pyncette.model module¶
class pyncette.model.Context
Bases: object
Task execution context. This class can have dynamic attributes.

class pyncette.model.ExecutionMode
Bases: enum.Enum
The execution mode for a Pyncette task.
- AT_LEAST_ONCE = 0
- AT_MOST_ONCE = 1

class pyncette.model.FailureMode
Bases: enum.Enum
What should happen when a task fails.
- NONE = 0
- UNLOCK = 1
- COMMIT = 2

class pyncette.model.PollResponse(result: ResultType, scheduled_at: datetime.datetime, lease: Lease | None)
Bases: object
The result of a task poll

class pyncette.model.QueryResponse(tasks: list[tuple[Task, Lease]], continuation_token: ContinuationToken | None)
Bases: object
The result of a task query
pyncette.postgres module¶
class pyncette.postgres.PostgresRepository(pool: asyncpg.pool.Pool, **kwargs)
Bases: pyncette.repository.Repository
- commit_task(utc_now: datetime.datetime, task: Task, lease: Lease) → None
  Commits the task, which signals a successful run.
- extend_lease(utc_now: datetime.datetime, task: Task, lease: Lease) → Lease | None
  Extends the lease on the task. Returns the new lease if the lease was still valid.
- poll_dynamic_task(utc_now: datetime.datetime, task: Task, continuation_token: ContinuationToken | None = None) → QueryResponse
  Queries the dynamic tasks for execution.
- poll_task(utc_now: datetime.datetime, task: Task, lease: Lease | None = None) → PollResponse
  Polls the task to determine whether it is ready for execution.
- register_task(utc_now: datetime.datetime, task: Task) → None
  Registers a dynamic task.
pyncette.prometheus module¶
class pyncette.prometheus.MeteredRepository(metric_set: OperationMetricSet, inner_repository: Repository)
Bases: pyncette.repository.Repository
A wrapper for a repository that exposes metrics to Prometheus
- commit_task(utc_now: datetime.datetime, task: Task, lease: Lease) → None
  Commits the task, which signals a successful run.
- extend_lease(utc_now: datetime.datetime, task: Task, lease: Lease) → Lease | None
  Extends the lease on the task. Returns the new lease if the lease was still valid.
- poll_dynamic_task(utc_now: datetime.datetime, task: Task, continuation_token: ContinuationToken | None = None) → QueryResponse
  Queries the dynamic tasks for execution.
- poll_task(utc_now: datetime.datetime, task: Task, lease: Lease | None = None) → PollResponse
  Polls the task to determine whether it is ready for execution.
- register_task(utc_now: datetime.datetime, task: Task) → None
  Registers a dynamic task.

class pyncette.prometheus.OperationMetricSet(operation_name: str, labels: List[str])
Bases: object
Collection of Prometheus metrics representing a logical operation

pyncette.prometheus.prometheus_fixture(app_context: PyncetteContext) → AsyncIterator[None]

pyncette.prometheus.prometheus_middleware(context: Context, next: Callable[[], Awaitable[None]]) → None
Middleware that exposes task execution metrics to Prometheus

pyncette.prometheus.use_prometheus(app: Pyncette, measure_repository: bool = True, measure_ticks: bool = True, measure_tasks: bool = True) → None
Decorates a Pyncette app with the Prometheus metrics exporter.
Parameters:
- measure_repository – Whether to measure repository operations
- measure_ticks – Whether to measure ticks
- measure_tasks – Whether to measure individual task executions
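For example, a minimal sketch enabling the exporter on an app:

from pyncette import Pyncette
from pyncette.prometheus import use_prometheus

app = Pyncette()
use_prometheus(app)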
pyncette.pyncette module¶
class pyncette.pyncette.Pyncette(repository_factory: RepositoryFactory = sqlite_repository, executor_cls: type = DefaultExecutor, poll_interval: datetime.timedelta = datetime.timedelta(seconds=1), **kwargs)
Bases: object
Pyncette application.
- create(context_items: dict[str, Any] | None = None) → AsyncIterator[PyncetteContext]
  Creates the execution context.
- dynamic_task(**kwargs) → Callable[[TaskFunc], Task]
  Decorator for marking the coroutine as a dynamic task.
- fixture(name: str | None = None) → Decorator[FixtureFunc]
  Decorator for marking the generator as a fixture.
- main() → None
  Convenience entrypoint for console apps, which sets up logging and signal handling.
- middleware(func: MiddlewareFunc) → MiddlewareFunc
  Decorator for marking the function as a middleware.
- partitioned_task(**kwargs) → Callable[[TaskFunc], PartitionedTask]
  Decorator for marking the coroutine as a partitioned dynamic task.

class pyncette.pyncette.PyncetteContext(app: Pyncette, repository: Repository, executor: DefaultExecutor)
Bases: object
Execution context of a Pyncette app
- last_tick
- schedule_task(task: Task, instance_name: str, **kwargs) → Task
  Schedules a concrete instance of a dynamic task.
pyncette.redis module¶
class pyncette.redis.RedisRepository(redis_client: redis.asyncio.client.Redis, **kwargs)
Bases: pyncette.repository.Repository
Redis-backed store for Pyncette task execution data
- commit_task(utc_now: datetime.datetime, task: Task, lease: Lease) → None
  Commits the task, which signals a successful run.
- extend_lease(utc_now: datetime.datetime, task: Task, lease: Lease) → Lease | None
  Extends the lease on the task. Returns the new lease if the lease was still valid.
- poll_dynamic_task(utc_now: datetime.datetime, task: Task, continuation_token: ContinuationToken | None = None) → QueryResponse
  Queries the dynamic tasks for execution.
- poll_task(utc_now: datetime.datetime, task: Task, lease: Lease | None = None) → PollResponse
  Polls the task to determine whether it is ready for execution.
- register_scripts() → None
  Registers the Lua scripts used by the implementation ahead of time.
- register_task(utc_now: datetime.datetime, task: Task) → None
  Registers a dynamic task.
pyncette.repository module¶
class pyncette.repository.Repository
Bases: abc.ABC
Abstract base class representing a store for Pyncette tasks
- commit_task(utc_now: datetime.datetime, task: Task, lease: Lease) → None
  Commits the task, which signals a successful run.
- extend_lease(utc_now: datetime.datetime, task: Task, lease: Lease) → Lease | None
  Extends the lease on the task. Returns the new lease if the lease was still valid.
- poll_dynamic_task(utc_now: datetime.datetime, task: Task, continuation_token: ContinuationToken | None = None) → QueryResponse
  Queries the dynamic tasks for execution.
- poll_task(utc_now: datetime.datetime, task: Task, lease: Lease | None = None) → PollResponse
  Polls the task to determine whether it is ready for execution.
- register_task(utc_now: datetime.datetime, task: Task) → None
  Registers a dynamic task.
pyncette.executor module¶
pyncette.healthcheck module¶
pyncette.healthcheck.default_healthcheck(app_context: PyncetteContext) → bool

pyncette.healthcheck.use_healthcheck_server(app: Pyncette, port: int = 8080, bind_address: str | None = None, healthcheck_handler: Callable[[PyncetteContext], Awaitable[bool]] = default_healthcheck) → None
Decorates a Pyncette app with a healthcheck endpoint served as an HTTP endpoint.
Parameters:
- app – Pyncette app
- port – The local port to bind to
- bind_address – The local address to bind to
- healthcheck_handler – A coroutine that determines health status
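For example, a minimal sketch using the defaults:

from pyncette import Pyncette
from pyncette.healthcheck import use_healthcheck_server

app = Pyncette()
use_healthcheck_server(app, port=8080)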
pyncette.sqlite module¶
class pyncette.sqlite.SqliteRepository(connection: aiosqlite.core.Connection, **kwargs)
Bases: pyncette.repository.Repository
- commit_task(utc_now: datetime.datetime, task: Task, lease: Lease) → None
  Commits the task, which signals a successful run.
- extend_lease(utc_now: datetime.datetime, task: Task, lease: Lease) → Lease | None
  Extends the lease on the task. Returns the new lease if the lease was still valid.
- poll_dynamic_task(utc_now: datetime.datetime, task: Task, continuation_token: ContinuationToken | None = None) → QueryResponse
  Queries the dynamic tasks for execution.
- poll_task(utc_now: datetime.datetime, task: Task, lease: Lease | None = None) → PollResponse
  Polls the task to determine whether it is ready for execution.
- register_task(utc_now: datetime.datetime, task: Task) → None
  Registers a dynamic task.
pyncette.task module¶
class pyncette.task.PartitionedTask(*, partition_count: int, partition_selector: PartitionSelector = _default_partition_selector, enabled_partitions: list[int] | None = None, **kwargs)
Bases: pyncette.task.Task

class pyncette.task.Task(*, name: str, func: TaskFunc, enabled: bool = True, dynamic: bool = False, parent_task: Task | None = None, schedule: str | None = None, interval: datetime.timedelta | None = None, execute_at: datetime.datetime | None = None, timezone: str | None = None, fast_forward: bool = False, failure_mode: FailureMode = FailureMode.NONE, execution_mode: ExecutionMode = ExecutionMode.AT_LEAST_ONCE, lease_duration: datetime.timedelta = datetime.timedelta(seconds=60), **kwargs)
Bases: object
The base unit of execution
- canonical_name
  A unique identifier for a task instance
- enabled
- get_next_execution(utc_now: datetime.datetime, last_execution: datetime.datetime | None) → datetime.datetime | None
pyncette.utils module¶
pyncette.utils.with_heartbeat(lease_remaining_ratio: float = 0.5, cancel_on_lease_lost: bool = False) → Callable[[TaskFunc], TaskFunc]
Decorates the task to use automatic heartbeating in the background.
Parameters:
- lease_remaining_ratio – A number between 0 and 1: the ratio of elapsed time to the lease duration at which heartbeating is performed. Default is 0.5.
- cancel_on_lease_lost – Whether the task should be cancelled if the lease expires. Default is False.
Module contents¶
class pyncette.Pyncette(repository_factory: RepositoryFactory = sqlite_repository, executor_cls: type = DefaultExecutor, poll_interval: datetime.timedelta = datetime.timedelta(seconds=1), **kwargs)
Bases: object
Pyncette application.
- create(context_items: dict[str, Any] | None = None) → AsyncIterator[PyncetteContext]
  Creates the execution context.
- dynamic_task(**kwargs) → Callable[[TaskFunc], Task]
  Decorator for marking the coroutine as a dynamic task.
- fixture(name: str | None = None) → Decorator[FixtureFunc]
  Decorator for marking the generator as a fixture.
- main() → None
  Convenience entrypoint for console apps, which sets up logging and signal handling.
- middleware(func: MiddlewareFunc) → MiddlewareFunc
  Decorator for marking the function as a middleware.
- partitioned_task(**kwargs) → Callable[[TaskFunc], PartitionedTask]
  Decorator for marking the coroutine as a partitioned dynamic task.

class pyncette.ExecutionMode
Bases: enum.Enum
The execution mode for a Pyncette task.
- AT_LEAST_ONCE = 0
- AT_MOST_ONCE = 1

class pyncette.FailureMode
Bases: enum.Enum
What should happen when a task fails.
- NONE = 0
- UNLOCK = 1
- COMMIT = 2

class pyncette.Context
Bases: object
Task execution context. This class can have dynamic attributes.

class pyncette.PyncetteContext(app: Pyncette, repository: Repository, executor: DefaultExecutor)
Bases: object
Execution context of a Pyncette app
- last_tick
- schedule_task(task: Task, instance_name: str, **kwargs) → Task
  Schedules a concrete instance of a dynamic task.
Contributing¶
Contributions are welcome, and they are greatly appreciated! Every little bit helps, and credit will always be given.
Bug reports¶
When reporting a bug please include:
- Your operating system name and version.
- Any details about your local setup that might be helpful in troubleshooting.
- Detailed steps to reproduce the bug.
Documentation improvements¶
Pyncette could always use more documentation, whether as part of the official Pyncette docs, in docstrings, or even on the web in blog posts, articles, and such.
Feature requests and feedback¶
The best way to send feedback is to file an issue at https://github.com/tibordp/pyncette/issues.
If you are proposing a feature:
- Explain in detail how it would work.
- Keep the scope as narrow as possible, to make it easier to implement.
- Remember that this is a volunteer-driven project, and that code contributions are welcome :)
Development¶
To set up pyncette for local development:
Fork pyncette (look for the “Fork” button).
Clone your fork locally:
git clone git@github.com:tibordp/pyncette.git
Create a branch for local development:
git checkout -b name-of-your-bugfix-or-feature
Now you can make your changes locally.
Running integration tests assumes that there will be Redis, PostgreSQL, MySQL and Localstack (for DynamoDB) running on localhost. Alternatively, there is a Docker Compose environment that will set up all the backends so that integration tests can run seamlessly:
docker-compose up -d
docker-compose run --rm shell
When you’re done making changes, run all the checks and the docs builder with one tox command:
tox
Pyncette uses black and isort to enforce formatting and import ordering. If you want to auto-format the code, you can do it like this:
tox -e check
Commit your changes and push your branch to GitHub:
git add .
git commit -m "Your detailed description of your changes."
git push origin name-of-your-bugfix-or-feature
Submit a pull request through the GitHub website.
If you run into issues setting up a local environment or testing the code locally, feel free to submit the PR anyway and GitHub Actions will test it for you.
Pull Request Guidelines¶
If you need some code review or feedback while you’re developing the code, just make the pull request.
For merging, you should:
- Update documentation when there’s new API, functionality etc.
- Add a note to CHANGELOG.rst about the changes.
- Add yourself to AUTHORS.rst.
Tips¶
To run a subset of tests:
tox -e envname -- pytest -k test_myfeature
To run all the test environments in parallel (see the tox documentation: https://tox.wiki/en/latest/user_guide.html#parallel-mode):
tox --parallel auto
Authors¶
- Tibor Djurica Potpara - https://www.ojdip.net
Changelog¶
0.10.1 (2023-05-09)¶
- Include missing lua files in the built wheel
0.10.0 (2023-05-08)¶
- Drop support for Python 3.7
- Add support for Python 3.11
- Modernize Python package structure and linters
- Fix a few bugs and type annotations
0.8.1 (2021-04-08)¶
- Improve performance for calculation of the next execution time
- Add ability for repositories to pass a pagination token
- Add add_to_context() to inject static data to context
- Clean up documentation and add additional examples
0.8.0 (2021-04-05)¶
- Added Amazon DynamoDB backend
- Added MySQL backend
- Added support for partitioned dynamic tasks
0.7.0 (2021-03-31)¶
- Added support for automatic and cooperative lease heartbeating
- PostgreSQL backend can now skip automatic table creation
- Improved signal handling
- CI: Add Codecov integration
- Devenv: Run integration tests in Docker Compose
0.6.1 (2020-04-02)¶
- Optimize the task querying on Postgres backend
- Fix: ensure that there are no name collisions between concrete instances of different dynamic tasks
- Improve fairness of polling tasks under high contention.
0.6.0 (2020-03-31)¶
- Added PostgreSQL backend
- Added Sqlite backend and made it the default (replacing InMemoryRepository)
- Refactored test suite to cover all conformance/integration tests on all backends
- Refactored Redis backend, simplifying the Lua scripts and improving exceptional case handling (e.g. tasks disappearing between query and poll)
- Main loop only sleeps for the rest of remaining poll_interval before next tick instead of the full amount
- General bug fixes, documentation changes, clean up
0.5.0 (2020-03-27)¶
- Fixes bug where a locked dynamic task could be executed again on next tick.
- poll_task is now reentrant with regards to locking. If the lease passed in matches the lease on the task, it behaves as though it were unlocked.
0.4.0 (2020-02-16)¶
- Middleware support and optional metrics via Prometheus
- Improved the graceful shutdown behavior
- Task instance and application context are now available in the task context
- Breaking change: dynamic task parameters are now accessed via context.args[‘name’] instead of context.name
- Improved examples, documentation and packaging
0.2.0 (2020-01-08)¶
- Timezone support
- More efficient polling when the Redis backend is used
0.1.1 (2020-01-08)¶
- First release that actually works.
0.0.0 (2019-12-31)¶
- First release on PyPI.