Skip to content

registry

Codec registry construction helpers (internal).

The session layer uses a single default CodecRegistry instance per process. The registry is ordered: the first codec that accepts a value wins, so the registration order is significant.

Important invariants: SHM-backed codecs should be registered before the pickle fallback, and the pickle fallback MUST be registered last.

Attributes

_default_registry = None module-attribute

Classes

CodecRegistry

Ordered registry of encoders used for IPC serialization.

Source code in brmspy/_session/codec/base.py
class CodecRegistry:
    """
    Ordered registry of encoders used for IPC serialization.

    `encode()` consults encoders in registration order and the first one whose
    `can_encode()` accepts the value wins, so registration order is significant.
    The pickle fallback must always be registered last.
    """

    def __init__(self) -> None:
        # codec name -> encoder; used by decode() to look up a named codec
        self._by_codec: dict[str, Encoder] = {}
        # ordered list consulted by encode(); first match wins
        self._encoders: list[Encoder] = []

    def register(self, encoder: Encoder) -> None:
        """
        Register an encoder instance.

        Parameters
        ----------
        encoder : brmspy.types.session.Encoder
            Encoder to register. Its `codec` attribute is used as the key when present,
            otherwise the class name is used.
        """
        # Prefer an explicit, non-empty `codec` attribute; fall back to the class name.
        codec_name = getattr(encoder, "codec", None) or type(encoder).__name__
        self._by_codec[codec_name] = encoder
        # Write the resolved name back so EncodeResult / decode lookups agree.
        encoder.codec = codec_name  # type: ignore
        self._encoders.append(encoder)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Encode an object by selecting the first encoder that accepts it.

        Parameters
        ----------
        obj : Any
            Value to encode.
        shm_pool : Any
            SHM pool used by codecs for allocating buffers.

        Returns
        -------
        brmspy.types.session.EncodeResult

        Raises
        ------
        RuntimeError
            If no encoder accepts `obj` and no pickle fallback is registered.
        """
        for enc in self._encoders:
            if enc.can_encode(obj):
                res = enc.encode(obj, shm_pool)
                # Some encoders leave `codec` unset; default to the class name
                # so decode() can find them again.
                if not res.codec:
                    res.codec = type(enc).__name__
                return res

        # fallback to pickle
        if "PickleCodec" not in self._by_codec:
            raise RuntimeError("No pickle codec registered")
        return self._by_codec["PickleCodec"].encode(obj, shm_pool)

    def decode(
        self,
        payload: PayloadRef,
        shm_pool: Any,
    ) -> Any:
        """
        Decode a payload using the codec named in ``payload["codec"]``.

        Parameters
        ----------
        payload : PayloadRef
            Payload mapping produced by `encode()`; carries the codec name,
            codec metadata and SHM buffer references.
        shm_pool : Any
            SHM pool used to attach the referenced buffers (some codecs may
            attach additional buffers).

        Returns
        -------
        Any

        Raises
        ------
        ValueError
            If the named codec was never registered.
        """
        codec = payload["codec"]
        if codec not in self._by_codec:
            raise ValueError(
                f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
            )

        # Non-temporary blocks whose lifetime must follow the decoded value.
        buffers = []

        @contextmanager
        def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
            buf = shm_pool.attach(ref)
            memview = memoryview(buf.shm.buf)
            view = memview[: ref["content_size"]].cast("B")

            try:
                if not ref["temporary"]:
                    # non-temporary buffers are associated with columns / objects
                    buffers.append(buf)

                yield buf, view

            finally:
                # deterministic cleanup for temporary buffers
                if ref["temporary"]:
                    # IMPORTANT: release view before closing shm
                    try:
                        view.release()
                        memview.release()
                        shm_pool.gc(ref["name"])
                        buf.shm.close()
                    except Exception:
                        # Best-effort cleanup; never mask the decoded result.
                        pass

        value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
        self._attach_shm_lifetime(value, buffers)

        return value

    @classmethod
    def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
        """Keep SHM blocks alive as long as `obj` is alive."""
        if not shms:
            return
        # Immutable scalars cannot reference SHM-backed data, and several of
        # these types are not weak-referenceable anyway.
        if obj is None or isinstance(obj, (bool, str, int, float)):
            return

        try:
            weakref.finalize(obj, _noop, tuple(shms))
        except Exception:
            # Object is not weak-referenceable; skip lifetime pinning.
            return

Attributes

_by_codec = {} instance-attribute
_encoders = [] instance-attribute

Functions

__init__()
Source code in brmspy/_session/codec/base.py
def __init__(self) -> None:
    # Ordered list drives encode() (first match wins); the name->encoder
    # mapping drives decode() lookups.
    self._encoders: list[Encoder] = []
    self._by_codec: dict[str, Encoder] = {}
register(encoder)

Register an encoder instance.

Parameters:

Name Type Description Default
encoder Encoder

Encoder to register. Its codec attribute is used as the key when present, otherwise the class name is used.

required
Source code in brmspy/_session/codec/base.py
def register(self, encoder: Encoder) -> None:
    """
    Register an encoder instance.

    Parameters
    ----------
    encoder : brmspy.types.session.Encoder
        Encoder to register. Its `codec` attribute is used as the key when present,
        otherwise the class name is used.
    """
    # Prefer an explicit, non-empty `codec` attribute; fall back to the class name.
    codec_name = getattr(encoder, "codec", None) or type(encoder).__name__
    self._by_codec[codec_name] = encoder
    # Write the resolved name back so EncodeResult / decode lookups agree.
    encoder.codec = codec_name  # type: ignore
    self._encoders.append(encoder)
encode(obj, shm_pool)

Encode an object by selecting the first encoder that accepts it.

Parameters:

Name Type Description Default
obj Any

Value to encode.

required
shm_pool Any

SHM pool used by codecs for allocating buffers.

required

Returns:

Type Description
EncodeResult
Source code in brmspy/_session/codec/base.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """
    Encode an object by selecting the first encoder that accepts it.

    Parameters
    ----------
    obj : Any
        Value to encode.
    shm_pool : Any
        SHM pool used by codecs for allocating buffers.

    Returns
    -------
    brmspy.types.session.EncodeResult
    """
    # First registered encoder that accepts the value wins.
    chosen = next((e for e in self._encoders if e.can_encode(obj)), None)
    if chosen is not None:
        result = chosen.encode(obj, shm_pool)
        # Fill in a default codec name when the encoder left it blank.
        if not result.codec:
            result.codec = type(chosen).__name__
        return result

    # Nothing matched: fall back to pickle, which must be registered.
    if "PickleCodec" not in self._by_codec:
        raise RuntimeError("No pickle codec registered")
    return self._by_codec["PickleCodec"].encode(obj, shm_pool)
decode(payload, shm_pool)

Decode a payload using a named codec.

Parameters:

Name Type Description Default
codec str

Codec identifier previously returned by encode().

required
meta dict[str, Any]

Codec metadata.

required
buffers list[memoryview]

Memoryviews for attached SHM buffers.

required
buffer_specs list[dict]

Original buffer specs (name/size) corresponding to buffers.

required
shm_pool Any

SHM pool (some codecs may attach additional buffers).

required

Returns:

Type Description
Any
Source code in brmspy/_session/codec/base.py
def decode(
    self,
    payload: PayloadRef,
    shm_pool: Any,
) -> Any:
    """
    Decode a payload using the codec named in ``payload["codec"]``.

    Parameters
    ----------
    payload : PayloadRef
        Payload mapping produced by `encode()`; carries the codec name,
        codec metadata and SHM buffer references.
    shm_pool : Any
        SHM pool used to attach the referenced buffers (some codecs may
        attach additional buffers).

    Returns
    -------
    Any

    Raises
    ------
    ValueError
        If the named codec was never registered.
    """
    codec = payload["codec"]
    if codec not in self._by_codec:
        raise ValueError(
            f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
        )

    # Non-temporary blocks whose lifetime must follow the decoded value.
    buffers = []

    @contextmanager
    def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
        buf = shm_pool.attach(ref)
        memview = memoryview(buf.shm.buf)
        view = memview[: ref["content_size"]].cast("B")

        try:
            if not ref["temporary"]:
                # non-temporary buffers are associated with columns / objects
                buffers.append(buf)

            yield buf, view

        finally:
            # deterministic cleanup for temporary buffers
            if ref["temporary"]:
                # IMPORTANT: release view before closing shm
                try:
                    view.release()
                    memview.release()
                    shm_pool.gc(ref["name"])
                    buf.shm.close()
                except Exception:
                    # Best-effort cleanup; never mask the decoded result.
                    pass

    value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
    self._attach_shm_lifetime(value, buffers)

    return value
_attach_shm_lifetime(obj, shms) classmethod

Keep SHM blocks alive as long as obj is alive.

Source code in brmspy/_session/codec/base.py
@classmethod
def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
    """Keep SHM blocks alive as long as `obj` is alive."""
    if not shms:
        return
    # Immutable scalars cannot reference SHM-backed data, and several of
    # these types are not weak-referenceable anyway.
    if obj is None or isinstance(obj, (bool, str, int, float)):
        return

    try:
        weakref.finalize(obj, _noop, tuple(shms))
    except Exception:
        # Object is not weak-referenceable; skip lifetime pinning.
        return

InferenceDataCodec

Bases: Encoder

Encode arviz.InferenceData by pushing its underlying arrays into shm.

Source code in brmspy/_session/codec/builtin.py
class InferenceDataCodec(Encoder):
    """
    Encode `arviz.InferenceData` by pushing its underlying arrays into SHM.

    Each group (posterior, posterior_predictive, ...) is flattened into
    per-variable and per-coordinate SHM buffers plus a metadata tree that
    `decode()` uses to rebuild equivalent `xarray.Dataset` objects.
    """

    def can_encode(self, obj: Any) -> bool:
        # Handles only arviz InferenceData containers.
        return isinstance(obj, az.InferenceData)

    def encode(self, obj: az.InferenceData, shm_pool: Any) -> EncodeResult:
        """
        Encode an InferenceData object into SHM buffers plus group metadata.

        Parameters
        ----------
        obj : arviz.InferenceData
            Object to encode.
        shm_pool : Any
            SHM pool used to allocate buffers.

        Returns
        -------
        brmspy.types.session.EncodeResult
        """
        buffers: list[ShmRef] = []
        groups_meta: dict[str, Any] = {}

        # Walk each group: posterior, posterior_predictive, etc.
        for group_name in obj.groups():
            ds: xr.Dataset = getattr(obj, group_name)
            g_meta: dict[str, Any] = {
                "data_vars": {},
                "coords": {},
            }

            # COORDS: generally smaller, but can be arrays.
            for cname, coord in ds.coords.items():
                values = np.asarray(coord.values)
                if values.dtype.kind in "iufb":  # numeric-ish
                    data = values.tobytes(order="C")
                    nbytes = len(data)
                    block = shm_pool.alloc(nbytes)
                    block.shm.buf[:nbytes] = data

                    buffer_idx = len(buffers)
                    buffers.append(block.to_ref())

                    g_meta["coords"][cname] = {
                        "kind": "array",
                        "buffer_idx": buffer_idx,
                        "dtype": str(values.dtype),
                        "shape": list(values.shape),
                        "dims": list(coord.dims),
                        "nbytes": nbytes,
                    }
                else:
                    # Non-numeric / object coords: keep them small & pickle in meta.
                    g_meta["coords"][cname] = {
                        "kind": "pickle",
                        "dims": list(coord.dims),
                        "payload": pickle.dumps(
                            coord.values, protocol=pickle.HIGHEST_PROTOCOL
                        ),
                    }

            # DATA VARS: main heavy arrays go straight into SHM.
            for vname, da in ds.data_vars.items():
                arr = np.asarray(da.data)
                _, spec, dtype, shape, order = ShmArray.to_shm(arr, shm_pool)

                buffer_idx = len(buffers)
                buffers.append(spec)

                g_meta["data_vars"][vname] = {
                    "buffer_idx": buffer_idx,
                    "dtype": str(dtype),
                    "shape": list(shape),
                    "dims": list(da.dims),
                    "nbytes": spec["content_size"],
                }

            groups_meta[group_name] = g_meta

        meta: dict[str, Any] = {
            "groups": groups_meta,
            "codec_version": 1,
        }

        return EncodeResult(
            codec=type(self).__name__,
            meta=meta,
            buffers=buffers,
        )

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """
        Rebuild an InferenceData object from SHM buffers and group metadata.

        Parameters
        ----------
        payload : PayloadRef
            Payload produced by `encode()`.
        get_buf : Callable
            Context-manager factory yielding `(block, view)` for a buffer ref.

        Returns
        -------
        arviz.InferenceData
        """
        meta = payload["meta"]
        specs = payload["buffers"]
        groups_meta = meta["groups"]
        groups: dict[str, xr.Dataset] = {}

        for group_name, g_meta in groups_meta.items():
            data_vars = {}
            coords = {}

            # Rebuild coords
            for cname, cmeta in g_meta["coords"].items():
                kind = cmeta["kind"]
                if kind == "array":
                    spec = specs[cmeta["buffer_idx"]]
                    with get_buf(spec) as (block, _):
                        arr = ShmArray.from_block(
                            block, shape=cmeta["shape"], dtype=np.dtype(cmeta["dtype"])
                        )
                        coords[cname] = (tuple(cmeta["dims"]), arr)
                elif kind == "pickle":
                    values = pickle.loads(cmeta["payload"])
                    coords[cname] = (tuple(cmeta["dims"]), values)
                else:
                    raise ValueError(f"Unknown coord kind: {kind!r}")

            # Rebuild data_vars
            for vname, vmeta in g_meta["data_vars"].items():
                spec = specs[vmeta["buffer_idx"]]
                with get_buf(spec) as (block, _):
                    arr = ShmArray.from_block(
                        block, vmeta["shape"], dtype=np.dtype(vmeta["dtype"])
                    )
                    data_vars[vname] = (tuple(vmeta["dims"]), arr)

            ds = xr.Dataset(
                data_vars=data_vars,
                coords=coords,
            )
            groups[group_name] = ds

        # Construct InferenceData from datasets
        idata = az.InferenceData(**groups, warn_on_custom_groups=False)
        return idata

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    # Handles only arviz InferenceData containers; everything else falls through.
    return isinstance(obj, az.InferenceData)
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: az.InferenceData, shm_pool: Any) -> EncodeResult:
    """
    Encode an InferenceData object into SHM buffers plus group metadata.

    Each group (posterior, posterior_predictive, ...) contributes per-variable
    and per-coordinate buffers and a metadata tree that decode() consumes.
    """
    buffers: list[ShmRef] = []
    groups_meta: dict[str, Any] = {}
    # NOTE(review): total_bytes is accumulated below but never read.
    total_bytes = 0

    # Walk each group: posterior, posterior_predictive, etc.
    for group_name in obj.groups():
        ds: xr.Dataset = getattr(obj, group_name)
        g_meta: dict[str, Any] = {
            "data_vars": {},
            "coords": {},
        }

        # COORDS: generally smaller, but can be arrays.
        for cname, coord in ds.coords.items():
            values = np.asarray(coord.values)
            if values.dtype.kind in "iufb":  # numeric-ish
                # Numeric coords: raw bytes into a fresh SHM block.
                data = values.tobytes(order="C")
                nbytes = len(data)
                block = shm_pool.alloc(nbytes)
                block.shm.buf[:nbytes] = data

                buffer_idx = len(buffers)
                buffers.append(block.to_ref())
                total_bytes += nbytes

                g_meta["coords"][cname] = {
                    "kind": "array",
                    "buffer_idx": buffer_idx,
                    "dtype": str(values.dtype),
                    "shape": list(values.shape),
                    "dims": list(coord.dims),
                    "nbytes": nbytes,
                }
            else:
                # Non-numeric / object coords: keep them small & pickle in meta.
                g_meta["coords"][cname] = {
                    "kind": "pickle",
                    "dims": list(coord.dims),
                    "payload": pickle.dumps(
                        coord.values, protocol=pickle.HIGHEST_PROTOCOL
                    ),
                }

        # DATA VARS: main heavy arrays
        for vname, da in ds.data_vars.items():
            arr = np.asarray(da.data)
            _, spec, dtype, shape, order = ShmArray.to_shm(arr, shm_pool)
            # NOTE(review): `order` is captured here but never emitted into
            # g_meta, so decode() reconstructs without it — confirm intended.
            meta = {"dtype": dtype, "shape": shape, "order": order}
            nbytes = spec["content_size"]

            buffer_idx = len(buffers)
            buffers.append(spec)
            total_bytes += nbytes

            g_meta["data_vars"][vname] = {
                "buffer_idx": buffer_idx,
                "dtype": str(meta["dtype"]),
                "shape": list(meta["shape"]),
                "dims": list(da.dims),
                "nbytes": nbytes,
            }

        groups_meta[group_name] = g_meta

    # Top-level metadata envelope (rebinds the loop-local `meta` name).
    meta: dict[str, Any] = {
        "groups": groups_meta,
        "codec_version": 1,
    }

    return EncodeResult(
        codec=type(self).__name__,
        meta=meta,
        buffers=buffers,
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    """
    Rebuild an InferenceData object from SHM buffers and group metadata.

    Mirrors encode(): each group's coords and data_vars are reconstructed
    into an xarray Dataset, then assembled into an InferenceData.
    """
    meta = payload["meta"]
    specs = payload["buffers"]
    groups_meta = meta["groups"]
    groups: dict[str, xr.Dataset] = {}

    for group_name, g_meta in groups_meta.items():
        data_vars = {}
        coords = {}

        # Rebuild coords
        for cname, cmeta in g_meta["coords"].items():
            kind = cmeta["kind"]
            if kind == "array":
                # Numeric coord: view straight into the attached SHM block.
                spec = specs[cmeta["buffer_idx"]]
                with get_buf(spec) as (block, _):
                    arr = ShmArray.from_block(
                        block, shape=cmeta["shape"], dtype=np.dtype(cmeta["dtype"])
                    )
                    coords[cname] = (tuple(cmeta["dims"]), arr)
            elif kind == "pickle":
                # Object coord: small, pickled directly inside the metadata.
                values = pickle.loads(cmeta["payload"])
                coords[cname] = (tuple(cmeta["dims"]), values)
            else:
                raise ValueError(f"Unknown coord kind: {kind!r}")

        # Rebuild data_vars
        for vname, vmeta in g_meta["data_vars"].items():
            spec = specs[vmeta["buffer_idx"]]
            with get_buf(spec) as (block, _):
                arr = ShmArray.from_block(
                    block, vmeta["shape"], dtype=np.dtype(vmeta["dtype"])
                )
                data_vars[vname] = (tuple(vmeta["dims"]), arr)

        ds = xr.Dataset(
            data_vars=data_vars,
            coords=coords,
        )
        groups[group_name] = ds

    # Construct InferenceData from datasets
    idata = az.InferenceData(**groups, warn_on_custom_groups=False)
    return idata

NumpyArrayCodec

Bases: Encoder

SHM-backed codec for numpy.ndarray.

Source code in brmspy/_session/codec/builtin.py
class NumpyArrayCodec(Encoder):
    """SHM-backed codec for [`numpy.ndarray`](https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html)."""

    def can_encode(self, obj: Any) -> bool:
        # Accepts any numpy ndarray; other array-likes fall through.
        return isinstance(obj, np.ndarray)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        # Copy the array into a SHM block and record the layout metadata
        # needed to rebuild an identical view on the other side.
        _, block_ref, np_dtype, np_shape, np_order = ShmArray.to_shm(obj, shm_pool)
        layout = {"dtype": np_dtype, "shape": np_shape, "order": np_order}
        return EncodeResult(
            codec=type(self).__name__,
            meta=layout,
            buffers=[block_ref],
        )

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        # Single-buffer payload: attach it and rebuild the array from metadata.
        first_ref = payload["buffers"][0]
        with get_buf(first_ref) as (block, _view):
            return ShmArray.from_metadata(payload["meta"], block)

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    # Accepts any numpy ndarray; other array-likes fall through to later codecs.
    return isinstance(obj, np.ndarray)
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    # Push the array into SHM; the layout metadata lets the peer rebuild
    # an identical view without copying.
    _, block_ref, np_dtype, np_shape, np_order = ShmArray.to_shm(obj, shm_pool)
    layout = {"dtype": np_dtype, "shape": np_shape, "order": np_order}
    return EncodeResult(
        codec=type(self).__name__,
        meta=layout,
        buffers=[block_ref],
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    # Single-buffer payload: attach it and rebuild the array from metadata.
    first_ref = payload["buffers"][0]
    with get_buf(first_ref) as (block, _view):
        return ShmArray.from_metadata(payload["meta"], block)

PandasDFCodec

Bases: Encoder

SHM-backed codec for numeric-only pandas.DataFrame.

Object-dtype columns are intentionally rejected to avoid surprising implicit conversions; those cases fall back to pickle.

Source code in brmspy/_session/codec/builtin.py
class PandasDFCodec(Encoder):
    """
    SHM-backed codec for numeric-only [`pandas.DataFrame`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

    Object-dtype columns are intentionally rejected to avoid surprising implicit
    conversions; those cases fall back to pickle.
    """

    def can_encode(self, obj: Any) -> bool:
        return isinstance(obj, pd.DataFrame)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Encode a DataFrame into one of three variants.

        Variants:
        - ``"empty"``: no buffers at all;
        - ``"single"``: one SHM matrix shared by all columns (ShmDataFrameSimple);
        - ``"columnar"``: one SHM block per column (ShmDataFrameColumns or fallback).
        """
        assert isinstance(obj, pd.DataFrame)  # assert type

        meta: dict[str, Any] = {
            "columns": list(obj.columns),
            "index": pickle.dumps(obj.index, protocol=pickle.HIGHEST_PROTOCOL),
            "variant": "single",
        }
        buffers: list[ShmRef] = []

        if obj.empty:
            meta["variant"] = "empty"
        elif isinstance(obj, ShmDataFrameSimple):
            # Already SHM-backed as a single dtype matrix: reuse its block.
            meta["variant"] = "single"
            meta["dtype"] = str(obj.values.dtype)
            meta["order"] = ShmArray.array_order(obj.values)
            buffers.append(obj._shm_metadata)
        elif isinstance(obj, ShmDataFrameColumns):
            # Already SHM-backed with per-column buffers: reuse their metadata.
            meta["variant"] = "columnar"
            meta["order"] = "F"
            meta["columns"] = obj._shm_metadata
        else:
            # Fallback: put each column in its own SHM block
            meta["variant"] = "columnar"
            meta["order"] = "C"
            columns: dict[str, ShmSeriesMetadata] = {}

            for col_name in obj.columns:
                col = obj[col_name]
                arr_modified, spec, _dtype, _shape, _order = ShmArray.to_shm(
                    col, shm_pool
                )
                columns[col_name] = ShmDataFrameColumns._create_col_metadata(
                    col, spec, arr_modified
                )
            meta["columns"] = columns

        return EncodeResult(codec=type(self).__name__, meta=meta, buffers=buffers)

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """
        Rebuild a DataFrame from the encoded variant.

        Raises
        ------
        ValueError
            If the payload carries an unknown variant tag.
        """
        meta = payload["meta"]
        if meta.get("variant") == "empty":
            return pd.DataFrame({})

        buffer_specs = payload["buffers"]
        index = pickle.loads(meta["index"])

        if meta.get("variant") == "single":
            spec = buffer_specs[0]
            with get_buf(spec) as (buf, memview):
                dtype = np.dtype(meta["dtype"])
                nbytes = spec["size"]
                order = meta["order"]

                columns = meta["columns"]
                shape = (len(index), len(columns))

                # Only use the slice that actually holds array data
                view = memview[:nbytes]
                arr = np.ndarray(shape=shape, dtype=dtype, buffer=view, order=order)

                df = ShmDataFrameSimple(data=arr, index=index, columns=columns)
                df._set_shm_metadata(spec)

                return df
        elif meta.get("variant") == "columnar":
            columns_metadata: dict[str, ShmSeriesMetadata] = meta["columns"]
            nrows = len(index)

            data: dict[str, pd.Series] = {}

            for col_name, metadata in columns_metadata.items():
                spec = metadata["block"]
                with get_buf(spec) as (buf, _view):
                    data[col_name] = ShmDataFrameColumns._reconstruct_series(
                        metadata, buf, nrows, index
                    )

            df = ShmDataFrameColumns(data=data)
            df._set_shm_metadata(columns_metadata)

            return df
        else:
            # ValueError matches CodecRegistry.decode's error style and is a
            # subclass of the Exception previously raised here.
            raise ValueError(f"Unknown DataFrame variant {meta.get('variant')}")

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    # Accept any DataFrame; unsupported column dtypes are handled downstream,
    # and the registry's pickle fallback covers everything this rejects.
    return isinstance(obj, pd.DataFrame)
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """
    Encode a DataFrame into one of three variants: "empty" (no buffers),
    "single" (one shared SHM matrix) or "columnar" (one block per column).
    """
    assert isinstance(obj, pd.DataFrame)  # assert type

    meta: dict[str, Any] = {
        "columns": list(obj.columns),
        "index": pickle.dumps(obj.index, protocol=pickle.HIGHEST_PROTOCOL),
        "variant": "single",
    }
    buffers: list[ShmRef] = []

    if obj.empty:
        meta["variant"] = "empty"
    elif isinstance(obj, ShmDataFrameSimple):
        # single dtype matrix
        meta["variant"] = "single"
        meta["dtype"] = str(obj.values.dtype)
        meta["order"] = ShmArray.array_order(obj.values)
        buffers.append(obj._shm_metadata)
    elif isinstance(obj, ShmDataFrameColumns):
        # per column buffers
        meta["variant"] = "columnar"
        meta["order"] = "F"
        meta["columns"] = obj._shm_metadata
    else:
        # Fallback: put each column in its own SHM block
        meta["variant"] = "columnar"
        meta["order"] = "C"
        columns: dict[str, ShmSeriesMetadata] = {}

        for col_name in obj.columns:
            col = obj[col_name]

            arr_modified, spec, dtype, shape, order = ShmArray.to_shm(col, shm_pool)

            columns[col_name] = ShmDataFrameColumns._create_col_metadata(
                obj[col_name], spec, arr_modified
            )
        meta["columns"] = columns

    return EncodeResult(codec=type(self).__name__, meta=meta, buffers=buffers)
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    """Rebuild a DataFrame from the variant recorded in the payload metadata."""
    meta = payload["meta"]
    if meta.get("variant") == "empty":
        return pd.DataFrame({})

    buffer_specs = payload["buffers"]

    index = pickle.loads(meta["index"])

    if meta.get("variant") == "single":
        # One shared matrix: view the SHM bytes as a 2-D ndarray.
        spec = buffer_specs[0]
        with get_buf(buffer_specs[0]) as (buf, memview):
            dtype = np.dtype(meta["dtype"])
            nbytes = spec["size"]
            order = meta["order"]

            columns = meta["columns"]
            shape = (len(index), len(columns))

            # Only use the slice that actually holds array data
            view = memview[:nbytes]
            arr = np.ndarray(shape=shape, dtype=dtype, buffer=view, order=order)

            df = ShmDataFrameSimple(data=arr, index=index, columns=columns)
            df._set_shm_metadata(spec)

            return df
    elif meta.get("variant") == "columnar":
        # One SHM block per column, rebuilt series by series.
        columns_metadata: dict[str, ShmSeriesMetadata] = meta["columns"]
        nrows = len(index)

        columns = list(columns_metadata.keys())

        data: dict[str, pd.Series] = {}

        # NOTE(review): `i` and `dtype` below are assigned but unused.
        for i, col_name in enumerate(columns):
            metadata = columns_metadata[col_name]
            spec = metadata["block"]
            dtype = metadata["np_dtype"]
            with get_buf(spec) as (buf, view):
                data[col_name] = ShmDataFrameColumns._reconstruct_series(
                    metadata, buf, nrows, index
                )

        df = ShmDataFrameColumns(data=data)
        df._set_shm_metadata(columns_metadata)

        return df
    else:
        raise Exception(f"Unknown DataFrame variant {meta.get('variant')}")

PickleCodec

Bases: Encoder

Pickle fallback codec (internal).

Always encodes successfully, so it must be registered last. The pickled bytes are still stored in SHM to keep pipe traffic small and bounded.

Source code in brmspy/_session/codec/builtin.py
class PickleCodec(Encoder):
    """
    Pickle fallback codec (internal).

    Always encodes successfully, so it must be registered last. The pickled bytes
    are still stored in SHM to keep pipe traffic small and bounded.
    """

    def can_encode(self, obj: Any) -> bool:
        # Fallback – always True
        return True

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Pickle `obj` into a temporary SHM block.

        `None` is special-cased as an empty payload so no SHM is consumed.
        """
        if obj is None:
            return EncodeResult(
                codec=type(self).__name__,
                meta={},
                buffers=[],
            )

        data = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        size_bytes = len(data)

        # Large pickles defeat the purpose of the SHM-first codecs; warn
        # before paying for the allocation and copy.
        if size_bytes > ONE_MB * 10:
            size_mb = size_bytes / ONE_MB
            log_warning(
                f"PickleCodec encoding large object: type={type(obj)}, size={size_mb:,.2f} MB"
            )

        block = shm_pool.alloc(size_bytes, temporary=True)
        block.shm.buf[:size_bytes] = data

        # Record the pickled length in meta for observability; decode reads
        # the authoritative length from block.content_size.
        meta: dict[str, Any] = {"length": size_bytes}

        return EncodeResult(
            codec=type(self).__name__,
            meta=meta,
            buffers=[block.to_ref()],
        )

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """Unpickle from the single SHM buffer; an empty payload means `None`."""
        specs = payload["buffers"]
        if len(specs) == 0:
            return None

        with get_buf(specs[0]) as (block, buf):
            length = block.content_size
            return pickle.loads(bytes(buf[:length]))

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    # Fallback – always True
    # Pickle can serialize almost anything, so this codec never rejects a
    # value; it must therefore be registered last in the registry.
    return True
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """
    Pickle `obj` into a temporary SHM block.

    `None` is special-cased as an empty payload so no SHM is consumed.
    """
    if obj is None:
        return EncodeResult(
            codec=type(self).__name__,
            meta={},
            buffers=[],
        )

    data = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    size_bytes = len(data)

    # Large pickles defeat the purpose of the SHM-first codecs; warn before
    # paying for the allocation and copy.
    if size_bytes > ONE_MB * 10:
        size_mb = size_bytes / ONE_MB
        log_warning(
            f"PickleCodec encoding large object: type={type(obj)}, size={size_mb:,.2f} MB"
        )

    block = shm_pool.alloc(size_bytes, temporary=True)
    block.shm.buf[:size_bytes] = data

    # Record the pickled length in meta for observability; decode reads the
    # authoritative length from block.content_size.
    meta: dict[str, Any] = {"length": size_bytes}

    return EncodeResult(
        codec=type(self).__name__,
        meta=meta,
        buffers=[block.to_ref()],
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    """Unpickle from the single SHM buffer; an empty payload means `None`."""
    refs = payload["buffers"]
    if not refs:
        # encode() emits no buffer for None
        return None

    with get_buf(refs[0]) as (block, view):
        raw = view[: block.content_size]
        return pickle.loads(bytes(raw))

Functions

register_dataclasses(registry)

Register codecs for known dataclass types.

Parameters:

Name Type Description Default
registry CodecRegistry

Registry to populate.

required
Source code in brmspy/_session/codec/dataclass.py
def register_dataclasses(registry: CodecRegistry) -> None:
    """
    Register codecs for known dataclass types.

    Parameters
    ----------
    registry : brmspy._session.codec.base.CodecRegistry
        Registry to populate.
    """
    # One generic codec instance per known dataclass type.
    for dataclass_type in _classes:
        registry.register(GenericDataClassCodec(cls=dataclass_type, registry=registry))

get_default_registry()

Return the process-global default codec registry.

Returns:

Type Description
CodecRegistry

Registry with SHM-first codecs registered, plus a pickle fallback.

Source code in brmspy/_session/codec/registry.py
def get_default_registry() -> CodecRegistry:
    """
    Return the process-global default codec registry.

    Returns
    -------
    brmspy._session.codec.base.CodecRegistry
        Registry with SHM-first codecs registered, plus a pickle fallback.
    """
    global _default_registry
    if _default_registry is not None:
        return _default_registry

    registry = CodecRegistry()
    # SHM-backed codecs come first: registration order decides encode priority.
    for shm_codec in (NumpyArrayCodec(), InferenceDataCodec(), PandasDFCodec()):
        registry.register(shm_codec)

    register_dataclasses(registry)

    # MUST BE LAST: pickle accepts everything, so it would shadow later codecs.
    registry.register(PickleCodec())

    _default_registry = registry
    return _default_registry