AGGC Preprocessing Demo
This document sketches how vxData is used and how AGGC is integrated with it. It is an early draft: the code has not yet been run or tested and is meant to illustrate the general concepts rather than serve as a working script.
from dataclasses import dataclass
from pathlib import Path
from vxp_client import PlatformClient, F, payloads, ResourceCreateRequest
import polars as pl
# Run arguments handed to `preprocess` for every sample and recorded on each
# preprocessed HistoScan as provenance. Empty in this draft.
PREPROC_PARAMETERS: dict = {}
def _first_row_or_none(frame):
    """Return the first row of a polars DataFrame as a dict, or None if it is empty."""
    rows = frame.head(1).to_dicts()
    return rows[0] if rows else None


def main():
    """Query AGGC scans and maps from vxData, preprocess them, and store the results.

    Steps:
      1. Query every AGGC HistoScan plus the HistoMaps of each segmentation task.
      2. Group each scan with its maps into an ``InputSample``.
      3. Run ``preprocess`` on every sample.
      4. For every sample, create a "/preprocessed" HistoScan resource, one
         "/preprocessed" resource per available map, and a foreground-mask
         resource, then upload them with ``client.add_resources``.
    """
    client = PlatformClient("http://127.0.0.1:12300/")

    # querying all available AGGC histoscans
    # we prepare a few query terms for less verbose code later
    is_aggc = F.datasource_id == "datasource/aggc2022"
    # InputSample / OutputSample field name -> vxData task of that map kind
    map_tasks = {
        "map_g3": "GLEASON_3_SEGMENTATION",
        "map_g4": "GLEASON_4_SEGMENTATION",
        "map_g5": "GLEASON_5_SEGMENTATION",
        "map_epithelium": "HEALTHY_EPITHELIUM_SEGMENTATION",
        "map_stroma": "STROMA_SEGMENTATION",
    }

    # and run client.query(...).filter(...).collect() -> pl.DataFrame
    scans = client.query("HistoScan").filter(is_aggc).collect()
    maps_by_field = {
        field: client.query("HistoMap").filter(is_aggc & (F.task == task)).collect()
        for field, task in map_tasks.items()
    }

    # AGGC's `OthersMask.tif` (catch-all class, only 2 source files) is dropped
    # at ingest time, so the consumer never sees it -- there is nothing to drop
    # here. See apps/vxdata-jobs/src/vxdata/jobs/f_20260528_aggc/README.md.

    # construct the input to the processing schema -------------------------------------
    samples = []
    for scan in scans.iter_rows(named=True):
        f_ref = pl.col("reference_histo_scan") == scan["identifier"]
        # a scan without a map of some kind gets None for that field
        scan_maps = {
            field: _first_row_or_none(frame.filter(f_ref))
            for field, frame in maps_by_field.items()
        }
        samples.append(InputSample(histo_scan=scan, **scan_maps))

    # note: yes, it can sometimes be cumbersome to construct the data in a format that
    # works elegantly with downstream processing. we haven't yet figured out the best way
    # to make data queryable very quickly but also ergonomically. for now, getting
    # dataframes and wrangling them into a shape that works for you is a very fast and
    # fairly ergonomic way to go about this

    # run the processing! --------------------------------------------------------------
    outputs = [preprocess(sample, PREPROC_PARAMETERS) for sample in samples]

    # now produce objects to store back into vxData ------------------------------------
    for sample, output in zip(samples, outputs):
        scan_identifier = sample.histo_scan["identifier"]

        # first: create the new histoscan ----------------------------------------------
        # we initialize the resource to create from the existing resource (copying all metadata)
        resource_histoscan = ResourceCreateRequest(**sample.histo_scan)

        # we then update the identifier to avoid overrides
        # TODO: add some more uniquely identifying information such as version
        resource_histoscan.identifier = scan_identifier + "/preprocessed"

        # we keep the same parent (aka the pseudo-patient)! it is already copied
        # from the source scan above, so there is nothing to assign here.
        # note: this may be a debatable modeling choice. vxData supports later
        # changing of parent/child structure

        # we then set provenance fields. Presence of `preprocessing_version`
        # (or any other preprocessing_* field) is what marks a HistoScan as
        # preprocessed; there is intentionally no separate `is_preprocessed`
        # flag.
        resource_histoscan.payload.preprocessing_version = "TODO"
        resource_histoscan.payload.preprocessing_descriptor = "TODO"
        resource_histoscan.payload.preprocessing_parameters = PREPROC_PARAMETERS
        resource_histoscan.payload.preprocessing_outputs = output.extra

        # we then set the location of the processed scan
        resource_histoscan.payload.url = output.histo_scan

        resources = [resource_histoscan]

        # second: we add one resource per available map (G3, G4, G5, ...) ---------------
        for field in map_tasks:
            source_map = getattr(sample, field)
            processed_path = getattr(output, field)
            if source_map is None or processed_path is None:
                # no such map for this scan (or none was produced): nothing to store
                continue
            resource_map = ResourceCreateRequest(**source_map)
            resource_map.identifier = source_map["identifier"] + "/preprocessed"
            # we attach this map to the processed parent, we can therefore assume
            # identical processing parameters and don't need to set any others
            resource_map.parent_identifier = resource_histoscan.identifier
            resource_map.payload.url = processed_path
            resources.append(resource_map)

        # lastly: also add the foreground mask -----------------------------------------
        # TODO: fill in the generation_* placeholders below
        resource_fg = ResourceCreateRequest(
            identifier=scan_identifier + "/foreground_mask",
            datasource_id=sample.histo_scan["datasource_id"],
            payload=payloads.HistoMap(
                task="FOREGROUND_MASK",
                labels={"0": "background", "1": "foreground"},
                generation_method=...,
                generation_version=...,
                generation_parameters=...,
                generation_outputs=...,
            ),
        )
        # point the resource at the mask written by `preprocess`
        # NOTE(review): no parent is set for the mask -- confirm whether it
        # should hang off the original or the preprocessed scan.
        resource_fg.payload.url = output.foreground_mask
        resources.append(resource_fg)

        # upload the data to the vxData database! no need to handle files atm,
        # as long as paths are accessible on disk
        client.add_resources(resources)
# for better readability define simple dataclasses holding this info
@dataclass
class InputSample:
histo_scan: dict # we'll add better types into the SDK soon
map_g3: dict | None
map_g4: dict | None
map_g5: dict | None
map_epithelium: dict | None
map_stroma: dict | None
@dataclass
class OutputSample:
histo_scan: Path # to processed tiff
map_g3: Path | None
map_g4: Path | None
map_g5: Path | None
map_epithelium: Path | None
map_stroma: Path | None
foreground_mask: Path # to tiff
extra: dict | None
def preprocess(sample: InputSample, run_args: dict) -> OutputSample:
    """
    This is our histo-package routine!

    Runs preprocessing of a histo scan.

    Applies X, Y, and Z to the provided scan. Transforms associated maps
    (e.g. gleason maps) appropriately.

    Args:
        sample: the scan and its maps to preprocess.
        run_args: parameters of the preprocessing run.

    Returns:
        A single OutputSample holding the paths to the processed images,
        plus the determined foreground mask to be persisted as well.

    Raises:
        NotImplementedError: always; the routine is still a placeholder.
    """
    raise NotImplementedError()
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    main()