QuerySet & Chaining
Document.find() returns a QuerySet — a lazy query builder that
doesn’t hit the database until you call a terminal method such as
all(), first(), or count(). Chain as many modifiers as you like
before executing.
Setup
The examples below assume this Document definition:
from coodie.fields import PrimaryKey, ClusteringKey, Indexed
from typing import Annotated, Optional
from datetime import datetime
class BugReport(Document):
    project: Annotated[str, PrimaryKey()]
    created_at: Annotated[datetime, ClusteringKey(order="DESC")]
    title: str
    severity: Annotated[str, Indexed()] = "low"
    status: str = "open"
    assignee: Optional[str] = None

    class Settings:
        name = "bug_reports"
For sync usage, import from coodie.sync:
from coodie.sync import Document, init_coodie
init_coodie(hosts=["127.0.0.1"], keyspace="my_ks")
For async usage, import from coodie.aio:
from coodie.aio import Document, init_coodie
await init_coodie(hosts=["127.0.0.1"], keyspace="my_ks")
Chainable Methods
Chainable methods return a new QuerySet, so you can stack them:
qs = (
    BugReport.find(project="coodie")
    .filter(severity="critical")
    .order_by("-created_at")
    .limit(10)
    .allow_filtering()
)
filter(**kwargs)
Add extra WHERE conditions. Supports Django-style lookups (see Filtering (Django-Style Lookups) for the full list):
qs = BugReport.find(project="coodie").filter(severity="critical")
limit(n)
Cap the number of rows returned:
qs = BugReport.find(project="coodie").limit(5)
per_partition_limit(n)
Cap the number of rows per partition:
qs = BugReport.find().per_partition_limit(3).allow_filtering()
order_by(*columns)
Set the clustering order. Prefix a column name with - for descending:
# Newest first (descending)
qs = BugReport.find(project="coodie").order_by("-created_at")
# Oldest first (ascending)
qs = BugReport.find(project="coodie").order_by("created_at")
Warning
order_by only works on clustering columns and must match or reverse
the table’s declared clustering order.
only(*columns) / defer(*columns)
Select a subset of columns:
# Fetch only title and severity
qs = BugReport.find(project="coodie").only("title", "severity")
# Fetch everything except assignee
qs = BugReport.find(project="coodie").defer("assignee")
values_list(*columns)
Return raw tuples instead of Document instances. This avoids the overhead of constructing full Document objects and is useful when you only need a few column values — for example, feeding data into a report, building a quick lookup dict, or exporting rows:
rows = BugReport.find(project="coodie").values_list("title", "severity").all()
# [("Crash on startup", "critical"), ...]
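Because values_list yields plain tuples, the "quick lookup dict" use case mentioned above is one dict() call away. A self-contained sketch with sample data standing in for the query result:

```python
# Hypothetical rows, shaped like values_list("title", "severity") output:
rows = [
    ("Crash on startup", "critical"),
    ("Typo in docs", "low"),
]

# Two-column tuples convert directly into a lookup dict.
severity_by_title = dict(rows)
print(severity_by_title["Crash on startup"])  # -> critical
```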
allow_filtering()
Enable ALLOW FILTERING in the generated CQL. Use this when
Cassandra/ScyllaDB refuses a query because it would require a full
table scan:
qs = BugReport.find(status="open").allow_filtering()
Warning
ALLOW FILTERING can be expensive on large tables. Prefer queries that
hit the partition key or a secondary index.
ttl(seconds)
Set a TTL (time-to-live) for write operations (create, update).
The written cells will expire after the given number of seconds:
qs = BugReport.find(project="coodie").ttl(3600)
See TTL (Time-To-Live) for full details.
timestamp(ts)
Set an explicit write timestamp (in microseconds since epoch) for
create or update operations. This overrides the server-assigned
timestamp and can be used for deterministic replays or conflict
resolution:
import time
qs = BugReport.find(project="coodie").timestamp(int(time.time() * 1_000_000))
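To make the microseconds convention concrete, here is a small helper (to_write_timestamp is illustrative, not part of coodie) that converts a timezone-aware datetime instead of time.time(), which is what makes deterministic replays possible:

```python
from datetime import datetime, timezone


def to_write_timestamp(dt: datetime) -> int:
    # CQL write timestamps are microseconds since the Unix epoch.
    return int(dt.timestamp() * 1_000_000)


# A fixed, timezone-aware datetime always yields the same timestamp.
ts = to_write_timestamp(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(ts)  # -> 1704067200000000
```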
if_not_exists() / if_exists()
Enable lightweight-transaction guards:
# Insert only if row doesn't already exist
BugReport.find().if_not_exists().create(
    project="coodie", created_at=datetime.now(), title="New bug"
)
# Delete only if row exists
BugReport.find(project="coodie", created_at=some_dt).if_exists().delete()
consistency(level)
Override the consistency level for this query. The level is a string
matching a cassandra.ConsistencyLevel attribute:
| Level | Description |
|---|---|
| `ONE` | Response from one replica (fastest) |
| `TWO` | Response from two replicas |
| `THREE` | Response from three replicas |
| `QUORUM` | Majority of all replicas across all data centres |
| `LOCAL_QUORUM` | Majority of replicas in the local data centre |
| `EACH_QUORUM` | Quorum in each data centre (writes only) |
| `ALL` | All replicas must respond (slowest, highest consistency) |
| `LOCAL_ONE` | One replica in the local data centre |
| `ANY` | At least one node (writes only, includes hinted handoff) |
| `SERIAL` | For LWT reads: linearisable consistency |
| `LOCAL_SERIAL` | Like `SERIAL`, but confined to the local data centre |
qs = BugReport.find(project="coodie").consistency("LOCAL_QUORUM")
timeout(seconds)
Override the query timeout:
qs = BugReport.find(project="coodie").timeout(5.0)
using(ttl=, timestamp=, consistency=, timeout=)
Set multiple query options at once:
qs = BugReport.find(project="coodie").using(
    ttl=3600, consistency="LOCAL_QUORUM", timeout=5.0
)
fetch_size(n) / page(paging_state)
Control pagination: fetch_size(n) sets the page size, and page(paging_state) resumes from a cursor returned by a previous page (see paged_all()):
qs = BugReport.find(project="coodie").fetch_size(100)
Terminal Methods
Terminal methods execute the query. In async mode, await the call.
all()
Execute the query and return a list of documents:
bugs = BugReport.find(project="coodie").all() # sync
bugs = await BugReport.find(project="coodie").all() # async
paged_all()
Like all(), but returns a PagedResult with a paging_state for
cursor-based pagination. When paging_state is None, there are no
more pages:
from coodie.results import PagedResult
# Fetch the first page
result = BugReport.find(project="coodie").fetch_size(50).paged_all()
bugs = result.data # list of documents for this page
# Fetch subsequent pages
while result.paging_state is not None:
    result = (
        BugReport.find(project="coodie")
        .fetch_size(50)
        .page(result.paging_state)
        .paged_all()
    )
    bugs.extend(result.data)
In async mode:
result = await BugReport.find(project="coodie").fetch_size(50).paged_all()
while result.paging_state is not None:
    result = await (
        BugReport.find(project="coodie")
        .fetch_size(50)
        .page(result.paging_state)
        .paged_all()
    )
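The loop above follows the general shape of cursor pagination; a self-contained sketch with an in-memory stand-in for paged_all() (fetch_page and DATA are illustrative, not part of coodie):

```python
DATA = list(range(7))  # pretend table contents


def fetch_page(cursor, size=3):
    # Returns (page_of_rows, next_cursor); next_cursor is None on the
    # last page, mirroring PagedResult.paging_state.
    start = cursor or 0
    page = DATA[start:start + size]
    next_cursor = start + size if start + size < len(DATA) else None
    return page, next_cursor


# First page, then keep fetching until the cursor is exhausted.
rows, cursor = fetch_page(None)
while cursor is not None:
    page, cursor = fetch_page(cursor)
    rows.extend(page)

print(rows)  # -> [0, 1, 2, 3, 4, 5, 6]
```

The key invariant is the same as with paging_state: an opaque cursor is threaded from one call into the next, and a None cursor terminates the loop.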
first()
Return the first matching document, or None:
bug = BugReport.find(project="coodie").first() # sync
bug = await BugReport.find(project="coodie").first() # async
count()
Return the number of matching rows:
total = BugReport.find(project="coodie").count() # sync
total = await BugReport.find(project="coodie").count() # async
create(**kwargs)
Insert a new row. Combine with if_not_exists() for conditional
inserts:
result = BugReport.find().if_not_exists().create(
    project="coodie",
    created_at=datetime.now(),
    title="New bug",
)
update(**kwargs)
Bulk update matching rows:
BugReport.find(project="coodie", created_at=some_dt).update(
    status="closed"
)
delete()
Delete matching rows:
BugReport.find(project="coodie", created_at=some_dt).delete() # sync
await BugReport.find(project="coodie", created_at=some_dt).delete() # async
Iteration
Sync
for bug in BugReport.find(project="coodie").all():
    print(bug.title)
Async
async for bug in BugReport.find(project="coodie"):
    print(bug.title)
What’s Next?
Filtering (Django-Style Lookups) — Django-style lookup operators for WHERE clauses
Collection Operations — set, list, and map operations
TTL (Time-To-Live) — time-to-live support
Counter Tables — counter tables and increment/decrement