Skip to content

Connections and Transactions

Connections

Connections are the core execution unit in Databasez. Even when you call high-level methods on Database, those calls are routed through a Connection.

If you want multiple operations in one scoped connection, use:

async with database.connection() as connection:
    await connection.execute(
        "INSERT INTO notes(text) VALUES (:text)",
        {"text": "example"},
    )
    rows = await connection.fetch_all("SELECT * FROM notes")

This is the recommended style for explicit control.

Task-local behavior

Connections are task-local. Inside one task, repeated database.connection() calls reuse the same object. Different tasks receive different connections.

Global connection in rollback mode

When force_rollback=True, database.connection() returns a global connection that is wrapped in rollback behavior.

This is useful for tests, but remember:

  • you are effectively sharing one connection
  • nested/parallel usage on the same connection must be planned carefully

Runnable example:

from databasez import Database

setup_database = Database("sqlite+aiosqlite:///testsuite.sqlite3")
database = Database("sqlite+aiosqlite:///testsuite.sqlite3", force_rollback=True)


async def test_foo() -> None:
    async with setup_database:
        await setup_database.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )

    async with database:
        await database.execute("DELETE FROM notes")
        await database.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "inside-test"})
        row = await database.fetch_val("SELECT COUNT(*) FROM notes")
        assert row == 1

    async with database:
        row = await database.fetch_val("SELECT COUNT(*) FROM notes")
        assert row == 0

Multiloop and multithreading

Databasez supports loop-aware routing:

  • same loop: direct call
  • different loop: operation is proxied using cross-loop helpers
  • optional full isolation: global rollback connection in a dedicated thread
flowchart TD
    A["Call from current loop"] --> B{"Loop matches database loop?"}
    B -->|Yes| C["Execute directly"]
    B -->|No| D{"Known sub-database for loop?"}
    D -->|Yes| E["Forward to sub-database"]
    D -->|No| F["Proxy call with async helper"]

Transactions

Transactions are lazily initialized and can be used in three ways.

1. Async context manager

from databasez import Database


async def main() -> None:
    async with Database("sqlite+aiosqlite:///example.db") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )

        async with database.transaction():
            await database.execute(
                "INSERT INTO notes(text) VALUES (:text)", values={"text": "committed"}
            )

        async with database.transaction(force_rollback=True):
            await database.execute(
                "INSERT INTO notes(text) VALUES (:text)", {"text": "rolled back"}
            )

Or from a specific connection:

from databasez import Database


async def main() -> None:
    async with (
        Database("sqlite+aiosqlite:///example.db") as database,
        database.connection() as connection,
    ):
        await connection.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )

        async with connection.transaction():
            await connection.execute(
                "INSERT INTO notes(text) VALUES (:text)", values={"text": "committed"}
            )

        async with connection.transaction(force_rollback=True):
            await connection.execute(
                "INSERT INTO notes(text) VALUES (:text)", {"text": "rolled back"}
            )

2. Decorator

from databasez import Database

database = Database("sqlite+aiosqlite:///example.db")


@database.transaction()
async def create_note(text: str) -> None:
    await database.execute("INSERT INTO notes(text) VALUES (:text)", values={"text": text})


async def main() -> None:
    async with database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )
        await create_note("created inside a transaction")

3. Manual control

from databasez import Database


async def main() -> None:
    async with Database("sqlite+aiosqlite:///example.db") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )
        transaction = await database.transaction()

        try:
            await database.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "manual"})
        except Exception:
            await transaction.rollback()
        else:
            await transaction.commit()

Transaction options

force_rollback

transaction(force_rollback=True) always rolls back on exit.

from databasez import Database

database = Database("sqlite+aiosqlite:///example.db")


async def test_foo() -> None:
    async with database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )
        await database.execute("DELETE FROM notes")

        async with database.transaction():
            await database.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "saved"})

            async with database.transaction(force_rollback=True):
                await database.execute(
                    "INSERT INTO notes(text) VALUES (:text)", {"text": "rolled"}
                )

        rows = await database.fetch_all("SELECT text FROM notes ORDER BY id")
        assert [row.text for row in rows] == ["saved"]

Isolation level

You can pass backend-supported isolation levels:

import asyncio

from databasez import Database


async def add_excitement(connection, id: int) -> None:
    await connection.execute(
        "UPDATE notes SET text = :text WHERE id = :id",
        {"id": id, "text": "databasez is cool!!!"},
    )


async def main() -> None:
    async with Database("sqlite+aiosqlite:///example.db") as database:
        await database.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )
        await database.execute("DELETE FROM notes")

        async with database.transaction(isolation_level="SERIALIZABLE"):
            await database.execute("INSERT INTO notes(id, text) values (1, 'databasez is cool')")
            await asyncio.create_task(add_excitement(database.connection(), id=1))

        value = await database.fetch_val("SELECT text FROM notes WHERE id = :id", {"id": 1})
        assert value == "databasez is cool!!!"

Nested transactions

Nested transactions are supported through savepoints:

import contextlib

import databasez


async def main() -> None:
    async with databasez.Database("sqlite+aiosqlite:///example.db") as db:
        await db.execute(
            "CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, text VARCHAR(100))"
        )
        await db.execute("DELETE FROM notes")

        async with db.transaction():
            await db.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "outer"})

            # Suppress so the inner transaction rolls back without breaking
            # the outer one.
            with contextlib.suppress(ValueError):
                async with db.transaction():
                    await db.execute("INSERT INTO notes(text) VALUES (:text)", {"text": "inner"})
                    raise ValueError("Abort the inner transaction")

        rows = await db.fetch_all("SELECT text FROM notes ORDER BY id")
        assert [row.text for row in rows] == ["outer"]

Transaction stack model

sequenceDiagram
    participant App
    participant Conn as Connection
    participant Stack as Transaction Stack

    App->>Conn: begin outer transaction
    Conn->>Stack: push outer
    App->>Conn: begin nested transaction
    Conn->>Stack: push savepoint
    App->>Conn: rollback nested
    Conn->>Stack: pop savepoint
    App->>Conn: commit outer
    Conn->>Stack: pop outer

See also