Queries¶
Databasez supports queries using SQLAlchemy Core and raw SQL.
Table declarations¶
To use SQLAlchemy Core queries, define your metadata and tables first.
import sqlalchemy
metadata = sqlalchemy.MetaData()
user = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String(length=150)),
sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)
Another example with JSON columns:
import sqlalchemy
metadata = sqlalchemy.MetaData()
user = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String(length=150)),
sqlalchemy.Column("address", sqlalchemy.String(length=500)),
sqlalchemy.Column("config", sqlalchemy.JSON(none_as_null=True)),
)
Creating tables¶
Databasez provides create_all and drop_all helpers that call SQLAlchemy metadata operations through the active connection.
You can also compile DDL manually when needed.
import sqlalchemy
from databasez import Database
database = Database("postgresql+asyncpg://localhost/example")
# Establish the connection pool
await database.connect()
metadata = sqlalchemy.MetaData()
dialect = sqlalchemy.dialects.postgresql.dialect()
# Define your table(s)
users = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String(length=150)),
sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)
# Create tables (manually)
for table in metadata.tables.values():
# Set `if_not_exists=False` if you want the query to throw an
# exception when the table already exists
schema = sqlalchemy.schema.CreateTable(table, if_not_exists=True)
query = str(schema.compile(dialect=dialect))
await database.execute(query=query)
# Create tables automatically using SQLAlchemy metadata.
await database.create_all(metadata)
# Drop all tables when needed.
await database.drop_all(metadata)
# Close all connections in the connection pool
await database.disconnect()
Note
For production projects, use a migration tool such as Alembic.
SQLAlchemy Core queries¶
import sqlalchemy
from databasez import Database
database = Database("postgresql+asyncpg://localhost/example")
# Establish the connection pool
await database.connect()
metadata = sqlalchemy.MetaData()
# Define your table(s)
users = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String(length=150)),
sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)
# Execute
query = users.insert()
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)
# Execute many
query = users.insert()
values = [
{"name": "databasez", "address": "London, United Kingdom"},
{"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)
# Fetch multiple rows
query = users.select()
rows = await database.fetch_all(query=query)
# Fetch single row
query = users.select()
row = await database.fetch_one(query=query)
# Fetch single value, defaults to `column=0`.
query = users.select()
value = await database.fetch_val(query=query)
# Fetch multiple rows without loading them all into memory at once
query = users.select()
async for row in database.iterate(query=query):
print(row)
# Fetch multiple rows in batches
query = users.select()
async for batch_tuple in database.batched_iterate(query=query, batch_size=10):
print(batch_tuple)
# Fetch multiple rows in batches and use as wrapper list
query = users.select()
async for batch_list in database.batched_iterate(query=query, batch_size=10, batch_wrapper=list):
print(batch_list)
# Close all connections in the connection pool
await database.disconnect()
Raw SQL queries¶
from databasez import Database
database = Database("postgresql+asyncpg://localhost/example")
# Establish the connection pool
await database.connect()
# Execute
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)
# Execute many
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = [
{"name": "databasez", "address": "London, United Kingdom"},
{"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)
# Fetch multiple rows
query = "SELECT * FROM users WHERE address = :address"
rows = await database.fetch_all(query=query, values={"address": "London, United Kingdom"})
# Fetch single row
query = "SELECT * FROM users WHERE id = :id"
result = await database.fetch_one(query=query, values={"id": 1})
# Close all connections in the connection pool
await database.disconnect()
Tip
Use named bind parameters with :param_name style.
Iteration and batching¶
iterate(...)yields row-by-row.batched_iterate(...)yields batches.batch_wrappercan transform each batch (tuple,list, custom callable).
Timeouts¶
Most query methods accept timeout=....
For cross-loop debug timeout, use:
databasez.utils.DATABASEZ_RESULT_TIMEOUT