Database & ORM
The openviper.db package provides an async ORM built on top of
SQLAlchemy Core. Define models as Python classes, query them with a
chainable QuerySet API, and manage schema changes with the
built-in JSON schema synchronization system - all with async/await.
Overview
Models inherit from Model and declare fields
as class-level descriptors. Every database operation is coroutine-based
and runs on a connection from the pool managed by SQLAlchemy's async engine.
A permissions layer enforces row-level access control by default;
use ignore_permissions=True or the bypass_permissions()
context manager for trusted internal code paths.
Table naming is automatic: a model in apps/blog/models.py named Post
gets table name blog_post. Override with Meta.table_name.
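The naming rule above can be sketched in a few lines. This is only an illustration: `default_table_name` is a hypothetical helper, not part of OpenViper, and it assumes the app label is the directory containing `models.py` and that the class name is simply lowercased.

```python
from pathlib import PurePosixPath


def default_table_name(models_path: str, class_name: str) -> str:
    """Hypothetical helper: build '<app_label>_<lowercased class name>'."""
    # "apps/blog/models.py" -> parent directory "apps/blog" -> app label "blog"
    app_label = PurePosixPath(models_path).parent.name
    return f"{app_label}_{class_name.lower()}"


print(default_table_name("apps/blog/models.py", "Post"))  # blog_post
```

Setting Meta.table_name skips this derivation entirely.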
Key Classes & Functions
openviper.db.models
- class Model
Base class for all ORM models. Subclass it and set class Meta to configure the table name and other options.
Every model gets a default auto-incrementing integer id primary key unless you declare your own.
Meta options:
  - table_name - explicit table name (auto-generated otherwise).
  - abstract = True - mark as abstract; no table is created; useful for shared-field mixins.
  - proxy = True - proxy model; shares the parent's table, no new table is created.
  - managed = False - OpenViper will not create or manage this table; useful for views or externally-managed tables.
  - read_only = True - all write operations raise DatabaseReadOnlyError.
  - single = True - singleton model; only one instance is allowed. See Single Models.
  - cache_ttl = N - cache query results for N seconds using the project cache backend (0 disables caching).
  - verbose_name - human-readable singular name (defaults to the class name).
  - verbose_name_plural - human-readable plural name (defaults to verbose_name + 's').
  - ordering = ["-created_at"] - default ordering applied to all queries on this model. Prefix with - for descending.
  - unique_together = [("field_a", "field_b")] - composite unique constraint. Each inner tuple or list is one constraint.
  - index_together = [("field_a", "field_b")] - composite index. Each inner tuple or list creates one index.
  - indexes = [Index(...)] - explicit Index declarations.
  - constraints = [CheckConstraint(...)] - explicit Constraint declarations.
  - backend = "alias" - route this model to a specific database alias instead of the default.
- pk
Alias for id. Always returns the primary key value.
- has_changed
True if any field value differs from the last saved state.
- save(ignore_permissions=False) Awaitable[None]
Persist (INSERT or UPDATE) the instance. Runs the full lifecycle hook chain - see Lifecycle Hooks below.
- delete(ignore_permissions=False) Awaitable[None]
Delete this instance. Fires on_delete → DELETE → after_delete.
- get_sensitive(field_name: str) str | None
Decrypt and return the plaintext value of a SensitiveField column. Raises FieldError if the field is not a SensitiveField.
config = await APIConfig.objects.get(id=1)
plaintext = config.get_sensitive("api_key")  # "sk-live-abc123"
For PasswordField (user passwords), the original plaintext cannot be recovered; use check_password() instead.
- class AbstractModel
Abstract base - subclasses inherit its fields but share no table. Use for timestamp mixins or other common patterns.
- class Manager
Attached to every non-abstract model as Model.objects. Its methods either return a lazy QuerySet (factory methods) or an awaitable that executes the query (terminal methods).
Factory methods (return QuerySet): all(), filter(), exclude(), order_by(), only(), defer(), distinct(), annotate(), select_related(), prefetch_related(), using(alias)
Awaitable shortcuts: get(**kwargs), get_or_none(**kwargs), create(**kwargs), get_or_create(defaults, **kwargs), update_or_create(defaults, **kwargs), in_bulk(id_list=None, field_name='id')
Terminal awaitables: first(), last(), count(), exists(), values(*fields), values_list(*fields, flat=False), aggregate(**kwargs), explain()
Bulk operations: bulk_create(objs, batch_size=None), bulk_update(objs, fields, batch_size=None)
Iteration helpers: iterator(chunk_size=2000), batch(size=100), id_batch(size=100)
Single-model helpers (for models with Meta.single = True): get_single(), create_single(**kwargs), update_single(**kwargs), get_or_create_single(**kwargs)
Custom QuerySet: from_queryset(queryset_class) - classmethod that returns a Manager subclass using queryset_class for all queries, so custom QuerySet methods are callable directly on the manager.
class PublishedQuerySet(QuerySet):
    def published(self) -> QuerySet:
        return self.filter(published=True)

PublishedManager = Manager.from_queryset(PublishedQuerySet)

class Post(Model):
    objects = PublishedManager()

posts = await Post.objects.published().all()
update_or_create looks up a row matching **kwargs, updates it with defaults, and creates it when absent. Returns (instance, created).
in_bulk returns a {field_value: instance} mapping. When id_list is None, all rows are returned (subject to MAX_QUERY_ROWS).
- class QuerySet
Lazy, chainable query builder. All filtering/ordering/slicing methods return a new QuerySet (non-mutating). Results are fetched only when a terminal method is awaited.
A QuerySet is also directly awaitable: await qs is equivalent to await qs.all().
Note
No default row limit
Terminal methods such as all() return every matching row by default. Use limit() per query, or set MAX_QUERY_ROWS in your project settings to apply a project-wide cap. For large datasets prefer iterator() / batch(), which stream rows without loading the entire result into memory.
# ── no limit by default ──────────────────────────────────────
async def example():
    posts = await Post.objects.all()  # all rows

    # ── explicit limit ────────────────────────────────────────
    posts = await Post.objects.limit(50).all()  # at most 50 rows

# ── optional project-wide cap ────────────────────────────────
# settings.py
MAX_QUERY_ROWS = 1000  # apply a default cap to all queries

# ── stream all rows without a limit ──────────────────────────
async def example():
    async for post in Post.objects.filter(published=True).iterator():
        await process(post)  # keyset-paginated, unbounded

    # ── batch processing ──────────────────────────────────────
    async for batch in Post.objects.all().batch(size=200):
        await send_emails(batch)  # list of ≤200 instances
Tip
Use a serializer for automatic pagination
ModelSerializer provides three class methods that work directly on a QuerySet and handle the row limit / pagination for you:
from openviper.serializers import ModelSerializer
from myapp.models import Post

class PostSerializer(ModelSerializer):
    class Meta:
        model = Post
        fields = ["id", "title", "created_at"]
    # PAGE_SIZE = 25  ← default; override per-serializer

async def list_posts(request):
    # ── serialize_many: all rows, batched internally ──────────
    # Bypasses any MAX_QUERY_ROWS cap; fetches in PAGE_SIZE chunks.
    data = await PostSerializer.serialize_many(
        Post.objects.filter(published=True).order_by("-created_at")
    )  # returns: list[dict]

    # ── paginate: one page with total count + next/prev URLs ──
    page = int(request.query_params.get("page", 1))
    result = await PostSerializer.paginate(
        Post.objects.filter(published=True).order_by("-created_at"),
        page=page,
        page_size=20,
        base_url="/api/posts/",
    )
    # result.count    → total matching rows
    # result.results  → list[dict] for this page
    # result.next     → "/api/posts/?page=3&page_size=20" or None
    # result.previous → "/api/posts/?page=1&page_size=20" or None
    return JSONResponse(result.model_dump())

async def list_posts_json(request):
    # ── serialize_many_json: same as serialize_many but returns bytes
    body = await PostSerializer.serialize_many_json(
        Post.objects.filter(published=True)
    )
    return Response(body, content_type="application/json")
Filtering:
- filter(*q_objects, **kwargs) QuerySet
Add WHERE conditions. Accepts Q objects or keyword=value pairs with lookup suffixes: __contains, __icontains, __startswith, __endswith, __gt, __gte, __lt, __lte, __in, __isnull, __exact, __range.
FK traversal is supported across multiple relationship hops: author__username="alice" performs a JOIN on the author FK field, and parent__reporter__profile__bio="hello" chains three JOINs through the relationship graph. A maximum depth of 5 FK hops is enforced per traversal to prevent query complexity attacks.
- exclude(*q_objects, **kwargs) QuerySet
Exclude rows matching the given conditions (negated filter).
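To make the lookup-suffix and FK-hop rules for filter() concrete, here is a minimal sketch of how a keyword such as parent__reporter__profile__bio might be split. `parse_lookup`, `LOOKUPS`, and `MAX_FK_HOPS` are hypothetical names used for illustration; OpenViper's actual parser is not shown in this document and may differ (for example, in how it treats a field whose name collides with a suffix).

```python
# Lookup suffixes listed in the filter() description above.
LOOKUPS = {
    "contains", "icontains", "startswith", "endswith",
    "gt", "gte", "lt", "lte", "in", "isnull", "exact", "range",
}
MAX_FK_HOPS = 5  # documented limit on FK hops per traversal


def parse_lookup(key: str) -> tuple[list[str], str, str]:
    """Split 'rel__rel__field__lookup' into (relations, field, lookup)."""
    parts = key.split("__")
    lookup = "exact"  # assumed default when no suffix is given
    if len(parts) > 1 and parts[-1] in LOOKUPS:
        lookup = parts.pop()
    *relations, field = parts  # everything before the field is an FK hop
    if len(relations) > MAX_FK_HOPS:
        raise ValueError(f"{key!r} exceeds {MAX_FK_HOPS} FK hops")
    return relations, field, lookup


print(parse_lookup("author__username"))
print(parse_lookup("views__gte"))
print(parse_lookup("parent__reporter__profile__bio"))
```

Each entry in the relations list corresponds to one JOIN, so the third call describes the three-JOIN traversal mentioned above.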
Ordering & slicing:
- limit(n) QuerySet
Limit the number of rows returned (SQL LIMIT).
# Get first 10 posts
posts = await Post.objects.order_by("-created_at").limit(10).all()

# Combine with offset for manual pagination
page_size = 20
page = 3
offset = (page - 1) * page_size
posts = await Post.objects.limit(page_size).offset(offset).all()
- offset(n) QuerySet
Skip the first n rows (SQL OFFSET).
Warning
OFFSET performance degrades linearly with the offset value. For example, OFFSET 1000000 requires scanning 1M rows before returning results. For deep pagination, prefer keyset (cursor) pagination using paginate() with a cursor, or use iterator() for streaming large datasets.
# Manual pagination with offset (simple but slow for deep pages)
page_size = 20
page_number = 2
offset = (page_number - 1) * page_size
posts = await Post.objects.order_by("id").limit(page_size).offset(offset).all()

# Better: Use paginate() which runs COUNT + fetch concurrently
page = await Post.objects.order_by("id").paginate(page_number=2, page_size=20)
- using(alias) QuerySet
Route this query to the database alias, overriding the router for the entire chain. Unknown aliases raise DatabaseAliasNotFoundError when the query executes.
users = await User.objects.using("replica").filter(is_active=True).all()
- select_for_update(nowait=False, skip_locked=False) QuerySet
Apply SELECT FOR UPDATE row-level locking. Must be used inside atomic() or transaction().
nowait - raise immediately if a conflicting lock is held.
skip_locked - skip locked rows rather than waiting.
nowait and skip_locked are mutually exclusive.
async with atomic():
    post = await Post.objects.select_for_update().filter(id=1).get()
    post.views += 1
    await post.save()
Column selection:
- only(*fields) QuerySet
Restrict the SELECT to the given field names. The primary key is always included. All other fields will be None on instances.
- defer(*fields) QuerySet
Exclude the given fields from the SELECT. Mutually exclusive with only() - the last call wins.
Relationships:
- select_related(*fields) QuerySet
Perform a SQL JOIN to load the related objects in the same query (one query total). Best for single FK objects where you always need the related data.
- prefetch_related(*fields) QuerySet
Issue a separate id__in query per field and attach results to instances in Python (two queries total for one FK). Best for large result sets or when the related data is optional.
Annotations:
- annotate(**kwargs) QuerySet
Add computed columns using F expressions or aggregate functions (Count, Sum, Avg, Max, Min). Values are accessible as attributes on returned instances.
Terminal methods:
- update(**kwargs) Awaitable[int]
Bulk-update matching rows. Accepts F expressions for atomic arithmetic updates. Returns the number of affected rows.
- values(*fields) Awaitable[list[dict]]
Return each row as a plain dict. If fields are given, only those columns are included.
- values_list(*fields, flat=False) Awaitable[list[tuple] | list]
Return rows as tuples. Use flat=True with exactly one field to get a flat list of scalar values.
- aggregate(**kwargs) Awaitable[dict]
Execute aggregate functions and return a single result dict. Values must be aggregate instances (Count, Sum, etc.).
- raw_sql() str
Return the parameterized SQL query string for the current queryset.
Compiles the query with parameter placeholders (not literal values) so that sensitive filter values are not exposed in the output. Useful for debugging and logging without leaking credentials.
qs = Post.objects.filter(is_published=True).order_by("-created_at").limit(10)
print(qs.raw_sql())
# SELECT posts.* FROM posts WHERE is_published = :is_published_1
# ORDER BY created_at DESC LIMIT :param_1
- paginate(page_number=1, page_size=25, cursor=None) Awaitable[Page]
Paginate the queryset using a concurrent COUNT and data fetch. Returns a Page object containing items, total count, and cursor.
Uses asyncio.gather() to run the count and fetch queries in parallel, for up to ~2x lower latency than running them sequentially. Supports both OFFSET-based pagination (via page_number) and keyset cursor pagination for sequential navigation.
# Basic pagination
page = await Post.objects.filter(is_published=True).paginate(
    page_number=2, page_size=20
)
# page.items       → list[Post] (up to 20 items)
# page.total_count → total matching rows
# page.number      → current page (2)
# page.page_size   → items per page (20)
# page.next_cursor → cursor for next page (or None)

# With cursor for fast sequential navigation
page = await Post.objects.order_by("created_at", "id").paginate(
    page_number=1, page_size=20, cursor=request.query_params.get("cursor")
)
Performance: COUNT and data fetch execute concurrently. Deep pages using OFFSET can be slow (O(N)); prefer cursors for Next/Prev navigation.
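The concurrent COUNT-plus-fetch pattern described for paginate() can be sketched with plain asyncio. Everything here is a stand-in: `count_rows`, `fetch_page`, and `paginate_sketch` are hypothetical, the "database" is an in-memory list, and the result is a plain dict rather than OpenViper's Page class. With a real database, the two queries would presumably need separate connections to overlap.

```python
import asyncio

ROWS = list(range(1, 96))  # stand-in for 95 matching rows


async def count_rows() -> int:
    await asyncio.sleep(0.01)  # simulate query latency
    return len(ROWS)


async def fetch_page(page_number: int, page_size: int) -> list[int]:
    await asyncio.sleep(0.01)  # simulate query latency
    start = (page_number - 1) * page_size  # OFFSET-style slice
    return ROWS[start:start + page_size]


async def paginate_sketch(page_number: int = 1, page_size: int = 25) -> dict:
    # Both coroutines are awaited together, so their waits overlap.
    total, items = await asyncio.gather(
        count_rows(), fetch_page(page_number, page_size)
    )
    return {
        "items": items,
        "total_count": total,
        "number": page_number,
        "page_size": page_size,
    }


page = asyncio.run(paginate_sketch(page_number=2, page_size=20))
print(page["total_count"], page["items"][0], page["items"][-1])  # 95 21 40
```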
Streaming / large datasets:
- iterator(chunk_size=2000) AsyncGenerator[Model, None]
Yield model instances one at a time using keyset (id > last_id) pagination. Safe for very large tables - not subject to the MAX_QUERY_ROWS cap.
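Keyset iteration as described for iterator() can be illustrated with the standard-library sqlite3 module. This is a synchronous sketch under stated assumptions: `iterate_by_keyset` is a hypothetical function, the posts table and its columns are invented for the demo, and ids are assumed to be positive integers. OpenViper's own iterator is asynchronous and its internals are not shown here.

```python
import sqlite3
from typing import Iterator


def iterate_by_keyset(
    conn: sqlite3.Connection, chunk_size: int = 2000
) -> Iterator[tuple[int, str]]:
    """Yield (id, title) rows chunk by chunk using id > last_id."""
    last_id = 0  # assumes ids start above 0
    while True:
        rows = conn.execute(
            "SELECT id, title FROM posts WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, chunk_size),
        ).fetchall()
        if not rows:
            return
        yield from rows
        last_id = rows[-1][0]  # resume after the last id seen


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO posts (title) VALUES (?)",
    [(f"Post {i}",) for i in range(1, 8)],
)

titles = [title for _id, title in iterate_by_keyset(conn, chunk_size=3)]
print(titles)
```

Because each chunk starts from the last id seen rather than from a row offset, the cost per chunk stays roughly constant no matter how deep into the table the iteration is.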
- class F(name)
Reference a model field for database-side operations. Supports arithmetic: +, -, *, /.
- class Q(**kwargs)
Encapsulate filter conditions supporting | (OR), & (AND), and ~ (NOT).
- class Page
Pagination result container returned by QuerySet.paginate().
Aggregate classes:
Count(field, distinct=False), Sum(field), Avg(field),
Max(field), Min(field)
openviper.db.fields
All fields accept a common set of base arguments: primary_key,
null, blank, unique, db_index, default, db_column,
choices, help_text.
Field Class |
Description |
|---|---|
|
Auto-incrementing integer primary key. Added automatically. |
|
32-bit integer (−2 147 483 648 – 2 147 483 647). |
|
64-bit integer. |
|
Floating-point number. |
|
Fixed-precision decimal stored as |
|
Variable-length string stored as |
|
Unbounded text stored as |
|
|
|
|
|
|
|
Boolean (stored as 0/1). |
|
Timezone-aware datetime. |
|
Date-only column. |
|
Time-only column. |
|
UUID stored as text. |
|
Arbitrary JSON stored as |
|
Raw binary data ( |
|
File upload; stores path relative to |
|
|
|
HTML content with XSS sanitization via nh3 (or |
|
One-way hashed password storage (e.g. user passwords). Stores
Argon2id/bcrypt hashes produced by
|
|
Encrypted secret storage (e.g. API keys, tokens). Values are
encrypted with Fernet symmetric encryption derived from
|
|
Many-to-one relationship. FK column is |
|
One-to-one relationship (unique FK). |
|
Many-to-many via a junction table (no direct column). |
|
ISO 3166-1 alpha-2 country code stored as a 2-character
from openviper.db import Model
from openviper.contrib.fields.countries import CountryField
class UserProfile(Model):
    country = CountryField(null=True, db_index=True)
# Property access on instances
profile.country.iso # 'GB'
profile.country.name # 'United Kingdom'
profile.country.dial_code # '+44'
profile.country.alpha3 # 'GBR'
profile.country.numeric # '826'
profile.country.continent # 'Europe'
profile.country.region # 'Northern Europe'
profile.country.capital # 'London'
profile.country.currency_code # 'GBP'
profile.country.currency_name # 'British Pound'
profile.country.currency_symbol # '£'
profile.country.languages # ['en']
profile.country.tld # '.gb'
profile.country.flag # '🇬🇧'
profile.country.is_eu # False
profile.country.is_eea # False
profile.country.timezone # 'Europe/London'
profile.country.is_valid # True
profile.country == 'GB' # True (str subclass)
|
|
Non-negative integer ( |
|
16-bit integer (−32 768 to 32 767). Stored as |
|
Auto-incrementing 64-bit integer primary key. Use when the 32-bit
|
|
|
|
|
|
IPv4 or IPv6 address stored as |
|
IPv4/IPv6 address with protocol filtering. protocol accepts
|
|
Homogeneous list of a scalar field type. On PostgreSQL the column
uses the native |
|
Monetary amount paired with an ISO 4217 currency code. Creates two
columns: a
from openviper.db import Model
from openviper.db.models import Sum
from openviper.contrib.fields.currencies import CurrencyField, Money
class Product(Model):
    price = CurrencyField(max_digits=12, decimal_places=2, default_currency="USD")
product = Product(price=Money("19.99", "USD"))
product.price # Money('19.99', 'USD')
product.price_currency # 'USD'
# Native SQL aggregation (aggregate() takes keyword arguments)
total = await Product.objects.aggregate(total=Sum("price"))
|
openviper.db.models.Index
- class Index(fields, name=None, condition=None)
Database index declaration for Meta.indexes.
Parameters:
fields – List of column names to index.
name – Optional index name.
condition – Optional SQL WHERE clause for partial indexes. Validated against dangerous SQL patterns (semicolons, comments, DDL/DML keywords) to prevent statement injection.
class User(Model):
    class Meta:
        indexes = [
            Index(fields=["first_name", "last_name"], name="idx_user_names"),
            Index(fields=["email"], condition="is_active = 1", name="idx_active_email"),
        ]
openviper.db.models.Constraint
- class Constraint(name)
Base class for database constraints declared in Meta.constraints.
- class CheckConstraint(name, check)
Database CHECK constraint. The check parameter is a raw SQL expression validated against dangerous SQL patterns to prevent statement injection.
class Price(Model):
    amount = DecimalField(max_digits=10, decimal_places=2)

    class Meta:
        constraints = [
            CheckConstraint(name="price_positive", check="amount > 0"),
        ]
- class UniqueConstraint(name, fields)
Database UNIQUE constraint spanning one or more columns. Declared in Meta.constraints.
class Membership(Model):
    class Meta:
        constraints = [
            UniqueConstraint(name="unique_member", fields=["user_id", "group_id"]),
        ]
openviper.db.models.TextChoice
openviper.db.connection
- init_db(drop_first=False) Awaitable[None]
Create all registered tables. If drop_first is True, drop all tables before recreating them. Called automatically on startup.
- close_db() Awaitable[None]
Dispose of the engine, close all pooled connections, and clean up stale per-event-loop locks.
- get_connection() AsyncConnection
Return an async database connection. If a per-request connection is active (via request_connection()), it is returned directly so that multiple ORM calls within a single request share the same underlying connection. Otherwise a fresh connection is acquired from the pool.
- request_connection() AsyncContextManager[AsyncConnection]
Pin a single pooled connection for the duration of a request. All get_connection() calls inside this context return the same connection, eliminating per-query pool round-trips.
async with request_connection() as conn:
    posts = await Post.objects.filter(published=True).all()
    count = await Post.objects.count()
- configure_db(database_url, echo=False) Awaitable[None]
Explicitly configure the database engine. Call before init_db(). Disposes any existing engine before replacing it so that pooled connections are not leaked.
- transaction(using=None, read_only=False) AsyncContextManager[AsyncConnection]
Async context manager for a transaction pinned to a database alias. When using is provided, the transaction runs on the backend for that alias and pins the routing context. If read_only is True and the alias is configured as read-only, the transaction is allowed.
async with transaction(using="default"):
    await Post.objects.create(title="Hello")
- atomic() AsyncContextManager[AsyncConnection]
Async context manager that wraps a block of ORM operations in a transaction. On normal exit the transaction is committed; on any exception it is rolled back and the exception is re-raised.
async with atomic():
    await Post.objects.create(title="Hello")
    await Tag.objects.create(name="python")
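The commit-on-success, rollback-on-exception behaviour described for atomic() can be demonstrated with a synchronous analogue built on the standard-library sqlite3 module. `atomic_sketch` is a hypothetical stand-in, not OpenViper's implementation, and the posts table exists only for the demo.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def atomic_sketch(conn: sqlite3.Connection):
    """Commit on normal exit; roll back and re-raise on any exception."""
    conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


# isolation_level=None: sqlite3 opens no implicit transactions, so the
# context manager controls BEGIN/COMMIT/ROLLBACK itself.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE posts (title TEXT)")

with atomic_sketch(conn):
    conn.execute("INSERT INTO posts VALUES ('Hello')")

try:
    with atomic_sketch(conn):
        conn.execute("INSERT INTO posts VALUES ('Lost')")
        raise RuntimeError("boom")  # triggers the rollback
except RuntimeError:
    pass

titles = [row[0] for row in conn.execute("SELECT title FROM posts")]
print(titles)  # ['Hello']
```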
Lifecycle Hooks
Override any of the following async methods on a Model subclass to
hook into the persistence lifecycle. All default to no-ops.
Create flow (pk is None):
before_validate → validate → before_insert → before_save
→ INSERT → after_insert → on_change
Update flow (pk set):
before_validate → validate → before_save
→ UPDATE → on_update → on_change (only when data actually changed)
Delete flow:
on_delete → DELETE → after_delete
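The create and update flows above can be traced with a small recorder class. This is a sketch under stated assumptions: `SketchModel` is hypothetical, it records hook names instead of calling real hooks, and it detects changes by comparing a plain dict against the last saved copy, which may not match how OpenViper tracks state.

```python
import asyncio


class SketchModel:
    """Hypothetical stand-in that records the order of lifecycle steps."""

    def __init__(self, **data):
        self.pk = None
        self.data = dict(data)
        self._saved = {}  # snapshot of the last persisted state
        self.calls = []

    async def _hook(self, name: str) -> None:
        self.calls.append(name)

    async def save(self) -> None:
        creating = self.pk is None
        changed = self.data != self._saved
        await self._hook("before_validate")
        await self._hook("validate")
        if creating:
            await self._hook("before_insert")
        await self._hook("before_save")
        if creating:
            self.pk = 1
            self.calls.append("INSERT")
            await self._hook("after_insert")
        else:
            self.calls.append("UPDATE")
            await self._hook("on_update")
        if creating or changed:  # on_change only when data actually changed
            await self._hook("on_change")
        self._saved = dict(self.data)


post = SketchModel(title="Hello")
asyncio.run(post.save())
create_calls = list(post.calls)

post.calls.clear()
asyncio.run(post.save())  # nothing changed since the last save
unchanged_update_calls = list(post.calls)

post.calls.clear()
post.data["title"] = "Hello again"
asyncio.run(post.save())
changed_update_calls = list(post.calls)

print(create_calls)
print(unchanged_update_calls)
print(changed_update_calls)
```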
Hook | When called |
|---|---|
| before_validate | Before field validation, on both create and update. |
| validate | Validates all fields. Override to add custom business rules - call |
| before_insert | Create only, after validation, before the INSERT. |
| before_save | Both create and update, immediately before the DB write. |
| after_insert | Create only, after the INSERT succeeds. |
| on_update | Update only, after the UPDATE succeeds. |
| on_delete | Before the DELETE. Raise to abort. |
| after_delete | After a successful DELETE. |
| on_change | After create or update when field values changed. previous_state is |
Model Events (Signals)
Beyond lifecycle hooks, model events allow you to attach handlers
outside the model class using @model_event.trigger():
from openviper.db.events import model_event
@model_event.trigger("myapp.models.Post.after_insert")
async def on_post_created(post, event):
    print(f"New post: {post.title}")
Example Usage
See also
Working projects that use the ORM:
- examples/todoapp/ - simple model (CharField, BooleanField, DateTimeField)
- examples/ecommerce_clone/ - UUIDField PK, DecimalField, ImageField, after_insert lifecycle hook
- examples/ai_moderation_platform/ - ForeignKey, JSONField, ImageField, custom BaseUser
- examples/fx/ - root-layout project with models and schemas
Defining Models
from openviper.db.models import Model, AbstractModel
from openviper.db import fields

class TimestampMixin(AbstractModel):
    """Shared timestamp fields - no table created."""
    created_at = fields.DateTimeField(auto_now_add=True)
    updated_at = fields.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

class Author(TimestampMixin):
    class Meta:
        table_name = "authors"

    name = fields.CharField(max_length=200)
    email = fields.EmailField(unique=True)

class Post(TimestampMixin):
    class Meta:
        table_name = "posts"

    title = fields.CharField(max_length=255)
    body = fields.TextField()
    author = fields.ForeignKey(Author, on_delete="CASCADE")
    is_published = fields.BooleanField(default=False)
    views = fields.IntegerField(default=0)

    async def before_save(self) -> None:
        # Normalise the title before every INSERT or UPDATE
        self.title = self.title.strip()

    async def validate(self) -> None:
        await super().validate()
        if not self.title:
            raise ValueError("Title is required")
Querying
from openviper.exceptions import DoesNotExist

async def example():
    # Fetch all published posts ordered by newest first
    posts = await Post.objects.filter(is_published=True).order_by("-created_at").all()

    # Directly await a QuerySet
    posts = await Post.objects.filter(is_published=True)

    # Get a single post or raise DoesNotExist
    try:
        post = await Post.objects.get(id=42)
    except DoesNotExist:
        ...

    # Get or None
    post = await Post.objects.get_or_none(id=42)

    # Create
    author = await Author.objects.create(name="Alice", email="alice@example.com")

    # get_or_create
    post, created = await Post.objects.get_or_create(
        defaults={"body": "..."},
        title="Hello World",
    )

    # Update all matching rows
    updated = await Post.objects.filter(author=author).update(is_published=True)

    # Count & exists
    n = await Post.objects.filter(is_published=True).count()
    exists = await Post.objects.filter(title__contains="OpenViper").exists()

    # First and last
    first_post = await Post.objects.order_by("created_at").first()
    latest_post = await Post.objects.last()  # uses -id by default

    # only / defer
    titles = await Post.objects.only("id", "title").all()
    light = await Post.objects.defer("body").all()

    # distinct
    authors = await Post.objects.distinct().values("author_id")

    # values / values_list
    rows = await Post.objects.filter(is_published=True).values("id", "title")
    # [{"id": 1, "title": "Hello"}, ...]
    ids = await Post.objects.values_list("id", flat=True)
    # [1, 2, 3, ...]
    pairs = await Post.objects.values_list("id", "title")
    # [(1, "Hello"), (2, "World"), ...]
F() Expressions
from openviper.db.models import F

async def example():
    # Atomic increment without a Python round-trip
    await Post.objects.filter(pk=1).update(views=F("views") + 1)

    # Multi-field arithmetic
    await Post.objects.filter(pk=1).update(score=F("likes") * 2 - F("dislikes"))

    # Filter where one column >= another
    await Post.objects.filter(views__gte=F("min_views")).all()
Q() Objects - Complex Filters
from openviper.db.models import Q

async def example():
    # OR
    posts = await Post.objects.filter(
        Q(is_published=True) | Q(is_featured=True)
    ).all()

    # NOT
    posts = await Post.objects.filter(~Q(status="draft")).all()

    # AND via & operator
    posts = await Post.objects.filter(
        Q(is_published=True) & Q(views__gte=100)
    ).all()

    # Compound: (title contains 'python' OR views >= 1000) AND published
    posts = await Post.objects.filter(
        Q(title__icontains="python") | Q(views__gte=1000),
        is_published=True,
    ).all()
Aggregate Functions
from openviper.db.models import Count, Sum, Avg, Max, Min

async def example():
    result = await Post.objects.filter(is_published=True).aggregate(
        total=Count("id"),
        total_views=Sum("views"),
        avg_views=Avg("views"),
        max_views=Max("views"),
        min_views=Min("views"),
    )
    # {"total": 42, "total_views": 9820, "avg_views": 233.8, ...}
annotate
from openviper.db.models import Count, F

async def example():
    posts = await (
        Post.objects
        .annotate(double_views=F("views") * 2, like_count=Count("likes"))
        .filter(is_published=True)
        .all()
    )
    for post in posts:
        print(post.double_views, post.like_count)
bulk_create and bulk_update
async def example():
    # bulk_create - INSERT all in a single statement
    posts = [Post(title=f"Post {i}", body="...") for i in range(100)]
    await Post.objects.bulk_create(posts)

    # bulk_update - UPDATE in batches
    drafts = await Post.objects.filter(is_published=False).all()
    for post in drafts:
        post.is_published = True
    await Post.objects.bulk_update(drafts, fields=["is_published"])
Large Dataset Iteration
async def example():
    # iterator - keyset pagination, one instance at a time
    async for post in Post.objects.filter(is_published=True).iterator(chunk_size=500):
        await process(post)

    # batch - OFFSET pagination, groups of instances
    async for batch in Post.objects.filter(is_published=True).batch(size=200):
        await index_search(batch)

    # id_batch - keyset pagination, groups of instances (stable during writes)
    async for batch in Post.objects.filter(is_published=True).id_batch(size=500):
        await process_batch(batch)
Transactions
from openviper.db.connection import transaction, atomic

async def example():
    # transaction() pins all ORM operations to a specific alias
    async with transaction(using="default"):
        await Post.objects.create(title="Hello")
        await Tag.objects.create(name="python")

    # atomic() wraps a block in a commit/rollback transaction
    async with atomic():
        await Post.objects.create(title="Hello")
        await Tag.objects.create(name="python")
Bypassing Permissions
from openviper.core.context import ignore_permissions_ctx

async def example():
    token = ignore_permissions_ctx.set(True)
    try:
        sensitive = await SensitiveModel.objects.all(ignore_permissions=True)
    finally:
        ignore_permissions_ctx.reset(token)

    # Or pass flag directly:
    all_users = await User.objects.filter(ignore_permissions=True).all()
Migrations
OpenViper uses a JSON-based schema synchronization system. Each model
gets a JSON schema file in <app>/schemas/<ModelName>.json that
represents the desired database schema state.
Run openviper viperctl makemigrations . to detect model changes and
update the JSON schema files, then openviper viperctl migrate . to
apply them to the database.
The system is stateless and idempotent: migrate diffs the JSON
schemas against the live database via SQLAlchemy introspection and
applies only the delta. Running migrate twice produces no changes
on the second run.
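The stateless, idempotent behaviour described above comes down to diffing the desired schema against the live one and applying only the difference. The sketch below shows the idea on plain dicts of column name to type. `diff_columns` and `apply_ops` are hypothetical helpers; the real system works on JSON schema files and SQLAlchemy introspection, which are not reproduced here.

```python
def diff_columns(desired: dict[str, str], live: dict[str, str]) -> list[tuple[str, str]]:
    """Return the operations needed to turn `live` into `desired`."""
    ops = [("AddColumn", name) for name in desired if name not in live]
    ops += [("RemoveColumn", name) for name in live if name not in desired]
    ops += [
        ("AlterColumn", name)
        for name in desired
        if name in live and desired[name] != live[name]
    ]
    return ops


def apply_ops(desired: dict[str, str], live: dict[str, str], ops) -> dict[str, str]:
    """Apply the operations to a copy of the live schema."""
    result = dict(live)
    for op, name in ops:
        if op == "RemoveColumn":
            del result[name]
        else:  # AddColumn and AlterColumn both take the desired type
            result[name] = desired[name]
    return result


desired = {"id": "INTEGER", "title": "VARCHAR(255)", "views": "INTEGER"}
live = {"id": "INTEGER", "title": "VARCHAR(100)", "legacy": "TEXT"}

first_run = diff_columns(desired, live)
live = apply_ops(desired, live, first_run)
second_run = diff_columns(desired, live)

print(first_run)
print(second_run)  # []
```

The second diff is empty because the live schema already matches, which is the property that makes running migrate twice safe.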
Supported databases: PostgreSQL, MariaDB/MySQL, MSSQL, Oracle, SQLite.
Supported schema operations:
- CreateTable - create a new table.
- DropTable - drop a table entirely (data loss; use with caution).
- AddColumn - add a new column to an existing table.
- RemoveColumn - drop a column from a table.
- RenameColumn - rename a column. Detected automatically by makemigrations when a field is renamed (matched by type).
- AlterColumn - change column type or primary key status.
- CreateIndex - create a composite or named index. Supports unique=True for unique indexes.
- RemoveIndex - drop a composite or named index.
- AddConstraint - add a CHECK or UNIQUE constraint.
- RemoveConstraint - remove a previously-added constraint.
- RunSQL - execute arbitrary SQL with optional bound parameters.
Type change validation:
makemigrations validates column type changes before writing JSON
schemas. Incompatible conversions (e.g., Integer to String) raise an
error unless --force is passed. Narrowing changes (e.g.,
VARCHAR(200) to VARCHAR(50)) produce a warning.
Rename detection:
When a field is renamed, makemigrations matches the old column
name to the new one by type and stores old_name in the JSON schema.
At migrate time, this metadata triggers a RENAME COLUMN
operation instead of drop + add, preserving existing data.
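Matching a removed column to an added one by type, as described above, might look like the following. This is a sketch, not the makemigrations algorithm: `detect_renames` is hypothetical, and the rule that a rename is inferred only when exactly one removed and one added column share a type is an assumption made here to keep the example unambiguous.

```python
def detect_renames(old: dict[str, str], new: dict[str, str]) -> dict[str, str]:
    """Return {new_name: old_name} for columns that look renamed."""
    removed = {name: typ for name, typ in old.items() if name not in new}
    added = {name: typ for name, typ in new.items() if name not in old}
    renames = {}
    for new_name, col_type in added.items():
        old_matches = [n for n, t in removed.items() if t == col_type]
        new_matches = [n for n, t in added.items() if t == col_type]
        # Assumption: only infer a rename when the match is one-to-one.
        if len(old_matches) == 1 and len(new_matches) == 1:
            renames[new_name] = old_matches[0]
    return renames


old_schema = {"id": "INTEGER", "name": "VARCHAR(200)"}
new_schema = {"id": "INTEGER", "full_name": "VARCHAR(200)"}
print(detect_renames(old_schema, new_schema))  # {'full_name': 'name'}
```

The returned mapping plays the role of the old_name metadata: it tells the apply step to rename rather than drop and re-add.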
Data Patches
For one-time data transformations that cannot be expressed as schema
changes, use the @db_patch decorator. Patches are Python async
functions that run during migrate and are tracked to ensure each
runs exactly once.
from openviper.db.patches import db_patch

@db_patch
async def backfill_status():
    """Runs after schema sync (default)."""
    await User.objects.filter(status=None).update(status="active")

@db_patch(post_migrate=False)
async def read_old_fields():
    """Runs before schema sync - old schema still in place."""
    ...

@db_patch(order=2)
async def cleanup_permissions():
    """Runs after schema sync, ordered after other post patches."""
    ...
Patch phases:
- post_migrate=False (pre-migration): runs before schema sync. The old schema is still in the database, so patches can read fields that are about to be removed or renamed.
- post_migrate=True (default, post-migration): runs after schema sync. The new schema is in place, so patches can use new fields.
Patches are discovered automatically from <app>/patches/*.py
files. Each patch runs exactly once, tracked by the
openviper_patches database table.
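Run-once tracking of this kind can be sketched with sqlite3. The table name openviper_patches comes from the text above, but its single name column, the `run_patches_once` function, and the synchronous patch signature are all assumptions for illustration.

```python
import sqlite3


def run_patches_once(conn: sqlite3.Connection, patches) -> list[str]:
    """Run each (name, func) patch unless its name is already recorded."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS openviper_patches (name TEXT PRIMARY KEY)"
    )
    executed = []
    for name, func in patches:
        seen = conn.execute(
            "SELECT 1 FROM openviper_patches WHERE name = ?", (name,)
        ).fetchone()
        if seen:
            continue  # already applied on an earlier run
        func(conn)
        conn.execute("INSERT INTO openviper_patches (name) VALUES (?)", (name,))
        executed.append(name)
    conn.commit()
    return executed


def backfill_status(conn: sqlite3.Connection) -> None:
    conn.execute("UPDATE users SET status = 'active' WHERE status IS NULL")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO users (status) VALUES (NULL)")

patches = [("backfill_status", backfill_status)]
first = run_patches_once(conn, patches)
second = run_patches_once(conn, patches)
print(first, second)  # ['backfill_status'] []
```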
openviper.db.executor
- bypass_permissions(*, reason=None) Generator[None]
Context manager that disables row-level permission checks for the duration of the block. Accepts an optional reason string for audit logging.
from openviper.db.executor import bypass_permissions

async with bypass_permissions(reason="system cleanup"):
    await SensitiveModel.objects.all()
- validate_regex_pattern(pattern) None
Reject regex patterns that could cause catastrophic backtracking (ReDoS). Enforces a maximum length and blocks nested quantifier patterns. Raises FieldError on dangerous patterns.
- assert_safe_table_name(name) None
Raise ValueError if name contains characters outside the safe identifier set ([a-zA-Z0-9_], must start with a letter or underscore). Used to prevent SQL injection through table names.
- escape_like(value) str
Escape LIKE metacharacters (% and _) in user-provided values. Prevents LIKE injection attacks where malicious input like % could match all rows. Use with escape="\\" in the query.
from openviper.db.executor import escape_like

pattern = escape_like(user_input)
results = await Product.objects.filter(name__contains=pattern).all()
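The effect of escaping LIKE metacharacters is easiest to see against a real LIKE query. The sketch below uses sqlite3 and a hypothetical `escape_like_sketch`; it also escapes the backslash itself, which is an assumption beyond the % and _ named above, and the products table is invented for the demo.

```python
import sqlite3


def escape_like_sketch(value: str, escape: str = "\\") -> str:
    """Prefix the escape character, %, and _ with the escape character."""
    # Escape the escape character first so later replacements stay intact.
    for ch in (escape, "%", "_"):
        value = value.replace(ch, escape + ch)
    return value


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT)")
conn.executemany(
    "INSERT INTO products VALUES (?)",
    [("100% cotton",), ("1000 cotton",), ("wool",)],
)


def search(term: str) -> list[str]:
    pattern = f"%{escape_like_sketch(term)}%"
    rows = conn.execute(
        "SELECT name FROM products WHERE name LIKE ? ESCAPE '\\' ORDER BY name",
        (pattern,),
    )
    return [row[0] for row in rows]


matches_percent = search("100%")  # literal "100%" only
matches_wildcard = search("%")    # a bare % no longer matches every row
print(matches_percent, matches_wildcard)
```

Without the escaping step, searching for "%" would produce the pattern "%%%" and return all three rows.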
openviper.db.utils
- class BoundedDict(maxsize)
collections.OrderedDict subclass that evicts the oldest entries when exceeding maxsize. Thread-safe via an internal lock. Used as the compiled-statement cache for SQLAlchemy.
- validate_on_delete(action, context) str
Validate that action is a supported ON DELETE action. Accepts "CASCADE", "PROTECT", "RESTRICT", "SET_NULL", "SET_DEFAULT", "NO_ACTION", "DO_NOTHING", "SET NULL", and "SET DEFAULT". Returns the normalised uppercased action. Raises ValueError on invalid actions.
- validate_sql_expression(value, field_name, context) str
Reject SQL expressions containing destructive patterns (semicolons, SQL comments, DDL/DML keywords). Used to validate the condition parameter of Index and the check parameter of CheckConstraint. Returns value on success; raises ValueError on dangerous input.
- validate_identifier(name, description) str
Validate that name is a safe SQL identifier (letters, digits, underscores; must start with a letter or underscore). Returns name on success; raises ValueError on invalid input.
- quote_identifier(name, dialect) str
Quote a table or column name based on the database dialect. Uses backtick quoting for MySQL, square brackets for MSSQL, and double-quote quoting for PostgreSQL/SQLite/Oracle.
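The dialect rules above amount to a three-way branch. In this sketch the dialect strings "mysql" and "mssql" and the name quote_identifier_sketch are assumptions, and the identifier is validated first so that no quote character can appear inside it.

```python
import re

_SAFE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier_sketch(name: str, dialect: str) -> str:
    """Quote name for the given dialect after checking it is a plain identifier."""
    if not _SAFE_IDENTIFIER.fullmatch(name):
        raise ValueError(f"invalid identifier: {name!r}")
    if dialect == "mysql":
        return f"`{name}`"   # backticks
    if dialect == "mssql":
        return f"[{name}]"   # square brackets
    return f'"{name}"'       # PostgreSQL, SQLite, Oracle
```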
openviper.db.exceptions
All database exceptions inherit from Python’s built-in Exception.
| Exception | Description |
|---|---|
| | Invalid |
| | Configured backend cannot be imported or found. |
| | Requested database alias is not configured. |
| | Write attempted on a read-only database alias. |
| | Router returned invalid alias or routing failed. |
| | Invalid routing behavior inside a transaction. |
| | Backend does not support the requested operation. |
| | Base error for virtual model operations. |
| | Virtual model backend name is not registered. |
| | Write operation attempted on a read-only virtual model. |
| | Virtual backend cannot execute the requested query. |
| | Virtual backend operation failed. |
| | Base error for single model operations. |
| | Requested single model instance does not exist. |
| | A single model instance already exists. |
| | Delete attempted for single model data. |
| | Duplicate attempted for single model data. |
openviper.db.events
- model_event
Module-level event dispatcher instance. Use the @model_event.trigger decorator to register handlers outside the model class.

from openviper.db.events import model_event

@model_event.trigger("myapp.models.Post.after_insert")
async def on_post_created(post, event):
    print(f"New post: {post.title}")
ManyToManyField API
- class ManyToManyField(to, through=None, related_name=None)
Many-to-many relationship via a junction table. When through is omitted, an auto-generated junction model is created.
Accessing the field on an instance returns a ManyToManyManager with the following methods:
- add(*objects) Awaitable[None]
Add one or more objects to the relationship. Raises ValueError if the source instance is unsaved.
LazyFK - Lazy FK Loading
- class LazyFK(fk_field, instance, fk_id)
Awaitable proxy returned by ForeignKey descriptor access when the related object is not yet loaded. Supports transparent comparison, hashing, and string conversion so that code using raw FK ID values continues to work without awaiting.

post = await Post.objects.get(id=1)
author_proxy = post.author   # LazyFK - not yet loaded
author = await post.author   # loads from DB
print(author.username)

# Comparison works without loading
if post.author == 5:
    ...

# Boolean check works without loading
if post.author:
    ...
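The behaviour described above (awaitable, yet comparable and truthy without a query) can be reproduced with a small proxy class. This is a standalone sketch, not the library's LazyFK: LazyRef takes a hypothetical async loader callable instead of (fk_field, instance, fk_id), and the demo uses a fake loader in place of a database.

```python
import asyncio


class LazyRef:
    """Awaitable proxy around a foreign-key ID; loads the object on first await."""

    def __init__(self, loader, fk_id):
        self._loader = loader      # async callable: pk -> related object
        self._fk_id = fk_id
        self._cached = None

    async def _load(self):
        if self._cached is None and self._fk_id is not None:
            self._cached = await self._loader(self._fk_id)
        return self._cached

    def __await__(self):
        return self._load().__await__()

    def __eq__(self, other):
        other_id = other._fk_id if isinstance(other, LazyRef) else other
        return self._fk_id == other_id

    def __hash__(self):
        return hash(self._fk_id)

    def __bool__(self):
        return self._fk_id is not None

    def __str__(self):
        return str(self._fk_id)


async def demo():
    calls = []

    async def fake_loader(pk):
        calls.append(pk)
        return {"id": pk, "username": "alice"}

    ref = LazyRef(fake_loader, 5)
    cheap_checks = (ref == 5, bool(ref), str(ref), list(calls))
    author = await ref
    await ref  # second await reuses the cached object
    return cheap_checks, author, calls
```

Comparison, truthiness, and string conversion read only the stored ID, so they never touch the loader; only awaiting the proxy does.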
Virtual Models
A Virtual Model uses Meta.virtual = True and a Meta.backend string
to route data operations to a custom async storage adapter instead of the
SQL database. Virtual models are useful for data that lives in external
APIs, in-memory stores, or read-only services.
from openviper.db import Model
from openviper.db.fields import BooleanField, CharField
from openviper.db.backends.base import VirtualBackend, VirtualBackendCapabilities

class SettingsBackend(VirtualBackend):
    capabilities = VirtualBackendCapabilities(
        supports_create=True,
        supports_update=True,
        supports_delete=False,
        supports_filter=True,
        supports_count=True,
    )

    async def get(self, model_class, primary_key):
        ...

    async def list(self, model_class, query):
        ...

    async def create(self, model_class, data):
        ...

    async def update(self, model_class, primary_key, data):
        ...

    async def delete(self, model_class, primary_key):
        ...

class RemoteSettings(Model):
    site_name = CharField(max_length=255)
    maintenance_mode = BooleanField(default=False)

    class Meta:
        virtual = True
        backend = "settings_api"
Virtual models can also be single models (Meta.single = True) for
singleton settings backed by an external API. See Single Models for
details on single model behaviour.
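To make the adapter shape in the SettingsBackend example more concrete, the sketch below fills in the method bodies with an in-memory store. It is deliberately standalone: InMemoryBackend does not subclass the real VirtualBackend, the query argument is assumed to be a plain dict of equality filters rather than a QuerySpec, and the integer "id" key is an assumption.

```python
import asyncio


class InMemoryBackend:
    """Dictionary-backed stand-in with the same method shapes as a virtual backend."""

    def __init__(self):
        self._rows = {}     # primary key -> record dict
        self._next_id = 1

    async def get(self, model_class, primary_key):
        row = self._rows.get(primary_key)
        return dict(row) if row is not None else None

    async def list(self, model_class, query):
        filters = query or {}
        return [
            dict(row)
            for row in self._rows.values()
            if all(row.get(key) == value for key, value in filters.items())
        ]

    async def create(self, model_class, data):
        row = {**data, "id": self._next_id}
        self._rows[row["id"]] = row
        self._next_id += 1
        return dict(row)

    async def update(self, model_class, primary_key, data):
        row = self._rows[primary_key]   # KeyError if the record is missing
        row.update(data)
        return dict(row)


async def demo():
    backend = InMemoryBackend()
    created = await backend.create(None, {"site_name": "Example", "maintenance_mode": False})
    await backend.update(None, created["id"], {"maintenance_mode": True})
    fetched = await backend.get(None, created["id"])
    matching = await backend.list(None, {"maintenance_mode": True})
    missing = await backend.get(None, 999)
    return fetched, matching, missing
```

Returning copies of the stored dicts keeps callers from mutating backend state by accident, which mirrors how a remote API would behave.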
VirtualBackendCapabilities
VirtualBackend
- class VirtualBackend
Abstract base class for virtual model storage adapters. Subclass and implement the required methods to connect models to REST APIs, caches, or other data sources.
- capabilities: VirtualBackendCapabilities
Declares which operations this backend supports.
- get(model_class, primary_key) Awaitable[Mapping | None]
Return one record by primary key, or None if not found.
- list(model_class, query) Awaitable[Sequence[Mapping]]
Return records matching the QuerySpec.
- create(model_class, data) Awaitable[Mapping]
Create and return one record.
- update(model_class, primary_key, data) Awaitable[Mapping]
Update and return one record.