Connections and Transactions¶
Connections¶
Connections are the core execution unit in Databasez. Even when you call high-level methods on Database, those calls are routed through a Connection.
If you want multiple operations in one scoped connection, use:
async with database.connection() as connection:
await connection.execute(
"INSERT INTO notes(text) VALUES (:text)",
{"text": "example"},
)
rows = await connection.fetch_all("SELECT * FROM notes")
This is the recommended style for explicit control.
Task-local behavior¶
Connections are task-local. Inside one task, repeated database.connection() calls reuse the same object. Different tasks receive different connections.
Global connection in rollback mode¶
When force_rollback=True, database.connection() returns a global connection that is wrapped in rollback behavior.
This is useful for tests, but remember:
- you are effectively sharing one connection
- nested/parallel usage on the same connection must be planned carefully
Runnable example:
from databasez import Database
setup_database = Database("sqlite+aiosqlite:///testsuite.sqlite3")
database = Database("sqlite+aiosqlite:///testsuite.sqlite3", force_rollback=True)
async def test_foo() -> None:
async with setup_database:
await setup_database.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
async with database:
await database.execute("DELETE FROM notes")
await database.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "inside-test"})
row = await database.fetch_val("SELECT COUNT(*) FROM notes")
assert row == 1
async with database:
row = await database.fetch_val("SELECT COUNT(*) FROM notes")
assert row == 0
Multiloop and multithreading¶
Databasez supports loop-aware routing:
- same loop: direct call
- different loop: operation is proxied using cross-loop helpers
- optional full isolation: global rollback connection in a dedicated thread
flowchart TD
A["Call from current loop"] --> B{"Loop matches database loop?"}
B -->|Yes| C["Execute directly"]
B -->|No| D{"Known sub-database for loop?"}
D -->|Yes| E["Forward to sub-database"]
D -->|No| F["Proxy call with async helper"]
Transactions¶
Transactions are lazily initialized and can be used in three ways.
1. Async context manager¶
from databasez import Database
async def main() -> None:
async with Database("sqlite+aiosqlite:///example.db") as database:
await database.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
async with database.transaction():
await database.execute(
"INSERT INTO notes(text) VALUES (:text)", values={"text": "committed"}
)
async with database.transaction(force_rollback=True):
await database.execute(
"INSERT INTO notes(text) VALUES (:text)", {"text": "rolled back"}
)
Or from a specific connection:
from databasez import Database
async def main() -> None:
async with (
Database("sqlite+aiosqlite:///example.db") as database,
database.connection() as connection,
):
await connection.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
async with connection.transaction():
await connection.execute(
"INSERT INTO notes(text) VALUES (:text)", values={"text": "committed"}
)
async with connection.transaction(force_rollback=True):
await connection.execute(
"INSERT INTO notes(text) VALUES (:text)", {"text": "rolled back"}
)
2. Decorator¶
from databasez import Database
database = Database("sqlite+aiosqlite:///example.db")
@database.transaction()
async def create_note(text: str) -> None:
await database.execute("INSERT INTO notes(text) VALUES (:text)", values={"text": text})
async def main() -> None:
async with database:
await database.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
await create_note("created inside a transaction")
3. Manual control¶
from databasez import Database
async def main() -> None:
async with Database("sqlite+aiosqlite:///example.db") as database:
await database.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
transaction = await database.transaction()
try:
await database.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "manual"})
except Exception:
await transaction.rollback()
else:
await transaction.commit()
Transaction options¶
force_rollback¶
transaction(force_rollback=True) always rolls back on exit.
from databasez import Database
database = Database("sqlite+aiosqlite:///example.db")
async def test_foo() -> None:
async with database:
await database.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
await database.execute("DELETE FROM notes")
async with database.transaction():
await database.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "saved"})
async with database.transaction(force_rollback=True):
await database.execute(
"INSERT INTO notes(text) VALUES (:text)", {"text": "rolled"}
)
rows = await database.fetch_all("SELECT text FROM notes ORDER BY id")
assert [row.text for row in rows] == ["saved"]
Isolation level¶
You can pass backend-supported isolation levels:
import asyncio
from databasez import Database
async def add_excitement(connection, id: int) -> None:
await connection.execute(
"UPDATE notes SET text = :text WHERE id = :id",
{"id": id, "text": "databasez is cool!!!"},
)
async def main() -> None:
async with Database("sqlite+aiosqlite:///example.db") as database:
await database.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
await database.execute("DELETE FROM notes")
async with database.transaction(isolation_level="SERIALIZABLE"):
await database.execute("INSERT INTO notes(id, text) values (1, 'databasez is cool')")
await asyncio.create_task(add_excitement(database.connection(), id=1))
value = await database.fetch_val("SELECT text FROM notes WHERE id = :id", {"id": 1})
assert value == "databasez is cool!!!"
Nested transactions¶
Nested transactions are supported through savepoints:
import contextlib
import databasez
async def main() -> None:
async with databasez.Database("sqlite+aiosqlite:///example.db") as db:
await db.execute(
"CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
)
await db.execute("DELETE FROM notes")
async with db.transaction():
await db.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "outer"})
# Suppress so the inner transaction rolls back without breaking
# the outer one.
with contextlib.suppress(ValueError):
async with db.transaction():
await db.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "inner"})
raise ValueError("Abort the inner transaction")
rows = await db.fetch_all("SELECT text FROM notes ORDER BY id")
assert [row.text for row in rows] == ["outer"]
Transaction stack model¶
sequenceDiagram
participant App
participant Conn as Connection
participant Stack as Transaction Stack
App->>Conn: begin outer transaction
Conn->>Stack: push outer
App->>Conn: begin nested transaction
Conn->>Stack: push savepoint
App->>Conn: rollback nested
Conn->>Stack: pop savepoint
App->>Conn: commit outer
Conn->>Stack: pop outer