Skip to main content

sdk

from vxdata.sdk import Client

# Create schemas require the fields our domain logic mandates to be present (e.g. resource versions have many mandatory fields!)
# Update schemas require no field to be present; any field that is present is being updated/set
# Response schemas contain all data available in the DB. This may also include "auto-generated" fields such as timestamps or compiled lists of child resources
from vxdata.schemas.create import VolumeCreate
from vxdata.schemas.response import VolumeResponse
from vxdata.schemas.update import DataSourceUpdate, VolumeUpdate

vxd = Client(base_url=...)

# passing a timestamp when instantiating the Client sets the as_of timestamp used for versioning
# this as_of timestamp is injected into every subsequent retrieve or query API request
# retrieve and query requests also accept their own as_of parameter, which takes priority over the client-level value
vxd = Client(base_url=..., timestamp="2025-01-01")

vxd.resources.retrieve("datasource/basel") # returns a generic ResourceResponse schema
# retrieve is overloaded: a single id returns one resource, a list returns
# list[...]. Strict in both forms: if any requested id is missing/tombstoned it
# raises ResourceNotFound (listing the missing ids) -- never silently dropped.
# to speed up large reads, we can also skip compiling lists of child resources
vxd.resources.retrieve(huge_list_of_resources, include_children=False)

vxd.histo_maps.retrieve(...)

# we use create schemas to insert new resources.
# this raises an exception if a resource with the same identifier already exists!
vxd.resources.create(
VolumeCreate(
# ResourceVersion fields ---
identifier="vol1",
parent_identifier="datasource/basel",
license="VIRDX-ANONYMIZED",
# VolumePayload fields -----
b_value=1000,
volume_type="DWI",
...
)
)
vxd.resources.create(
[
VolumeCreate(...),
HistoMapCreate(...),
]
)

# seed a create from a response (fork/re-create); response-only fields are dropped
vxd.volumes.create(VolumeCreate.from_response(vxd.volumes.retrieve("vol1")))

# we use update schemas to update existing resources (internally, new resourceversions are created)
# this raises an exception if a resource with the given identifier doesn't already exist
# or the specified parent resource doesn't exist (if parent_identifier is part of the update request)
# this is handled on the API side and errors propagated
vxd.resources.update(
"datasource/basel",
DataSourceUpdate(parent_identifier=None), # only specify what should change!
)
# soft-delete and restore are explicit verbs, never an update field:
vxd.resources.delete("datasource/basel")
vxd.resources.restore("datasource/basel")
vxd.resources.update(
"volume/vol1",
VolumeUpdate(
b_value=2000,
), # we can update the payload, and only have to specify the changing fields
)
vxd.resources.update(
{
"datasource/basel": DataSourceUpdate(description="test"),
"volume/vol1": VolumeUpdate(b_value=4000),
}
) # if a single update fails everything fails/raises, nothing should be committed!

# hard-deletes are not available (yet); every "delete" call on the SDK surface is a soft-delete
vxd.resources.delete("volume/vol1")
vxd.resources.delete([...])

# a few general notes:
# deletes, retrieves, or updates of deleted resources raise a not-found exception
# for all list operations: if one fails, all fail - none commit.
# the API should try making use of "batched" operations as much as possible!
# high-throughput of operations is a key goal of this product

# to obtain better-typed responses, we add payload "namespaces":
# (these are only on the SDK side! for ergonomics)
vxd.patients.retrieve("patient/BB12345") -> PatientResponse
vxd.patients.retrieve("datasource/basel") -> raises UnexpectedPayload # existing resource whose payload is not a patient
vxd.patients.retrieve([...]) -> list[PatientResponse]
vxd.patients.create(PatientCreate(...)) -> PatientResponse
vxd.patients.create([...]) -> list[PatientResponse]
vxd.patients.update(...)
vxd.patients.update({...}) # analogously, including the potential exceptions
vxd.patients.delete(...)
vxd.patients.delete([...])

# identical behavior across payload types:
vxd.histo_maps.retrieve(...) -> HistoMapResponse
vxd.volumes.retrieve([...]) -> list[VolumeResponse]
vxd.pathology_assessments.create(...) -> PathologyAssessmentResponse
vxd.vxannotate_case_records.create([...]) -> list[VxAnnotateCaseRecordResponse]
vxd.pathology_specimen.update(...) -> PathologySpecimenResponse
vxd.artefacts.update({...}) -> list[ArtefactResponse]
vxd.pirads_assessments.delete(...) -> None
vxd.imaging_studies.delete([...]) -> None
# here, code reusability is key! they all hit the same API endpoint, they just need to validate client-side

# NOTE: auto-upload on update is NOT implemented.
# Upload files explicitly via vxd.storage.upload() first, then pass the returned s3:// url to update.

# we provide a lineage namespace
vxd.lineage.parents("volume/vol1")
vxd.lineage.children("datasource/basel") # immediate children
# optimized is-in checks are not yet implemented; they could be added in the future if this becomes an issue or a feature request

# querying always goes through a typed namespace, so the payload type is implicit:
# collect() returns a DataFrame, collect_as_pydantic() returns that type's Response models
vxd.volumes.query().filter(...).collect()

# collect_as_pydantic() returns typed Response models instead of a DataFrame (might be slower)
vxd.volumes.query().filter(...).collect_as_pydantic() # -> list[VolumeResponse]

# we preserve patient- and study-generating helper methods by extending the patients and imaging_studies namespaces
vxd_pat_identifier = vxd.patients.register(
external_uid,
datasource_id, # needed as external uid may not be globally unique
) # raises if already exists
vxd_pat_identifier = vxd.patients.resolve(external_uid, datasource_id) # raises if not present
vxd_identifier = vxd.imaging_studies.register(external_uid, vxd_pat_identifier) # parent patient identifier scopes the study; study external uids need not be globally unique
vxd_identifier = vxd.imaging_studies.resolve(external_uid, vxd_pat_identifier)

# benchmarking_results is a plain typed namespace; stow DataFrames via storage, then create:
vxd.benchmarking_results.create(BenchmarkingResultCreate(
identifier=f"benchmarking/{task_id}",
clearml_id=task_id,
eval_task="ve2e-default",
benchmarked_on=patient_ids,
summary_metrics={"auc": 0.8},
other_data=vxd.storage.upload_dataframes(
{"patient_level_results": df}, group=f"benchmarking/{task_id}"
),
))

# artefacts are created upstream (e.g. by the Argo run); attach output DataFrames then read back:
vxd.artefacts.attach_files({"results": df}, artefact_id=run_id) # raises if artefact already has files
df = vxd.artefacts.download(run_id) # single df when one file, dict[str, df] for multiple