Background Tasks
OpenViper includes a built-in background task queue and periodic scheduler
powered by Dramatiq. Developer modules never import Dramatiq directly; the
public API consists of openviper.tasks.actor() and openviper.tasks.periodic().
Installation
Install the tasks extra to pull in Dramatiq and its core dependencies,
then choose a broker-specific extra for your message broker:
# Redis broker (default)
pip install 'openviper[tasks-redis]'
# RabbitMQ broker
pip install 'openviper[tasks-rabbitmq]'
# Amazon SQS broker
pip install 'openviper[tasks-sqs]'
The tasks extra installs dramatiq, croniter, and
cron-descriptor. The broker extras add the corresponding driver
(redis, pika, or dramatiq-sqs/boto3).
For testing, use "stub" as the broker - no extra packages are needed.
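As a minimal sketch, a test settings module could select the stub broker through the TASKS settings dictionary. The file name is hypothetical, and omitting broker_url for the stub broker is an assumption that this page does not confirm.

```python
# settings_test.py (hypothetical file name)
# Minimal task settings for a test run. Only "broker" differs from a
# Redis setup; leaving out "broker_url" is an assumption.
TASKS = {
    "enabled": 1,
    "broker": "stub",
}
```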
Configuration
Add openviper.tasks to INSTALLED_APPS and configure the
TASKS dictionary in your settings module:
INSTALLED_APPS = [
    "openviper.auth",
    "openviper.admin",
    "openviper.tasks",
    # ... your apps
]

TASKS = {
    "enabled": 1,
    "broker": "redis",
    "broker_url": "redis://localhost:6379",
    "backend_url": "",  # optional: for result retrieval
    "logging": {
        "level": "INFO",
        "file": {
            "log_dir": "logs",
            "file_name": "tasks.log",
            "log_format": "json",
            "max_size": 10,  # MB
        },
        "database": {
            "task": 1,
            "periodic": 1,
        },
    },
}
File and database logging are opt-in. When logging.file and
logging.database are both None (the default), the worker
outputs only essential startup/shutdown messages. Set file to a
dict or 1 to enable file logging; set database to a dict or
1 to persist task and periodic execution records.
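The two switches are independent, so one can be enabled without the other. The fragment below is a sketch that turns on file logging with the shorthand value 1 and leaves database logging at its default. Which file defaults apply in that case is not stated on this page.

```python
# Hypothetical settings fragment: file logging on, database logging off.
TASKS = {
    "enabled": 1,
    "broker": "redis",
    "broker_url": "redis://localhost:6379",
    "logging": {
        "level": "INFO",
        "file": 1,         # shorthand for "enable file logging"
        "database": None,  # the default: no execution records are persisted
    },
}
```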
Configuration reference
Defining Tasks
Use the @actor decorator in your app’s
tasks.py module:
# myapp/tasks.py
from openviper.tasks import actor


@actor
async def send_welcome_email(user_id: int) -> None:
    """Send a welcome email to a new user."""
    ...


@actor(queue_name="emails", actor_name="core.send_email")
async def send_email(to: str, subject: str) -> None:
    ...
Enqueue a task from anywhere:
from myapp.tasks import send_welcome_email
send_welcome_email.send(user_id=42)
send_welcome_email.send_with_options(args=(42,), delay=5_000)
When TASKS['enabled'] == 0, .send() falls back to synchronous
execution in the caller’s scope.
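To make the fallback concrete, here is a toy model of the behaviour described above. SketchActor and the module-level TASKS dict are stand-ins invented for this illustration. They are not OpenViper classes, and the real implementation may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Stand-in for the TASKS setting; not read from a real settings module.
TASKS = {"enabled": 0}


class SketchActor:
    """Toy wrapper illustrating the fallback; not OpenViper's actor class."""

    def __init__(self, fn: Callable[..., Awaitable[Any]]) -> None:
        self.fn = fn
        self.enqueued: list = []  # messages a broker would have received

    def send(self, *args: Any, **kwargs: Any) -> Any:
        if TASKS["enabled"]:
            # Enabled: record the message; a real broker would take over here.
            self.enqueued.append((args, kwargs))
            return None
        # Disabled: run the coroutine to completion in the caller's thread.
        return asyncio.run(self.fn(*args, **kwargs))


async def _double(x: int) -> int:
    return x * 2


double = SketchActor(_double)
result = double.send(21)  # runs immediately because TASKS["enabled"] is 0
```

Note that asyncio.run() cannot be called from inside a running event loop, so this sketch only works from synchronous code. How OpenViper handles a call from async code is not covered here.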
Periodic Tasks
Use @periodic to register recurring
jobs:
# myapp/tasks.py
from openviper.tasks import periodic


@periodic(every="60s")
async def health_check() -> None:
    """Run every 60 seconds."""
    ...


@periodic(cron="0 8 * * *")
async def morning_report() -> None:
    """Run daily at 8 AM."""
    ...
Supported interval units: s (seconds), m (minutes), h (hours),
d (days).
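The interval format can be illustrated with a small parser. parse_interval is a hypothetical helper written for this page, not part of OpenViper. It assumes a single integer followed by one unit letter, and whether the library also accepts combined values such as "1h30m" is not stated.

```python
import re

# Seconds per unit, for the four units listed above.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
_INTERVAL_RE = re.compile(r"(\d+)([smhd])")


def parse_interval(value: str) -> int:
    """Convert a string such as "60s" or "2h" to a number of seconds."""
    match = _INTERVAL_RE.fullmatch(value.strip())
    if match is None:
        raise ValueError(f"unsupported interval: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]
```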
Periodic tasks are automatically deduplicated across workers: only one worker will enqueue a given job per interval cycle, even when multiple workers share a database.
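One common way to get this kind of deduplication from a shared database is to let a uniqueness constraint decide which worker wins each cycle. The sketch below uses an in-memory SQLite table to show the idea. The table name, the claim_cycle helper, and the slot arithmetic are all assumptions for illustration and do not describe OpenViper's actual mechanism.

```python
import sqlite3


def claim_cycle(conn: sqlite3.Connection, job: str, now: float, interval: float) -> bool:
    """Return True if this caller is the first to claim the current cycle."""
    cycle = int(now // interval)  # index of the interval slot containing `now`
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO job_claims (job, cycle) VALUES (?, ?)", (job, cycle)
            )
        return True
    except sqlite3.IntegrityError:
        # Another worker already inserted this (job, cycle) pair.
        return False


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job_claims (job TEXT, cycle INTEGER, PRIMARY KEY (job, cycle))"
)

first = claim_cycle(conn, "health_check", now=120.0, interval=60.0)   # worker A
second = claim_cycle(conn, "health_check", now=125.0, interval=60.0)  # worker B, same cycle
later = claim_cycle(conn, "health_check", now=185.0, interval=60.0)   # next cycle
```

Only the caller whose insert succeeds would enqueue the job, so the second claim in the same 60-second slot is rejected while the claim in the next slot succeeds.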
Periodic parameters
| Parameter | Type | Description |
|---|---|---|
| cron | str | Standard 5-field crontab expression |
| every | str | Human-readable interval ( |
| startup | bool | Run once immediately when the worker starts |
| retries | int | Maximum retry attempts on failure (default 3) |
Running the Worker
Start the unified worker process (scheduler + task consumer):
python viperctl.py start-worker
Options:
--processes N - Number of worker processes (default: 1)
--threads N - Threads per process (default: 8)
--queues queue1 queue2 - Specific queues to consume
--no-scheduler - Disable the periodic scheduler thread
Note
Only one worker should run the scheduler to avoid duplicate periodic
task enqueues. When running multiple workers, start one with the
scheduler enabled (the default) and all others with --no-scheduler:
# Primary worker (scheduler + consumer)
python viperctl.py start-worker
# Additional workers (consumer only)
python viperctl.py start-worker --no-scheduler
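When workers are launched from a deployment script, the same rule can be encoded once. The helper below is a hypothetical sketch that only assembles command lines from the options listed above; it does not start any process, and the "emails" queue name is taken from the earlier actor example.

```python
import sys


def worker_command(*, scheduler: bool, processes: int = 1, threads: int = 8,
                   queues: tuple = ()) -> list:
    """Build the argv for one start-worker process."""
    cmd = [sys.executable, "viperctl.py", "start-worker",
           "--processes", str(processes), "--threads", str(threads)]
    if queues:
        cmd += ["--queues", *queues]
    if not scheduler:
        # Consumer-only worker: leave scheduling to the primary.
        cmd.append("--no-scheduler")
    return cmd


primary = worker_command(scheduler=True)
extra = worker_command(scheduler=False, queues=("emails",), processes=2)
```

Each list can then be handed to subprocess.Popen or to a process supervisor.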
Task Discovery
On startup, the worker scans each app in INSTALLED_APPS for a
tasks.py module and imports it. Apps without a tasks.py are
silently skipped.
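The discovery step can be approximated with importlib. This is a minimal sketch of the behaviour described above, not OpenViper's loader. discover_task_modules is a hypothetical name, and the module_name parameter exists only so the function can be exercised against standard-library packages.

```python
import importlib


def discover_task_modules(installed_apps, module_name="tasks"):
    """Import `<app>.<module_name>` for each app, skipping apps without one."""
    found = {}
    for app in installed_apps:
        target = f"{app}.{module_name}"
        try:
            found[app] = importlib.import_module(target)
        except ModuleNotFoundError as exc:
            # Skip only when the target module itself is missing; a failing
            # import inside an existing tasks module should still surface.
            if exc.name != target:
                raise
    return found
```

Checking exc.name keeps genuine errors visible: if a tasks.py exists but imports a package that is not installed, the exception propagates instead of being mistaken for an app without tasks.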
Testing
OpenViper provides TaskQueue and EagerTaskRunner for
testing without a live worker:
from openviper.testing.tasks import TaskQueue, EagerTaskRunner, assert_task_queued

from myapp.tasks import my_task  # hypothetical actor under test


def test_task_is_enqueued():
    queue = TaskQueue()
    with queue.patch():
        my_task.send(1, 2)
        assert_task_queued(queue, "my_task")


async def test_task_executes_eagerly():
    # `expected` stands for the value my_task should return for (1, 2).
    result = await EagerTaskRunner().run(my_task, 1, 2)
    assert result == expected
Fixtures
task_queue - TaskQueue fixture that intercepts .send() calls
task_runner - EagerTaskRunner fixture for immediate execution
Persistence Models
Task results and periodic execution records are tracked in the database:
TaskResult - Tracks task state (pending, running, success, failure, dead)
ScheduledJob - Synchronises in-memory schedules with the database
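The five task states can be modelled as an enumeration when writing code that inspects results. The sketch below is illustrative only: TaskState, TERMINAL_STATES, and is_terminal are hypothetical names, and treating only success and dead as final (on the assumption that a failure may still be retried) is a guess rather than documented behaviour.

```python
from enum import Enum


class TaskState(str, Enum):
    """The states listed for TaskResult, as string-valued members."""

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILURE = "failure"
    DEAD = "dead"


# Assumption: "failure" may still be retried, so only these two are final.
TERMINAL_STATES = frozenset({TaskState.SUCCESS, TaskState.DEAD})


def is_terminal(state: TaskState) -> bool:
    """Return True when no further state change is expected."""
    return state in TERMINAL_STATES
```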
Admin Integration
The RunNowAction admin action allows administrators to enqueue a
task directly from the admin panel.