Real-World Example: scylladb/argus
Argus is ScyllaDB’s test-run tracking system — a production application built on cqlengine with users, test runs, events, notifications, and comments. Below is a walkthrough of migrating its key models and operations to coodie.
Models
User — secondary indexes, list column, defaults
cqlengine:
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
import uuid
from datetime import datetime, timezone
class ArgusUser(Model):
__table_name__ = "argus_user"
__keyspace__ = "argus"
id = columns.UUID(primary_key=True, default=uuid.uuid4)
username = columns.Text(index=True)
full_name = columns.Text()
email = columns.Text(index=True)
registration_date = columns.DateTime(default=lambda: datetime.now(timezone.utc))
roles = columns.List(value_type=columns.Text)
picture_id = columns.UUID()
api_token = columns.Text(index=True)
coodie:
from coodie.sync import Document
from coodie.fields import PrimaryKey, Indexed
from pydantic import Field
from typing import Annotated, Optional
from uuid import UUID, uuid4
from datetime import datetime, timezone
class ArgusUser(Document):
id: Annotated[UUID, PrimaryKey()] = Field(default_factory=uuid4)
username: Annotated[str, Indexed()] = ""
full_name: str = ""
email: Annotated[str, Indexed()] = ""
registration_date: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc)
)
roles: list[str] = Field(default_factory=list)
picture_id: Optional[UUID] = None
api_token: Annotated[Optional[str], Indexed()] = None
class Settings:
name = "argus_user"
keyspace = "argus"
What changed:
columns.UUID(primary_key=True, default=uuid.uuid4)→Annotated[UUID, PrimaryKey()] = Field(default_factory=uuid4)columns.Text(index=True)→Annotated[str, Indexed()]columns.List(value_type=columns.Text)→list[str]columns.UUID()(nullable) →Optional[UUID] = Nonecolumns.DateTime(default=...)→datetime = Field(default_factory=...)__table_name__/__keyspace__→Settingsinner class
TestRun — composite partition key, clustering order, many indexes
cqlengine:
class ArgusTestRun(Model):
__table_name__ = "argus_test_run"
__keyspace__ = "argus"
build_id = columns.Text(primary_key=True, partition_key=True)
start_time = columns.DateTime(
primary_key=True,
clustering_order="DESC",
default=lambda: datetime.now(timezone.utc),
)
id = columns.UUID(index=True, default=uuid.uuid4)
release_id = columns.UUID(index=True)
group_id = columns.UUID(index=True)
test_id = columns.UUID(index=True)
assignee = columns.UUID(index=True)
status = columns.Text(default=lambda: "created")
investigation_status = columns.Text(default=lambda: "not_investigated")
heartbeat = columns.Integer(default=lambda: 0)
end_time = columns.DateTime()
build_job_url = columns.Text()
scylla_version = columns.Text()
coodie:
from coodie.fields import PrimaryKey, ClusteringKey, Indexed
class ArgusTestRun(Document):
build_id: Annotated[str, PrimaryKey()]
start_time: Annotated[datetime, ClusteringKey(order="DESC")] = Field(
default_factory=lambda: datetime.now(timezone.utc)
)
id: Annotated[UUID, Indexed()] = Field(default_factory=uuid4)
release_id: Annotated[Optional[UUID], Indexed()] = None
group_id: Annotated[Optional[UUID], Indexed()] = None
test_id: Annotated[Optional[UUID], Indexed()] = None
assignee: Annotated[Optional[UUID], Indexed()] = None
status: str = "created"
investigation_status: str = "not_investigated"
heartbeat: int = 0
end_time: Optional[datetime] = None
build_job_url: Optional[str] = None
scylla_version: Optional[str] = None
class Settings:
name = "argus_test_run"
keyspace = "argus"
What changed:
primary_key=True, partition_key=True→PrimaryKey()primary_key=True, clustering_order="DESC"→ClusteringKey(order="DESC")columns.UUID(index=True)→Annotated[Optional[UUID], Indexed()]columns.Integer(default=lambda: 0)→int = 0columns.Text(default=lambda: "created")→str = "created"
Notification — partition + TimeUUID clustering
cqlengine:
class ArgusNotification(Model):
__table_name__ = "argus_notification"
__keyspace__ = "argus"
receiver = columns.UUID(primary_key=True, partition_key=True)
id = columns.TimeUUID(primary_key=True, clustering_order="DESC", default=uuid.uuid1)
type = columns.Text()
state = columns.Integer(default=lambda: 0)
sender = columns.UUID()
source_type = columns.Text()
source_id = columns.UUID()
title = columns.Text()
content = columns.Text()
coodie:
from coodie.fields import PrimaryKey, ClusteringKey, TimeUUID
from uuid import uuid1
class ArgusNotification(Document):
receiver: Annotated[UUID, PrimaryKey()]
id: Annotated[UUID, TimeUUID(), ClusteringKey(order="DESC")] = Field(
default_factory=uuid1
)
type: str = ""
state: int = 0
sender: Optional[UUID] = None
source_type: str = ""
source_id: Optional[UUID] = None
title: str = ""
content: str = ""
class Settings:
name = "argus_notification"
keyspace = "argus"
What changed:
columns.TimeUUID(primary_key=True, clustering_order="DESC")→Annotated[UUID, TimeUUID(), ClusteringKey(order="DESC")]— markers compose insideAnnotatedcolumns.UUID(primary_key=True, partition_key=True)→Annotated[UUID, PrimaryKey()]
Event — UUID partition, multiple indexes
cqlengine:
class ArgusEvent(Model):
__table_name__ = "argus_event"
__keyspace__ = "argus"
id = columns.UUID(primary_key=True, default=uuid.uuid4)
release_id = columns.UUID(index=True)
group_id = columns.UUID(index=True)
test_id = columns.UUID(index=True)
run_id = columns.UUID(index=True)
user_id = columns.UUID(index=True)
kind = columns.Text(index=True)
body = columns.Text()
created_at = columns.DateTime(default=lambda: datetime.now(timezone.utc))
coodie:
class ArgusEvent(Document):
id: Annotated[UUID, PrimaryKey()] = Field(default_factory=uuid4)
release_id: Annotated[Optional[UUID], Indexed()] = None
group_id: Annotated[Optional[UUID], Indexed()] = None
test_id: Annotated[Optional[UUID], Indexed()] = None
run_id: Annotated[Optional[UUID], Indexed()] = None
user_id: Annotated[Optional[UUID], Indexed()] = None
kind: Annotated[str, Indexed()] = ""
body: str = ""
created_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc)
)
class Settings:
name = "argus_event"
keyspace = "argus"
What changed:
Six
columns.UUID(index=True)fields →Annotated[Optional[UUID], Indexed()]columns.Text(index=True)→Annotated[str, Indexed()]
Operations
Connection and table setup
cqlengine:
from cassandra.cqlengine import connection
from cassandra.cqlengine.management import sync_table
connection.setup(["127.0.0.1"], "argus", protocol_version=4)
sync_table(ArgusUser)
sync_table(ArgusTestRun)
sync_table(ArgusNotification)
sync_table(ArgusComment)
coodie:
from coodie.sync import init_coodie
init_coodie(hosts=["127.0.0.1"], keyspace="argus")
ArgusUser.sync_table()
ArgusTestRun.sync_table()
ArgusNotification.sync_table()
ArgusComment.sync_table()
Get-or-create user
cqlengine:
try:
user = ArgusUser.get(id=user_id)
except ArgusUser.DoesNotExist:
user = ArgusUser.create(
id=user_id,
username="alice",
email="alice@example.com",
roles=["ROLE_USER"],
)
coodie:
from coodie.exceptions import DocumentNotFound
try:
user = ArgusUser.get(id=user_id)
except DocumentNotFound:
user = ArgusUser(
id=user_id,
username="alice",
email="alice@example.com",
roles=["ROLE_USER"],
)
user.save()
Query latest test runs (partition + limit)
cqlengine:
runs = list(
ArgusTestRun.filter(build_id="nightly-x86")
.limit(10)
.all()
)
coodie:
runs = ArgusTestRun.find(build_id="nightly-x86").limit(10).all()
Status update (read-modify-save)
cqlengine:
runs = list(ArgusTestRun.filter(build_id="nightly-x86").limit(1).all())
run = runs[0]
run.status = "running"
run.heartbeat = int(time.time())
run.save()
coodie:
run = ArgusTestRun.find(build_id="nightly-x86").limit(1).all()[0]
run.update(status="running", heartbeat=int(time.time()))
Batch event creation
cqlengine:
from cassandra.cqlengine.query import BatchQuery
with BatchQuery() as b:
for event_data in events:
ArgusEvent.batch(b).create(
release_id=release_id,
run_id=run_id,
user_id=event_data["user"],
kind="STATUS_CHANGE",
body=event_data["body"],
)
coodie:
from coodie.sync import BatchQuery
with BatchQuery() as batch:
for event_data in events:
evt = ArgusEvent(
release_id=release_id,
run_id=run_id,
user_id=event_data["user"],
kind="STATUS_CHANGE",
body=event_data["body"],
)
evt.save(batch=batch)
Notification feed (time-ordered partition read)
cqlengine:
notifications = list(
ArgusNotification.filter(receiver=user_id)
.limit(20)
.all()
)
coodie:
notifications = (
ArgusNotification.find(receiver=user_id)
.limit(20)
.all()
)
Comment with collections
cqlengine:
ArgusComment.create(
test_run_id=run_id,
user_id=user_id,
release_id=release_id,
posted_at=int(time.time() * 1000),
message="Investigating flaky test.",
mentions=[reviewer_id, assignee_id],
reactions={"+1": 3, "eyes": 1},
)
coodie:
comment = ArgusComment(
test_run_id=run_id,
user_id=user_id,
release_id=release_id,
posted_at=int(time.time() * 1000),
message="Investigating flaky test.",
mentions=[reviewer_id, assignee_id],
reactions={"+1": 3, "eyes": 1},
)
comment.save()
Comment — BigInt clustering, Map and List collections
cqlengine:
coodie:
What changed:
columns.BigInt(primary_key=True, clustering_order="DESC")→Annotated[int, BigInt(), ClusteringKey(order="DESC")]columns.List(value_type=columns.UUID, default=[])→list[UUID] = Field(default_factory=list)columns.Map(key_type=columns.Text, value_type=columns.Integer)→dict[str, int] = Field(default_factory=dict)