Skip to content

Queries

Databasez supports queries using SQLAlchemy Core and raw SQL.

Table declarations

To use SQLAlchemy Core queries, define your metadata and tables first.

import sqlalchemy

# One MetaData instance collects every table definition.
metadata = sqlalchemy.MetaData()

# Named `users` so it matches the name used for this same table in the
# query examples further down the page.
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

Another example with JSON columns:

import sqlalchemy

# One MetaData instance collects every table definition.
metadata = sqlalchemy.MetaData()

# Named `users` so it matches the name used for this same table in the
# query examples further down the page.
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
    # `none_as_null=True` persists Python `None` as SQL NULL rather
    # than the JSON "null" value.
    sqlalchemy.Column("config", sqlalchemy.JSON(none_as_null=True)),
)

Creating tables

Databasez provides create_all and drop_all helpers that call SQLAlchemy metadata operations through the active connection.

You can also compile DDL manually when needed.

import sqlalchemy

from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")

# Establish the connection pool
await database.connect()

metadata = sqlalchemy.MetaData()
dialect = sqlalchemy.dialects.postgresql.dialect()

# Define your table(s)
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

# Create tables (manually)
for table in metadata.tables.values():
    # Set `if_not_exists=False` if you want the query to throw an
    # exception when the table already exists
    schema = sqlalchemy.schema.CreateTable(table, if_not_exists=True)
    query = str(schema.compile(dialect=dialect))
    await database.execute(query=query)

# Create tables automatically using SQLAlchemy metadata.
await database.create_all(metadata)

# Drop all tables when needed.
await database.drop_all(metadata)

# Close all connections in the connection pool
await database.disconnect()

Note

For production projects, use a migration tool such as Alembic.

SQLAlchemy Core queries

import sqlalchemy

from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")


# Establish the connection pool
await database.connect()


metadata = sqlalchemy.MetaData()
# Define your table(s)
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

# Execute
query = users.insert()
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)

# Execute many
query = users.insert()
values = [
    {"name": "databasez", "address": "London, United Kingdom"},
    {"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = users.select()
rows = await database.fetch_all(query=query)

# Fetch single row
query = users.select()
row = await database.fetch_one(query=query)

# Fetch single value, defaults to `column=0`.
query = users.select()
value = await database.fetch_val(query=query)

# Fetch multiple rows without loading them all into memory at once
query = users.select()
async for row in database.iterate(query=query):
    print(row)

# Fetch multiple rows in batches
query = users.select()
async for batch_tuple in database.batched_iterate(query=query, batch_size=10):
    print(batch_tuple)

# Fetch multiple rows in batches and use as wrapper list
query = users.select()
async for batch_list in database.batched_iterate(query=query, batch_size=10, batch_wrapper=list):
    print(batch_list)

# Close all connections in the connection pool
await database.disconnect()

Raw SQL queries

from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")


# Establish the connection pool
await database.connect()

# Execute
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)

# Execute many
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = [
    {"name": "databasez", "address": "London, United Kingdom"},
    {"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = "SELECT * FROM users WHERE address = :address"
rows = await database.fetch_all(query=query, values={"address": "London, United Kingdom"})

# Fetch single row
query = "SELECT * FROM users WHERE id = :id"
result = await database.fetch_one(query=query, values={"id": 1})

# Close all connections in the connection pool
await database.disconnect()

Tip

Always pass values through named bind parameters (the :param_name style) rather than interpolating them into the SQL string; this lets the driver handle quoting and protects against SQL injection.

Iteration and batching

  • iterate(...) yields row-by-row.
  • batched_iterate(...) yields batches.
  • batch_wrapper can transform each batch (tuple, list, custom callable).

Timeouts

Most query methods accept an optional timeout=... keyword argument to bound how long the operation may run.

For debugging operations that cross event loops, a global result timeout can be configured via:

databasez.utils.DATABASEZ_RESULT_TIMEOUT