Skip to content

builtin

Attributes

GetBufContext = _GeneratorContextManager[tuple[ShmBlock, memoryview], None, None] module-attribute

ONE_MB = 1024 * 1024 module-attribute

Classes

CodecRegistry

Ordered registry of encoders used for IPC serialization.

Source code in brmspy/_session/codec/base.py
class CodecRegistry:
    """
    Ordered registry of encoders used for IPC serialization.

    Encoders are tried in registration order by `encode()`; `decode()` looks
    the encoder up by the codec name recorded in the payload.
    """

    def __init__(self) -> None:
        # codec name -> encoder, used by decode() lookups
        self._by_codec: dict[str, Encoder] = {}
        # registration order matters: encode() picks the first accepting encoder
        self._encoders: list[Encoder] = []

    def register(self, encoder: Encoder) -> None:
        """
        Register an encoder instance.

        Parameters
        ----------
        encoder : brmspy.types.session.Encoder
            Encoder to register. Its `codec` attribute is used as the key when present,
            otherwise the class name is used.
        """
        if hasattr(encoder, "codec") and encoder.codec:  # type: ignore
            codec_name = encoder.codec  # type: ignore
        else:
            codec_name = type(encoder).__name__
        self._by_codec[codec_name] = encoder
        # Write the resolved name back so every encoder carries its own key.
        encoder.codec = codec_name  # type: ignore
        self._encoders.append(encoder)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Encode an object by selecting the first encoder that accepts it.

        Parameters
        ----------
        obj : Any
            Value to encode.
        shm_pool : Any
            SHM pool used by codecs for allocating buffers.

        Returns
        -------
        brmspy.types.session.EncodeResult

        Raises
        ------
        RuntimeError
            If no encoder accepts `obj` and no "PickleCodec" fallback is
            registered.
        """
        for enc in self._encoders:
            if enc.can_encode(obj):
                res = enc.encode(obj, shm_pool)
                if not res.codec:
                    res.codec = type(enc).__name__
                return res

        # fallback to pickle
        if "PickleCodec" not in self._by_codec:
            raise RuntimeError("No pickle codec registered")
        return self._by_codec["PickleCodec"].encode(obj, shm_pool)

    def decode(
        self,
        payload: PayloadRef,
        shm_pool: Any,
    ) -> Any:
        """
        Decode a payload using the codec named inside it.

        Parameters
        ----------
        payload : brmspy.types.session.PayloadRef
            Encoded payload (`codec`, `meta`, `buffers`) previously produced
            by `encode()`.
        shm_pool : Any
            SHM pool used to attach referenced buffers (some codecs may
            attach additional buffers).

        Returns
        -------
        Any

        Raises
        ------
        ValueError
            If `payload["codec"]` does not name a registered codec.
        """
        codec = payload["codec"]
        if codec not in self._by_codec:
            raise ValueError(
                f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
            )

        # Non-temporary blocks collected here must outlive the decoded value.
        buffers = []

        @contextmanager
        def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
            buf = shm_pool.attach(ref)
            offset = ref.get("offset", 0)
            memview = memoryview(buf.shm.buf)
            view = memview[offset : offset + ref["content_size"]].cast("B")

            try:
                if not ref["temporary"]:
                    # non-temporary buffers are associated with columns / objects
                    buffers.append(buf)

                yield buf, view

            finally:
                # deterministic cleanup for temporary buffers
                if ref["temporary"]:
                    # IMPORTANT: release view before closing shm
                    try:
                        view.release()
                        memview.release()
                        shm_pool.gc(ref["name"])
                        buf.shm.close()
                    except Exception:
                        # Best-effort cleanup: a failed release must not mask
                        # the decoded result or an in-flight exception.
                        pass

        value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
        self._attach_shm_lifetime(value, buffers)

        return value

    @classmethod
    def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
        """Keep SHM blocks alive as long as `obj` is alive."""
        if not shms:
            return
        # Primitive immutables are never backed by an SHM buffer.
        if obj is None or isinstance(obj, (bool, str, int, float)):
            return

        try:
            # The finalizer holds a strong reference to the blocks until
            # `obj` itself is garbage collected.
            weakref.finalize(obj, _noop, tuple(shms))
        except TypeError:
            # Object doesn't support weak references (e.g. list, tuple, dict).
            # SHM blocks will be cleaned up by ShmPool.gc()/close_all() instead.
            return
        except Exception as exc:
            import logging

            logging.getLogger(__name__).warning(
                "Failed to attach SHM lifetime to %s: %s", type(obj).__name__, exc
            )
            return

Attributes

_by_codec = {} instance-attribute
_encoders = [] instance-attribute

Functions

__init__()
Source code in brmspy/_session/codec/base.py
def __init__(self) -> None:
    # Codec-name -> encoder mapping consulted by decode().
    self._by_codec: dict[str, Encoder] = {}
    # Registration order; encode() selects the first encoder that accepts a value.
    self._encoders: list[Encoder] = []
register(encoder)

Register an encoder instance.

Parameters:

Name Type Description Default
encoder Encoder

Encoder to register. Its codec attribute is used as the key when present, otherwise the class name is used.

required
Source code in brmspy/_session/codec/base.py
def register(self, encoder: Encoder) -> None:
    """
    Register an encoder instance.

    Parameters
    ----------
    encoder : brmspy.types.session.Encoder
        Encoder to register. Its `codec` attribute is used as the key when present,
        otherwise the class name is used.
    """
    # Prefer the encoder's self-declared codec id; fall back to the class name.
    if hasattr(encoder, "codec") and encoder.codec:  # type: ignore
        codec_name = encoder.codec  # type: ignore
    else:
        codec_name = type(encoder).__name__
    self._by_codec[codec_name] = encoder
    # Write the resolved name back so every encoder carries its own key.
    encoder.codec = codec_name  # type: ignore
    self._encoders.append(encoder)
encode(obj, shm_pool)

Encode an object by selecting the first encoder that accepts it.

Parameters:

Name Type Description Default
obj Any

Value to encode.

required
shm_pool Any

SHM pool used by codecs for allocating buffers.

required

Returns:

Type Description
EncodeResult
Source code in brmspy/_session/codec/base.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """
    Encode an object by selecting the first encoder that accepts it.

    Encoders are tried in registration order; if none accepts `obj`, the
    registered "PickleCodec" is used as a last resort.

    Parameters
    ----------
    obj : Any
        Value to encode.
    shm_pool : Any
        SHM pool used by codecs for allocating buffers.

    Returns
    -------
    brmspy.types.session.EncodeResult

    Raises
    ------
    RuntimeError
        If no encoder accepts `obj` and no "PickleCodec" is registered.
    """
    for enc in self._encoders:
        if enc.can_encode(obj):
            res = enc.encode(obj, shm_pool)
            # Backfill the codec id so decode() can find the encoder again.
            if not res.codec:
                res.codec = type(enc).__name__
            return res

    # fallback to pickle
    if "PickleCodec" not in self._by_codec:
        raise RuntimeError("No pickle codec registered")
    return self._by_codec["PickleCodec"].encode(obj, shm_pool)
decode(payload, shm_pool)

Decode a payload using a named codec.

Parameters:

Name Type Description Default
codec str

Codec identifier previously returned by encode().

required
meta dict[str, Any]

Codec metadata.

required
buffers list[memoryview]

Memoryviews for attached SHM buffers.

required
buffer_specs list[dict]

Original buffer specs (name/size) corresponding to buffers.

required
shm_pool Any

SHM pool (some codecs may attach additional buffers).

required

Returns:

Type Description
Any
Source code in brmspy/_session/codec/base.py
def decode(
    self,
    payload: PayloadRef,
    shm_pool: Any,
) -> Any:
    """
    Decode a payload using the codec named inside it.

    Parameters
    ----------
    payload : brmspy.types.session.PayloadRef
        Encoded payload (`codec`, `meta`, `buffers`) previously produced
        by `encode()`.
    shm_pool : Any
        SHM pool used to attach referenced buffers (some codecs may
        attach additional buffers).

    Returns
    -------
    Any

    Raises
    ------
    ValueError
        If `payload["codec"]` does not name a registered codec.
    """
    codec = payload["codec"]
    if codec not in self._by_codec:
        raise ValueError(
            f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
        )

    # Non-temporary blocks collected here must outlive the decoded value.
    buffers = []

    @contextmanager
    def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
        buf = shm_pool.attach(ref)
        offset = ref.get("offset", 0)
        memview = memoryview(buf.shm.buf)
        view = memview[offset : offset + ref["content_size"]].cast("B")

        try:
            if not ref["temporary"]:
                # non-temporary buffers are associated with columns / objects
                buffers.append(buf)

            yield buf, view

        finally:
            # deterministic cleanup for temporary buffers
            if ref["temporary"]:
                # IMPORTANT: release view before closing shm
                try:
                    view.release()
                    memview.release()
                    shm_pool.gc(ref["name"])
                    buf.shm.close()
                except Exception:
                    # Best-effort cleanup: a failed release must not mask
                    # the decoded result or an in-flight exception.
                    pass

    value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
    self._attach_shm_lifetime(value, buffers)

    return value
_attach_shm_lifetime(obj, shms) classmethod

Keep SHM blocks alive as long as obj is alive.

Source code in brmspy/_session/codec/base.py
@classmethod
def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
    """Keep SHM blocks alive as long as `obj` is alive."""
    if not shms:
        return
    # Primitive immutables are never backed by an SHM buffer.
    if obj is None or isinstance(obj, (bool, str, int, float)):
        return

    try:
        # The finalizer holds a strong reference to the blocks until
        # `obj` itself is garbage collected.
        weakref.finalize(obj, _noop, tuple(shms))
    except TypeError:
        # Object doesn't support weak references (e.g. list, tuple, dict).
        # SHM blocks will be cleaned up by ShmPool.gc()/close_all() instead.
        return
    except Exception as exc:
        import logging
        logging.getLogger(__name__).warning(
            "Failed to attach SHM lifetime to %s: %s", type(obj).__name__, exc
        )
        return

EncodeResult dataclass

Result of encoding a Python value for IPC transfer.

Attributes:

Name Type Description
codec str

Codec identifier.

meta dict[str, Any]

JSON-serializable metadata required for decoding.

buffers list[ShmRef]

Shared-memory blocks backing the encoded payload.

Source code in brmspy/types/session.py
@dataclass
class EncodeResult:
    """
    Result of encoding a Python value for IPC transfer.

    Attributes
    ----------
    codec : str
        Codec identifier. May be left empty by an encoder; the registry
        then fills in the encoder's class name.
    meta : dict[str, Any]
        JSON-serializable metadata required for decoding.
    buffers : list[ShmRef]
        Shared-memory blocks backing the encoded payload.
    """

    codec: str
    meta: dict[str, Any]
    buffers: list[ShmRef]

Attributes

codec instance-attribute
meta instance-attribute
buffers instance-attribute

Functions

__init__(codec, meta, buffers)

Encoder

Bases: Protocol

Protocol implemented by codecs in the session codec registry.

Source code in brmspy/types/session.py
@runtime_checkable
class Encoder(Protocol):
    """
    Protocol implemented by codecs in the session codec registry.
    """

    # Return True when this encoder can serialize `obj`.
    def can_encode(self, obj: Any) -> bool: ...

    # Serialize `obj`, allocating SHM buffers from `shm_pool` as needed.
    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult: ...

    # Reconstruct a value from `payload`; `get_buf` attaches referenced buffers.
    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args: Any,
    ) -> Any: ...

Functions

can_encode(obj)
Source code in brmspy/types/session.py
def can_encode(self, obj: Any) -> bool: ...  # True when this encoder handles `obj`
encode(obj, shm_pool)
Source code in brmspy/types/session.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult: ...  # serialize `obj` into SHM
decode(payload, get_buf, *args)
Source code in brmspy/types/session.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args: Any,
) -> Any: ...  # reconstruct the value described by `payload`

PayloadRef

Bases: TypedDict

Encoded argument/result payload sent over the control pipe.

A payload is:

  • codec: the codec identifier used by the registry
  • meta: JSON-serializable metadata needed to reconstruct the value
  • buffers: shared-memory buffer references backing the payload
Source code in brmspy/types/session.py
class PayloadRef(TypedDict):
    """
    Encoded argument/result payload sent over the control pipe.

    A payload is:

    - `codec`: the codec identifier used by the registry
    - `meta`: JSON-serializable metadata needed to reconstruct the value
    - `buffers`: shared-memory buffer references backing the payload
    """

    codec: str  # codec identifier registered in CodecRegistry
    meta: dict[str, Any]  # JSON-serializable decoding metadata
    buffers: list[ShmRef]  # SHM buffer references backing the value

Attributes

codec instance-attribute
meta instance-attribute
buffers instance-attribute

ShmSeriesMetadata

Bases: TypedDict

Source code in brmspy/types/shm_extensions.py
class ShmSeriesMetadata(TypedDict):
    """Metadata needed to reconstruct one pandas Series from its SHM block."""

    name: Hashable | None  # original series name (may be None)
    np_dtype: str  # dtype of the ndarray actually stored in SHM
    pd_dtype: str  # pandas dtype name of the original series
    params: dict[str, Any]  # extra info per logical type
    block: ShmRef  # reference to the backing shared-memory block

Attributes

name instance-attribute
np_dtype instance-attribute
pd_dtype instance-attribute
params instance-attribute
block instance-attribute

ShmArray

Bases: ndarray

NumPy array view backed by a shared-memory block.

Attributes:

Name Type Description
block ShmRef

Reference to the shared-memory block backing the array data.

Notes

This is a view over SharedMemory.buf. Closing/unlinking the underlying shared memory while the array is still in use will lead to undefined behavior.

Source code in brmspy/types/shm_extensions.py
class ShmArray(np.ndarray):
    """
    NumPy array view backed by a shared-memory block.

    Attributes
    ----------
    block : ShmRef
        Reference to the shared-memory block backing the array data.

    Notes
    -----
    This is a *view* over `SharedMemory.buf`. Closing/unlinking the underlying
    shared memory while the array is still in use will lead to undefined
    behavior.
    """

    _shm_metadata: ShmRef  # for type checkers

    @classmethod
    def from_metadata(
        cls, meta: ShmArrayMetadata | dict[str, Any], block: ShmBlock
    ) -> np.ndarray:
        """Rebuild an array view from serialized `meta` and its attached SHM block."""
        dtype = np.dtype(meta["dtype"])
        shape = tuple(meta["shape"])
        order = meta["order"]

        return cls.from_block(block=block, shape=shape, dtype=dtype, order=order)

    @classmethod
    def from_block(
        cls, block: ShmBlock, shape: tuple[int, ...], dtype: np.dtype, **kwargs
    ) -> Union["ShmArray", np.ndarray]:
        """
        Create an array view backed by an existing shared-memory block.

        Parameters
        ----------
        block : ShmBlock
            Attached shared-memory block.
        shape : tuple[int, ...]
            Desired array shape.
        dtype : numpy.dtype
            NumPy dtype of the array.
        **kwargs
            `order` ("C" or "F", default "F") selects the memory layout
            passed to `numpy.ndarray`. Other keys are ignored.

        Returns
        -------
        ShmArray or numpy.ndarray
            Zero-copy view into the shared-memory buffer for non-object
            dtypes. Object-dtype payloads were stored pickled, so they are
            deserialized into a regular ndarray (a copy).
        """
        is_object = np.dtype(dtype) == np.dtype("O")

        if not is_object:
            if block.shm.buf:
                view = memoryview(block.shm.buf)
                view = view[block.offset : block.offset + block.content_size]
            else:
                view = None
            base = np.ndarray(
                shape=shape,
                dtype=dtype,
                buffer=view,
                order=kwargs.get("order", "F"),
            )
            obj = base.view(cls)
            obj._shm_metadata = block.to_ref()
        else:
            # Object arrays are stored pickled; this path necessarily copies.
            assert block.shm.buf
            view = memoryview(block.shm.buf)
            view = view[block.offset : block.offset + block.content_size]
            payload = bytes(view)
            obj = pickle.loads(payload)
            assert isinstance(obj, np.ndarray)

        return obj

    @classmethod
    def array_order(cls, a: np.ndarray) -> Literal["C", "F", "non-contiguous"]:
        """
        Determine how an array can be reconstructed from a raw buffer.

        Returns `"C"` for C-contiguous arrays, `"F"` for Fortran-contiguous arrays,
        otherwise `"non-contiguous"` (meaning: bytes were obtained by forcing
        a contiguous copy during encoding).
        """
        if a.flags["C_CONTIGUOUS"]:
            return "C"
        if a.flags["F_CONTIGUOUS"]:
            return "F"
        return "non-contiguous"

    @classmethod
    def is_string_object(cls, a: np.ndarray, sample: int = 1000):
        """
        Heuristically check whether an object-dtype array holds only strings.

        Inspects at most `sample` leading elements; `None` entries count as
        missing values and do not disqualify the array. Non-object dtypes
        always return False.
        """
        if np.dtype(a.dtype) != np.dtype("O"):
            return False
        it = a.flat
        for _ in range(min(sample, a.size)):
            v = next(it, None)
            if v is not None and not isinstance(v, str):
                return False
        return True

    @classmethod
    def to_shm(
        cls, obj: np.ndarray | pd.Series | list, shm_pool: Any
    ) -> tuple[np.ndarray | None, ShmRef, str, list[int], str]:
        """
        Copy `obj` into shared memory and describe how to reconstruct it.

        Returns
        -------
        tuple
            ``(arr_modified, ref, dtype, shape, order)``: `arr_modified` is a
            normalized replacement array (set only when string-object input
            was converted to a unicode dtype, otherwise None), `ref`
            references the SHM block, and dtype/shape/order describe the
            stored ndarray.
        """
        if isinstance(obj, pd.Series):
            if isinstance(obj.dtype, pd.CategoricalDtype):
                # Store integer category codes, not the category values.
                arr = obj.cat.codes.to_numpy(copy=False)
            else:
                arr = obj.to_numpy(copy=False)
        elif not isinstance(obj, np.ndarray):
            arr = np.asarray(obj)
        else:
            arr = obj

        is_object = np.dtype(arr.dtype) == np.dtype("O")
        is_string = cls.is_string_object(arr)

        arr_modified = None
        if isinstance(arr, ShmArray):
            # Already SHM-backed: reuse the existing block reference.
            ref = arr._shm_metadata

        else:
            temporary = False
            if not is_object:
                data = arr.tobytes(order="C")
            elif is_string:
                # Normalize string-object arrays to a fixed-width unicode dtype.
                arr = arr.astype("U")
                arr_modified = arr
                data = arr.tobytes(order="C")
            else:
                # Arbitrary object arrays: pickle into a temporary block.
                data = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
                temporary = True

            nbytes = len(data)

            # Ask for exactly nbytes; OS may round up internally, that's fine.
            block = shm_pool.alloc(nbytes, temporary=temporary)
            block.shm.buf[block.offset : block.offset + nbytes] = data
            ref = block.to_ref()

        return (
            arr_modified,
            ref,
            str(arr.dtype),
            list(arr.shape),
            cls.array_order(arr),
        )

Attributes

_shm_metadata instance-attribute

Functions

from_metadata(meta, block) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def from_metadata(
    cls, meta: ShmArrayMetadata | dict[str, Any], block: ShmBlock
) -> np.ndarray:
    """Rebuild an array view from serialized `meta` and its attached SHM block."""
    dtype = np.dtype(meta["dtype"])
    shape = tuple(meta["shape"])
    order = meta["order"]

    return ShmArray.from_block(block=block, shape=shape, dtype=dtype, order=order)
from_block(block, shape, dtype, **kwargs) classmethod

Create an array view backed by an existing shared-memory block.

Parameters:

Name Type Description Default
block ShmBlock

Attached shared-memory block.

required
shape tuple[int, ...]

Desired array shape.

required
dtype dtype

NumPy dtype of the array.

required
**kwargs

Reserved for future compatibility. Currently unused.

{}

Returns:

Type Description
ShmArray

Array view into the shared-memory buffer.

Source code in brmspy/types/shm_extensions.py
@classmethod
def from_block(
    cls, block: ShmBlock, shape: tuple[int, ...], dtype: np.dtype, **kwargs
) -> Union["ShmArray", np.ndarray]:
    """
    Create an array view backed by an existing shared-memory block.

    Parameters
    ----------
    block : ShmBlock
        Attached shared-memory block.
    shape : tuple[int, ...]
        Desired array shape.
    dtype : numpy.dtype
        NumPy dtype of the array.
    **kwargs
        `order` ("C" or "F", default "F") selects the memory layout
        passed to `numpy.ndarray`. Other keys are ignored.

    Returns
    -------
    ShmArray or numpy.ndarray
        Zero-copy view into the shared-memory buffer for non-object
        dtypes; object-dtype payloads are unpickled into a regular
        ndarray (a copy).
    """
    is_object = np.dtype(dtype) == np.dtype("O")

    if not is_object:
        if block.shm.buf:
            view = memoryview(block.shm.buf)
            view = view[block.offset : block.offset + block.content_size]
        else:
            view = None
        base = np.ndarray(
            shape=shape,
            dtype=dtype,
            buffer=view,
            order=kwargs.get("order", "F"),
        )
        obj = base.view(ShmArray)
        obj._shm_metadata = block.to_ref()
    else:
        # Object arrays were stored pickled; this path necessarily copies.
        assert block.shm.buf
        view = memoryview(block.shm.buf)
        view = view[block.offset : block.offset + block.content_size]
        payload = bytes(view)
        obj = pickle.loads(payload)
        assert isinstance(obj, np.ndarray)

    return obj
array_order(a) classmethod

Determine how an array can be reconstructed from a raw buffer.

Returns "C" for C-contiguous arrays, "F" for Fortran-contiguous arrays, otherwise "non-contiguous" (meaning: bytes were obtained by forcing a contiguous copy during encoding).

Source code in brmspy/types/shm_extensions.py
@classmethod
def array_order(cls, a: np.ndarray) -> Literal["C", "F", "non-contiguous"]:
    """
    Classify how `a`'s bytes can be reinterpreted from a raw buffer.

    C-contiguous arrays yield `"C"` and Fortran-contiguous arrays yield
    `"F"`; anything else reports `"non-contiguous"` (the encoder produced
    the bytes by forcing a contiguous copy).
    """
    layout_flags = a.flags
    if layout_flags["C_CONTIGUOUS"]:
        return "C"
    return "F" if layout_flags["F_CONTIGUOUS"] else "non-contiguous"
is_string_object(a, sample=1000) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def is_string_object(cls, a: np.ndarray, sample: int = 1000):
    """
    Heuristically decide whether an object-dtype array holds only strings.

    Looks at no more than `sample` leading elements; `None` entries are
    treated as missing values and do not disqualify the array. Arrays with
    a non-object dtype are never considered string arrays.
    """
    if np.dtype(a.dtype) != np.dtype("O"):
        return False
    flat_iter = a.flat
    remaining = min(sample, a.size)
    while remaining > 0:
        item = next(flat_iter, None)
        if item is not None and not isinstance(item, str):
            return False
        remaining -= 1
    return True
to_shm(obj, shm_pool) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def to_shm(
    cls, obj: np.ndarray | pd.Series | list, shm_pool: Any
) -> tuple[np.ndarray | None, ShmRef, str, list[int], str]:
    """
    Copy `obj` into shared memory and describe how to reconstruct it.

    Returns ``(arr_modified, ref, dtype, shape, order)``: `arr_modified`
    is a normalized replacement array (set only when string-object input
    was converted to a unicode dtype, otherwise None), `ref` references
    the SHM block, and dtype/shape/order describe the stored ndarray.
    """
    if isinstance(obj, pd.Series):
        if isinstance(obj.dtype, pd.CategoricalDtype):
            # Store integer category codes, not the category values.
            arr = obj.cat.codes.to_numpy(copy=False)
        else:
            arr = obj.to_numpy(copy=False)
    elif not isinstance(obj, np.ndarray):
        arr = np.asarray(obj)
    else:
        arr = obj

    is_object = np.dtype(arr.dtype) == np.dtype("O")
    is_string = cls.is_string_object(arr)

    arr_modified = None
    if isinstance(arr, ShmArray):
        # Already SHM-backed: reuse the existing block reference.
        ref = arr._shm_metadata

    else:
        temporary = False
        if not is_object:
            data = arr.tobytes(order="C")
        elif is_string:
            # Normalize string-object arrays to a fixed-width unicode dtype.
            arr = arr.astype("U")
            arr_modified = arr
            data = arr.tobytes(order="C")
        else:
            # Arbitrary object arrays: pickle into a temporary block.
            data = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
            temporary = True

        nbytes = len(data)

        # Ask for exactly nbytes; OS may round up internally, that's fine.
        block = shm_pool.alloc(nbytes, temporary=temporary)
        block.shm.buf[block.offset : block.offset + nbytes] = data
        ref = block.to_ref()

    ref, dtype, shape, order = (
        ref,
        str(arr.dtype),
        list(arr.shape),
        cls.array_order(arr),
    )

    return arr_modified, ref, dtype, shape, order

ShmDataFrameColumns

Bases: DataFrame

pandas DataFrame backed by per-column shared-memory blocks (numeric only).

Attributes:

Name Type Description
_blocks_columns dict[str, PandasColumnMetadata]

Mapping from column name to data required for its reconstruction

Source code in brmspy/types/shm_extensions.py
class ShmDataFrameColumns(pd.DataFrame):
    """
    pandas DataFrame backed by per-column shared-memory blocks (numeric only).

    Attributes
    ----------
    _shm_metadata : dict[str, ShmSeriesMetadata]
        Mapping from column name to the data required for its reconstruction.
    """

    _metadata = ["_shm_metadata"]
    _shm_metadata: dict[str, ShmSeriesMetadata]

    @property
    def _constructor(self):
        # We INTENTIONALLY return plain pd.DataFrame here: whenever the
        # dataframe is reindexed or sliced, we want to get rid of all
        # _shm_metadata, as otherwise we will have immediate problems with
        # buffer alignment.
        return pd.DataFrame

    @classmethod
    def _create_col_metadata(
        cls, series: pd.Series, block: ShmRef, arr: np.ndarray | None = None, **params
    ) -> ShmSeriesMetadata:
        """
        Build the ShmSeriesMetadata describing how to rebuild `series` from `block`.

        Parameters
        ----------
        series : pd.Series
            Column whose dtype and reconstruction parameters are recorded.
        block : ShmRef
            Shared-memory block holding the column's ndarray.
        arr : np.ndarray, optional
            The ndarray actually stored in SHM when it differs from
            `series.to_numpy()` (e.g. unicode-normalized strings).
        **params
            Extra reconstruction parameters merged into the metadata.

        Raises
        ------
        TypeError
            If a string column is about to be stored with object dtype.
        """
        pd_dtype = series.dtype

        # Decide what ndarray is actually stored in SHM
        if isinstance(pd_dtype, pd.CategoricalDtype):
            # store dtype params
            params["categories"] = pd_dtype.categories.to_numpy(dtype=object).tolist()
            params["ordered"] = bool(pd_dtype.ordered)
            params["categories_dtype"] = pd_dtype.categories.dtype.name

            # IMPORTANT: store integer codes, not values
            # -1 means missing
            array = series.cat.codes.to_numpy(copy=False)
        elif arr is not None:
            assert isinstance(arr, np.ndarray)
            array = arr
        else:
            # for numeric-only SHM: require a real numpy array output here
            array = series.to_numpy(copy=False)

        # Extra dtype parameters required for faithful reconstruction.
        if isinstance(pd_dtype, pd.PeriodDtype):
            params["freq"] = str(pd_dtype.freq)

        if isinstance(pd_dtype, pd.IntervalDtype):
            params["subtype"] = np.dtype(pd_dtype.subtype).str
            params["closed"] = str(pd_dtype.closed)  # type: ignore[attr-defined]

        meta: ShmSeriesMetadata = {
            "name": series.name,
            "np_dtype": str(array.dtype),
            "pd_dtype": str(pd_dtype.name),
            "block": block,
            "params": params,
        }

        if np.dtype(meta["np_dtype"]) == np.dtype("O"):
            # Sanity check. If this goes wrong, it will be frustrating to debug
            if ShmArray.is_string_object(array, sample=25):
                raise TypeError(
                    f"{series.name} column is string, but stored as object!"
                )

        return meta

    def _set_col_raw(self, col: str, value) -> None:
        """Assign a column while bypassing our __setitem__ SHM hooks."""
        pd.DataFrame.__setitem__(self, col, value)

    def _set_shm_metadata(self, meta: dict[str, ShmSeriesMetadata]):
        """Replace the per-column SHM reconstruction metadata wholesale."""
        self._shm_metadata = meta

    @classmethod
    def _put_col_in_shm(
        cls, df: "ShmDataFrameColumns", col: str, shm_pool: Any, replace=False
    ):
        """
        Ensure column `col` of `df` is backed by shared memory.

        Columns already backed by an ShmArray are left alone unless
        `replace` is True; non-ndarray columns are logged and skipped.
        """
        vals = df[col].to_numpy(copy=False)
        if (
            isinstance(vals, ShmArray)
            and hasattr(vals, "_shm_metadata")
            and not replace
        ):
            # Already SHM-backed; nothing to do.
            pass
        elif isinstance(vals, np.ndarray):
            # Drop any stale metadata before re-encoding the column.
            if col in df._shm_metadata:
                del df._shm_metadata[col]
            arr_modified, ref, dtype, shape, order = ShmArray.to_shm(df[col], shm_pool)

            if arr_modified is not None:
                # Only needed for string-object normalization; for numeric/codes it's None
                df._set_col_raw(
                    col, pd.Series(arr_modified, index=df.index, name=col, copy=False)
                )
            df._shm_metadata[col] = cls._create_col_metadata(df[col], ref, arr_modified)
            return
        else:
            # Library code should not write to stdout; log a warning instead.
            import logging

            logging.getLogger(__name__).warning(
                "Failed to update shm metadata for column '%s' dtype %s",
                col,
                vals.dtype,
            )
            return

    @classmethod
    def _reconstruct_series(
        cls,
        meta: ShmSeriesMetadata,
        block: ShmBlock,
        nrows: int,
        index: list | None,
    ) -> pd.Series:
        """
        Rebuild one column as a pandas Series from its SHM block and metadata.

        Handles categorical columns (stored as integer codes) and tz-aware
        datetimes (stored as int64 ns timestamps); everything else is wrapped
        directly from the SHM-backed ndarray.
        """
        col_name = meta["name"]
        col_name = str(col_name)
        dtype = np.dtype(meta["np_dtype"])
        pd_dtype = meta["pd_dtype"]
        params = meta["params"]

        arr = ShmArray.from_block(block=block, shape=(nrows,), dtype=dtype, order="C")

        if pd_dtype == "category":
            cats = params.get("categories", None)
            ordered = bool(params.get("ordered", False))

            if cats is None:
                raise ValueError("category dtype requires params['categories']")

            cats_pd_dtype = params.get("categories_dtype")
            if cats_pd_dtype is not None:
                cats_index = pd.Index(cats, dtype=str(cats_pd_dtype))
            else:
                cats_index = pd.Index(cats)

            cat_dtype = pd.CategoricalDtype(categories=cats_index, ordered=ordered)

            # arr should hold integer codes
            # If arr holds codes: build categorical from codes without copying codes.
            # Pandas uses -1 for missing.
            cat = pd.Categorical.from_codes(cast(Sequence[int], arr), dtype=cat_dtype)
            return pd.Series(cat, name=col_name, index=index)

        # 2) tz-aware datetimes
        # Expect arr to be int64 ns timestamps
        if pd_dtype.startswith("datetime64[ns,") or pd_dtype == "datetime64[ns, tz]":
            tz = params.get("tz")
            if not tz:
                # if stored as a plain datetime64[ns] string, just fall through
                pass
            else:
                dt = pd.to_datetime(arr, unit="ns", utc=True).tz_convert(tz)
                return pd.Series(dt, name=col_name, index=index)

        return pd.Series(arr, name=col_name, index=index)

    def __setitem__(self, key, value):
        """Set a column, then mirror the new or changed column into SHM."""
        is_existing = key in self.columns

        super().__setitem__(key, value)

        if is_existing:
            self._on_column_replaced(key)
        else:
            self._on_column_added(key)

    def _on_column_added(self, col: str):
        """Push a newly added column into SHM (no-op without an active pool)."""
        from brmspy._singleton._shm_singleton import _get_shm

        shm = _get_shm()
        if not shm:
            return
        self._put_col_in_shm(self, col, shm)

    def _on_column_replaced(self, col: str):
        """Re-push a replaced column into SHM, overwriting stale metadata."""
        from brmspy._singleton._shm_singleton import _get_shm

        shm = _get_shm()
        if not shm:
            return
        self._put_col_in_shm(self, col, shm, replace=True)

Attributes

_metadata = ['_shm_metadata'] class-attribute instance-attribute
_shm_metadata instance-attribute
_constructor property

Functions

_create_col_metadata(series, block, arr=None, **params) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def _create_col_metadata(
    cls, series: pd.Series, block: ShmRef, arr: np.ndarray | None = None, **params
) -> ShmSeriesMetadata:
    """
    Build the ShmSeriesMetadata describing how to rebuild `series` from `block`.

    `arr`, when given, is the ndarray actually stored in SHM (e.g. a
    unicode-normalized copy); otherwise the series' own ndarray is recorded.
    """
    pd_dtype = series.dtype

    # Decide what ndarray is actually stored in SHM
    if isinstance(pd_dtype, pd.CategoricalDtype):
        # store dtype params
        params["categories"] = pd_dtype.categories.to_numpy(dtype=object).tolist()
        params["ordered"] = bool(pd_dtype.ordered)
        params["categories_dtype"] = pd_dtype.categories.dtype.name

        # IMPORTANT: store integer codes, not values
        # -1 means missing
        array = series.cat.codes.to_numpy(copy=False)
    elif arr is not None:
        assert isinstance(arr, np.ndarray)
        array = arr
    else:
        # for numeric-only SHM: require a real numpy array output here
        array = series.to_numpy(copy=False)

    # Extra dtype parameters required for faithful reconstruction.
    if isinstance(pd_dtype, pd.PeriodDtype):
        params["freq"] = str(pd_dtype.freq)

    if isinstance(pd_dtype, pd.IntervalDtype):
        params["subtype"] = np.dtype(pd_dtype.subtype).str
        params["closed"] = str(pd_dtype.closed)  # type: ignore[attr-defined]

    meta: ShmSeriesMetadata = {
        "name": series.name,
        "np_dtype": str(array.dtype),
        "pd_dtype": str(pd_dtype.name),
        "block": block,
        "params": params,
    }

    if np.dtype(meta["np_dtype"]) == np.dtype("O"):
        # Sanity check. If this goes wrong, it will be frustrating to debug
        if ShmArray.is_string_object(array, sample=25):
            raise Exception(
                f"{series.name} column is string, but stored as object!"
            )

    return meta
_set_col_raw(col, value)
Source code in brmspy/types/shm_extensions.py
def _set_col_raw(self, col: str, value) -> None:
    """Assign `value` to column `col` without triggering SHM bookkeeping."""
    # bypass our __setitem__ (and its SHM side effects) by calling the
    # base-class item assignment directly
    pd.DataFrame.__setitem__(self, col, value)
_set_shm_metadata(meta)
Source code in brmspy/types/shm_extensions.py
def _set_shm_metadata(self, meta: dict[str, ShmSeriesMetadata]):
    """Attach the per-column SHM metadata mapping to this frame."""
    # _shm_metadata is listed in the class-level `_metadata` attribute,
    # which is the pandas mechanism for keeping it on the instance.
    self._shm_metadata = meta
_put_col_in_shm(df, col, shm_pool, replace=False) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def _put_col_in_shm(
    cls, df: "ShmDataFrameColumns", col: str, shm_pool: Any, replace=False
):
    """
    Ensure column `col` of `df` is backed by shared memory.

    Parameters
    ----------
    df : ShmDataFrameColumns
        Frame whose column should be moved into SHM.
    col : str
        Column name.
    shm_pool : Any
        SHM pool used to allocate the backing block.
    replace : bool, default False
        When True, re-encode the column even if it is already SHM-backed.
    """
    vals = df[col].to_numpy(copy=False)
    if (
        isinstance(vals, ShmArray)
        and hasattr(vals, "_shm_metadata")
        and not replace
    ):
        # Already SHM-backed and no forced replacement: nothing to do.
        pass
    elif isinstance(vals, np.ndarray):
        # Drop any stale metadata before re-encoding this column.
        if col in df._shm_metadata:
            del df._shm_metadata[col]
        arr_modified, ref, dtype, shape, order = ShmArray.to_shm(df[col], shm_pool)

        if arr_modified is not None:
            # Only needed for string-object normalization; for numeric/codes it's None
            df._set_col_raw(
                col, pd.Series(arr_modified, index=df.index, name=col, copy=False)
            )
        df._shm_metadata[col] = cls._create_col_metadata(df[col], ref, arr_modified)
        return
    else:
        # NOTE(review): diagnostic via print(); consider the project's
        # logging helper instead — confirm what is importable here.
        print(
            f"Failed to update shm metadata for column '{col}' dtype {vals.dtype}"
        )
        return
_reconstruct_series(meta, block, nrows, index) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def _reconstruct_series(
    cls,
    meta: ShmSeriesMetadata,
    block: ShmBlock,
    nrows: int,
    index: list | None,
) -> pd.Series:
    """
    Rebuild a pandas Series from its SHM metadata and backing block.

    Parameters
    ----------
    meta : ShmSeriesMetadata
        Per-column metadata created by `_create_col_metadata`.
    block : ShmBlock
        Attached SHM block holding the column's ndarray.
    nrows : int
        Number of rows (length of the reconstructed Series).
    index : list or None
        Index to attach to the Series.

    Returns
    -------
    pd.Series
    """
    col_name = meta["name"]
    # Names may round-trip as non-str (e.g. ints); normalize to str.
    col_name = str(col_name)
    dtype = np.dtype(meta["np_dtype"])
    pd_dtype = meta["pd_dtype"]
    params = meta["params"]

    # View over the SHM block; no copy of the column data is made here.
    arr = ShmArray.from_block(block=block, shape=(nrows,), dtype=dtype, order="C")

    # Categorical columns: SHM holds integer codes, params hold the dtype.
    if pd_dtype == "category":
        cats = params.get("categories", None)
        ordered = bool(params.get("ordered", False))

        if cats is None:
            raise ValueError("category dtype requires params['categories']")

        cats_pd_dtype = params.get("categories_dtype")
        if cats_pd_dtype is not None:
            cats_index = pd.Index(cats, dtype=str(cats_pd_dtype))
        else:
            cats_index = pd.Index(cats)

        cat_dtype = pd.CategoricalDtype(categories=cats_index, ordered=ordered)

        # arr should hold integer codes
        # If arr holds codes: build categorical from codes without copying codes.
        # Pandas uses -1 for missing.
        cat = pd.Categorical.from_codes(cast(Sequence[int], arr), dtype=cat_dtype)
        return pd.Series(cat, name=col_name, index=index)

    # tz-aware datetimes
    # Expect arr to be int64 ns timestamps
    if pd_dtype.startswith("datetime64[ns,") or pd_dtype == "datetime64[ns, tz]":
        tz = params.get("tz")
        if not tz:
            # if stored as a plain datetime64[ns] string, just fall through
            pass
        else:
            dt = pd.to_datetime(arr, unit="ns", utc=True).tz_convert(tz)
            return pd.Series(dt, name=col_name, index=index)

    # Plain numeric / datetime64[ns] columns: wrap the SHM view directly.
    return pd.Series(arr, name=col_name, index=index)
__setitem__(key, value)
Source code in brmspy/types/shm_extensions.py
def __setitem__(self, key, value):
    """Assign a column, then keep its SHM backing in sync."""
    replaced = key in self.columns

    super().__setitem__(key, value)

    # A brand-new column gets a fresh SHM block; an existing one is
    # re-encoded in place.
    hook = self._on_column_replaced if replaced else self._on_column_added
    hook(key)
_on_column_added(col)
Source code in brmspy/types/shm_extensions.py
def _on_column_added(self, col: str):
    """Hook: a brand-new column was just assigned; move it into SHM."""
    from brmspy._singleton._shm_singleton import _get_shm

    shm = _get_shm()
    if shm:
        self._put_col_in_shm(self, col, shm)
_on_column_replaced(col)
Source code in brmspy/types/shm_extensions.py
def _on_column_replaced(self, col: str):
    """Hook: an existing column was overwritten; re-encode it into SHM."""
    from brmspy._singleton._shm_singleton import _get_shm

    shm = _get_shm()
    if shm:
        self._put_col_in_shm(self, col, shm, replace=True)

ShmDataFrameSimple

Bases: DataFrame

pandas DataFrame backed by a single shared-memory block (numeric only).

Attributes:

Name Type Description
block ShmRef

Reference to the shared-memory block backing the DataFrame's values.

Source code in brmspy/types/shm_extensions.py
class ShmDataFrameSimple(pd.DataFrame):
    """
    pandas DataFrame backed by a single shared-memory block (numeric only).

    Attributes
    ----------
    block : ShmRef
        Reference to the shared-memory block backing the DataFrame's values.
    """

    # Registered with pandas so the attribute survives DataFrame operations.
    _metadata = ["_shm_metadata"]
    _shm_metadata: ShmRef

    @classmethod
    def from_block(
        cls,
        block: ShmBlock,
        nrows: int,
        ncols: int,
        columns: list[Any] | None,
        index: list[Any] | None,
        dtype: str | np.dtype,
    ) -> "ShmDataFrameSimple":
        """
        Construct a DataFrame backed by a single SHM block.

        Parameters
        ----------
        block : ShmBlock
            Attached shared-memory block containing a contiguous 2D numeric matrix.
        nrows, ncols : int
            DataFrame shape.
        columns, index : list[Any] or None
            Column/index labels.
        dtype : str or numpy.dtype
            Dtype of the matrix stored in the block.

        Returns
        -------
        ShmDataFrameSimple
        """
        resolved = np.dtype(dtype)
        # The block stores the matrix as (ncols, nrows); transpose the
        # zero-copy view to present the expected (nrows, ncols) layout.
        backing = ShmArray.from_block(shape=(ncols, nrows), dtype=resolved, block=block)
        matrix = backing.T

        frame = ShmDataFrameSimple(data=matrix, index=index, columns=columns)
        frame._set_shm_metadata(block.to_ref())
        return frame

    def _set_shm_metadata(self, meta: ShmRef):
        """Remember the SHM block reference backing this frame's values."""
        self._shm_metadata = meta

Attributes

_metadata = ['_shm_metadata'] class-attribute instance-attribute
_shm_metadata instance-attribute

Functions

from_block(block, nrows, ncols, columns, index, dtype) classmethod

Construct a DataFrame backed by a single SHM block.

Parameters:

Name Type Description Default
block ShmBlock

Attached shared-memory block containing a contiguous 2D numeric matrix.

required
nrows int

DataFrame shape.

required
ncols int

DataFrame shape.

required
columns list[Any] or None

Column/index labels.

required
index list[Any] or None

Column/index labels.

required
dtype str or dtype

Dtype of the matrix stored in the block.

required

Returns:

Type Description
ShmDataFrameSimple
Source code in brmspy/types/shm_extensions.py
@classmethod
def from_block(
    cls,
    block: ShmBlock,
    nrows: int,
    ncols: int,
    columns: list[Any] | None,
    index: list[Any] | None,
    dtype: str | np.dtype,
) -> "ShmDataFrameSimple":
    """
    Construct a DataFrame backed by a single SHM block.

    Parameters
    ----------
    block : ShmBlock
        Attached shared-memory block containing a contiguous 2D numeric matrix.
    nrows, ncols : int
        DataFrame shape.
    columns, index : list[Any] or None
        Column/index labels.
    dtype : str or numpy.dtype
        Dtype of the matrix stored in the block.

    Returns
    -------
    ShmDataFrameSimple
    """
    _dtype = np.dtype(dtype)
    arr = ShmArray.from_block(shape=(ncols, nrows), dtype=_dtype, block=block)

    df = ShmDataFrameSimple(data=arr.T, index=index, columns=columns)
    df._set_shm_metadata(block.to_ref())
    return df
_set_shm_metadata(meta)
Source code in brmspy/types/shm_extensions.py
def _set_shm_metadata(self, meta: ShmRef):
    self._shm_metadata = meta

ShmBlock dataclass

Attached shared-memory block (name/size + live SharedMemory handle).

Notes

This object owns a SharedMemory handle and must be closed when no longer needed. In brmspy this is managed by a ShmPool implementation.

Source code in brmspy/types/shm.py
@dataclass
class ShmBlock:
    """
    Attached shared-memory block (name/size + live `SharedMemory` handle).

    Notes
    -----
    This object owns a `SharedMemory` handle and must be closed when no longer
    needed. In brmspy this is managed by a `ShmPool` implementation.
    """

    name: str
    size: int
    content_size: int
    shm: SharedMemory
    temporary: bool
    offset: int = 0

    def to_ref(self) -> ShmRef:
        """Return a `ShmRef` dict describing this block (no live handle)."""
        ref: ShmRef = {
            "name": self.name,
            "size": self.size,
            "content_size": self.content_size,
            "temporary": self.temporary,
            "offset": self.offset,
        }
        return ref

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
shm instance-attribute
temporary instance-attribute
offset = 0 class-attribute instance-attribute

Functions

to_ref()
Source code in brmspy/types/shm.py
def to_ref(self) -> ShmRef:
    return {
        "name": self.name,
        "size": self.size,
        "content_size": self.content_size,
        "temporary": self.temporary,
        "offset": self.offset,
    }
__init__(name, size, content_size, shm, temporary, offset=0)

ShmRef

Bases: TypedDict

Reference to a shared-memory block sent over IPC.

Attributes:

Name Type Description
name str

Shared memory block name (as assigned by SharedMemoryManager).

size int

Allocated block size in bytes.

content_size int

Actual used size in bytes (may be smaller than the allocated size).

temporary bool

Whether this buffer can be garbage-collected immediately after use, or whether it should remain attached to the object it is constructed into.

offset int

Byte offset within the SHM block (for slab sub-allocations).

Notes

Codecs may store a logical payload smaller than size. In that case, the codec metadata must include the logical nbytes/length so that decoders can slice the buffer appropriately.

Source code in brmspy/types/shm.py
class ShmRef(TypedDict):
    """
    Reference to a shared-memory block sent over IPC.

    Attributes
    ----------
    name : str
        Shared memory block name (as assigned by `SharedMemoryManager`).
    size : int
        Allocated block size in bytes.
    content_size : int
        Actual used size in bytes (may be smaller than `size`).
    temporary : bool
        Whether this buffer can be garbage-collected immediately after use,
        or whether it should remain attached to the object it is constructed
        into.
    offset : int
        Byte offset within the SHM block (for slab sub-allocations).

    Notes
    -----
    Codecs may store a logical payload smaller than `size`. In that case, the
    codec metadata must include the logical `nbytes`/length so that decoders can
    slice the buffer appropriately.
    """

    name: str
    size: int
    content_size: int
    temporary: bool
    offset: int

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
temporary instance-attribute
offset instance-attribute

NumpyArrayCodec

Bases: Encoder

SHM-backed codec for numpy.ndarray.

Source code in brmspy/_session/codec/builtin.py
class NumpyArrayCodec(Encoder):
    """SHM-backed codec for [`numpy.ndarray`](https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html)."""

    def can_encode(self, obj: Any) -> bool:
        """Accept any ndarray (including subclasses)."""
        return isinstance(obj, np.ndarray)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """Copy the array into SHM; reconstruction parameters go in meta."""
        _unused, ref, dtype, shape, order = ShmArray.to_shm(obj, shm_pool)
        meta = {"dtype": dtype, "shape": shape, "order": order}
        return EncodeResult(codec=type(self).__name__, meta=meta, buffers=[ref])

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """Rebuild the ndarray as a view over its SHM block."""
        ref = payload["buffers"][0]
        with get_buf(ref) as (block, _):
            return ShmArray.from_metadata(payload["meta"], block)

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    return isinstance(obj, np.ndarray)
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    _, ref, dtype, shape, order = ShmArray.to_shm(obj, shm_pool)
    return EncodeResult(
        codec=type(self).__name__,
        meta={"dtype": dtype, "shape": shape, "order": order},
        buffers=[ref],
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    with get_buf(payload["buffers"][0]) as (buf, _):
        return ShmArray.from_metadata(payload["meta"], buf)

PandasDFCodec

Bases: Encoder

SHM-backed codec for numeric-only pandas.DataFrame.

Object-dtype columns are intentionally rejected to avoid surprising implicit conversions; those cases fall back to pickle.

Source code in brmspy/_session/codec/builtin.py
class PandasDFCodec(Encoder):
    """
    SHM-backed codec for numeric-only [`pandas.DataFrame`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

    Object-dtype columns are intentionally rejected to avoid surprising implicit
    conversions; those cases fall back to pickle.
    """

    def can_encode(self, obj: Any) -> bool:
        """Accept any DataFrame; layout selection happens in `encode`."""
        return isinstance(obj, pd.DataFrame)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Encode a DataFrame into one of three SHM layouts.

        Variants
        --------
        ``empty``
            No buffers at all.
        ``single``
            One SHM block holding a single-dtype matrix (`ShmDataFrameSimple`).
        ``columnar``
            One SHM block reference per column (`ShmDataFrameColumns` or a
            plain DataFrame encoded column-by-column).
        """
        assert isinstance(obj, pd.DataFrame)  # assert type

        meta: dict[str, Any] = {
            "columns": list(obj.columns),
            # The index can be any pandas Index subtype; pickling keeps it exact.
            "index": pickle.dumps(obj.index, protocol=pickle.HIGHEST_PROTOCOL),
            "variant": "single",
        }
        buffers: list[ShmRef] = []

        if obj.empty:
            meta["variant"] = "empty"
        elif isinstance(obj, ShmDataFrameSimple):
            # Already SHM-backed as a single-dtype matrix: reuse its block.
            meta["variant"] = "single"
            meta["dtype"] = str(obj.values.dtype)
            meta["order"] = ShmArray.array_order(obj.values)
            buffers.append(obj._shm_metadata)
        elif isinstance(obj, ShmDataFrameColumns):
            # Already SHM-backed per column: reuse the per-column refs.
            meta["variant"] = "columnar"
            meta["order"] = "F"
            meta["columns"] = obj._shm_metadata
        else:
            # Fallback: put each column in its own SHM block
            meta["variant"] = "columnar"
            meta["order"] = "C"
            columns: dict[str, ShmSeriesMetadata] = {}

            # Estimate total bytes for slab pre-allocation
            estimated = sum(obj[c].nbytes for c in obj.columns)
            if estimated > 0:
                shm_pool.open_slab(estimated)
            try:
                for col_name in obj.columns:
                    col = obj[col_name]

                    # Only the normalized array (if any) and the block ref are
                    # needed here; dtype/shape/order live in the column metadata.
                    arr_modified, spec, _dtype, _shape, _order = ShmArray.to_shm(
                        col, shm_pool
                    )

                    columns[col_name] = ShmDataFrameColumns._create_col_metadata(
                        col, spec, arr_modified
                    )
            finally:
                if estimated > 0:
                    shm_pool.seal_slab()
            meta["columns"] = columns

        return EncodeResult(codec=type(self).__name__, meta=meta, buffers=buffers)

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """
        Rebuild a DataFrame from an encoded payload.

        Raises
        ------
        Exception
            If the payload's variant tag is unknown.
        """
        meta = payload["meta"]
        if meta.get("variant") == "empty":
            return pd.DataFrame({})

        buffer_specs = payload["buffers"]

        index = pickle.loads(meta["index"])

        if meta.get("variant") == "single":
            spec = buffer_specs[0]
            with get_buf(spec) as (_block, memview):
                dtype = np.dtype(meta["dtype"])
                # BUGFIX: use content_size (actual payload bytes), not the
                # allocated block size, which may include padding. This matches
                # the intent of the slicing comment below and PickleCodec.
                nbytes = spec["content_size"]
                order = meta["order"]

                columns = meta["columns"]
                shape = (len(index), len(columns))

                # Only use the slice that actually holds array data
                view = memview[:nbytes]
                arr = np.ndarray(shape=shape, dtype=dtype, buffer=view, order=order)

                df = ShmDataFrameSimple(data=arr, index=index, columns=columns)
                df._set_shm_metadata(spec)

                return df
        elif meta.get("variant") == "columnar":
            columns_metadata: dict[str, ShmSeriesMetadata] = meta["columns"]
            nrows = len(index)

            data: dict[str, pd.Series] = {}

            # Rebuild each column as a Series over its own SHM block.
            for col_name, col_meta in columns_metadata.items():
                with get_buf(col_meta["block"]) as (block, _view):
                    data[col_name] = ShmDataFrameColumns._reconstruct_series(
                        col_meta, block, nrows, index
                    )

            df = ShmDataFrameColumns(data=data)
            df._set_shm_metadata(columns_metadata)

            return df
        else:
            raise Exception(f"Unknown DataFrame variant {meta.get('variant')}")

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    if not isinstance(obj, pd.DataFrame):
        return False

    return True
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    assert isinstance(obj, pd.DataFrame)  # assert type

    meta: dict[str, Any] = {
        "columns": list(obj.columns),
        "index": pickle.dumps(obj.index, protocol=pickle.HIGHEST_PROTOCOL),
        "variant": "single",
    }
    buffers: list[ShmRef] = []

    if obj.empty:
        meta["variant"] = "empty"
    elif isinstance(obj, ShmDataFrameSimple):
        # single dtype matrix
        meta["variant"] = "single"
        meta["dtype"] = str(obj.values.dtype)
        meta["order"] = ShmArray.array_order(obj.values)
        buffers.append(obj._shm_metadata)
    elif isinstance(obj, ShmDataFrameColumns):
        # per column buffers
        meta["variant"] = "columnar"
        meta["order"] = "F"
        meta["columns"] = obj._shm_metadata
    else:
        # Fallback: put each column in its own SHM block
        meta["variant"] = "columnar"
        meta["order"] = "C"
        columns: dict[str, ShmSeriesMetadata] = {}

        # Estimate total bytes for slab pre-allocation
        estimated = sum(obj[c].nbytes for c in obj.columns)
        if estimated > 0:
            shm_pool.open_slab(estimated)
        try:
            for col_name in obj.columns:
                col = obj[col_name]

                arr_modified, spec, dtype, shape, order = ShmArray.to_shm(col, shm_pool)

                columns[col_name] = ShmDataFrameColumns._create_col_metadata(
                    obj[col_name], spec, arr_modified
                )
        finally:
            if estimated > 0:
                shm_pool.seal_slab()
        meta["columns"] = columns

    return EncodeResult(codec=type(self).__name__, meta=meta, buffers=buffers)
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    meta = payload["meta"]
    if meta.get("variant") == "empty":
        return pd.DataFrame({})

    buffer_specs = payload["buffers"]

    index = pickle.loads(meta["index"])

    if meta.get("variant") == "single":
        spec = buffer_specs[0]
        with get_buf(buffer_specs[0]) as (buf, memview):
            dtype = np.dtype(meta["dtype"])
            nbytes = spec["size"]
            order = meta["order"]

            columns = meta["columns"]
            shape = (len(index), len(columns))

            # Only use the slice that actually holds array data
            view = memview[:nbytes]
            arr = np.ndarray(shape=shape, dtype=dtype, buffer=view, order=order)

            df = ShmDataFrameSimple(data=arr, index=index, columns=columns)
            df._set_shm_metadata(spec)

            return df
    elif meta.get("variant") == "columnar":
        columns_metadata: dict[str, ShmSeriesMetadata] = meta["columns"]
        nrows = len(index)

        columns = list(columns_metadata.keys())

        data: dict[str, pd.Series] = {}

        for i, col_name in enumerate(columns):
            metadata = columns_metadata[col_name]
            spec = metadata["block"]
            dtype = metadata["np_dtype"]
            with get_buf(spec) as (buf, view):
                data[col_name] = ShmDataFrameColumns._reconstruct_series(
                    metadata, buf, nrows, index
                )

        df = ShmDataFrameColumns(data=data)
        df._set_shm_metadata(columns_metadata)

        return df
    else:
        raise Exception(f"Unknown DataFrame variant {meta.get('variant')}")

PickleCodec

Bases: Encoder

Pickle fallback codec (internal).

Always encodes successfully, so it must be registered last. The pickled bytes are still stored in SHM to keep pipe traffic small and bounded.

Source code in brmspy/_session/codec/builtin.py
class PickleCodec(Encoder):
    """
    Pickle fallback codec (internal).

    Always encodes successfully, so it must be registered last. The pickled bytes
    are still stored in SHM to keep pipe traffic small and bounded.
    """

    def can_encode(self, obj: Any) -> bool:
        """Fallback encoder: accepts anything."""
        return True

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """Pickle `obj` into a temporary SHM block."""
        # None round-trips as an empty payload with no SHM buffer at all.
        if obj is None:
            return EncodeResult(codec=type(self).__name__, meta={}, buffers=[])

        data = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        nbytes = len(data)
        block = shm_pool.alloc(nbytes, temporary=True)
        # NOTE(review): writes at offset 0 rather than block.offset —
        # presumably temporary allocations never live inside a slab;
        # confirm against the pool implementation.
        block.shm.buf[:nbytes] = data

        # Record the logical length in meta instead of wasting SHM on it.
        meta: dict[str, Any] = {"length": nbytes}

        # Large pickled payloads usually mean a codec is missing; warn.
        if nbytes > ONE_MB * 10:
            size_mb = nbytes / ONE_MB
            log_warning(
                f"PickleCodec encoding large object: type={type(obj)}, size={size_mb:,.2f} MB"
            )

        return EncodeResult(
            codec=type(self).__name__,
            meta=meta,
            buffers=[block.to_ref()],
        )

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """Unpickle the payload; an empty buffer list decodes to None."""
        refs = payload["buffers"]
        if not refs:
            return None

        with get_buf(refs[0]) as (block, view):
            # Only content_size bytes hold the pickled payload.
            payload_bytes = bytes(view[: block.content_size])
            return pickle.loads(payload_bytes)

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    # Fallback – always True
    return True
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    if obj is None:
        return EncodeResult(
            codec=type(self).__name__,
            meta={},
            buffers=[],
        )

    data = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    block = shm_pool.alloc(len(data), temporary=True)
    block.shm.buf[: len(data)] = data

    # dont waste SHM, use meta instead of buffer
    meta: dict[str, Any] = {"length": len(data)}

    size_bytes = len(data)
    if size_bytes > ONE_MB * 10:
        size_mb = size_bytes / ONE_MB
        log_warning(
            f"PickleCodec encoding large object: type={type(obj)}, size={size_mb:,.2f} MB"
        )

    return EncodeResult(
        codec=type(self).__name__,
        meta=meta,
        buffers=[block.to_ref()],
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    specs = payload["buffers"]
    if len(specs) == 0:
        return None

    with get_buf(specs[0]) as (block, buf):
        length = block.content_size
        b = bytes(buf[:length])
        return pickle.loads(b)

InferenceDataCodec

Bases: Encoder

Encode arviz.InferenceData by pushing its underlying arrays into shm.

Source code in brmspy/_session/codec/builtin.py
class InferenceDataCodec(Encoder):
    """Encode arviz.InferenceData by pushing its underlying arrays into shm."""

    def can_encode(self, obj: Any) -> bool:
        """Accept only arviz `InferenceData` objects."""
        return isinstance(obj, az.InferenceData)

    def encode(self, obj: az.InferenceData, shm_pool: Any) -> EncodeResult:
        """
        Encode each group's coords and data variables into SHM blocks.

        Numeric coords and all data variables go into SHM; non-numeric
        coords are pickled inline in the metadata.
        """
        buffers: list[ShmRef] = []
        groups_meta: dict[str, Any] = {}
        # NOTE(review): total_bytes is accumulated below but never read —
        # candidate for removal or for a log statement.
        total_bytes = 0

        # -- Pass 1: estimate total SHM bytes needed -----------------------
        estimated = 0
        for group_name in obj.groups():
            ds: xr.Dataset = getattr(obj, group_name)
            for coord in ds.coords.values():
                values = np.asarray(coord.values)
                if values.dtype.kind in "iufb":
                    estimated += values.nbytes
            for da in ds.data_vars.values():
                estimated += np.asarray(da.data).nbytes

        # -- Open slab (all allocs below go into one SHM block) -----------
        if estimated > 0:
            shm_pool.open_slab(estimated)

        try:
            # Walk each group: posterior, posterior_predictive, etc.
            for group_name in obj.groups():
                ds: xr.Dataset = getattr(obj, group_name)
                g_meta: dict[str, Any] = {
                    "data_vars": {},
                    "coords": {},
                }

                # COORDS: generally smaller, but can be arrays.
                for cname, coord in ds.coords.items():
                    values = np.asarray(coord.values)
                    if values.dtype.kind in "iufb":  # numeric-ish
                        # Raw byte copy into the slab at the block's offset.
                        data = values.tobytes(order="C")
                        nbytes = len(data)
                        block = shm_pool.alloc(nbytes)
                        block.shm.buf[block.offset : block.offset + nbytes] = data

                        buffer_idx = len(buffers)
                        buffers.append(block.to_ref())
                        total_bytes += nbytes

                        g_meta["coords"][cname] = {
                            "kind": "array",
                            "buffer_idx": buffer_idx,
                            "dtype": str(values.dtype),
                            "shape": list(values.shape),
                            "dims": list(coord.dims),
                            "nbytes": nbytes,
                        }
                    else:
                        # Non-numeric / object coords: keep them small & pickle in meta.
                        g_meta["coords"][cname] = {
                            "kind": "pickle",
                            "dims": list(coord.dims),
                            "payload": pickle.dumps(
                                coord.values, protocol=pickle.HIGHEST_PROTOCOL
                            ),
                        }

                # DATA VARS: main heavy arrays
                for vname, da in ds.data_vars.items():
                    arr = np.asarray(da.data)
                    _, spec, dtype, shape, order = ShmArray.to_shm(arr, shm_pool)
                    meta = {"dtype": dtype, "shape": shape, "order": order}
                    nbytes = spec["content_size"]

                    buffer_idx = len(buffers)
                    buffers.append(spec)
                    total_bytes += nbytes

                    g_meta["data_vars"][vname] = {
                        "buffer_idx": buffer_idx,
                        "dtype": str(meta["dtype"]),
                        "shape": list(meta["shape"]),
                        "dims": list(da.dims),
                        "nbytes": nbytes,
                    }

                groups_meta[group_name] = g_meta
        finally:
            if estimated > 0:
                shm_pool.seal_slab()

        meta: dict[str, Any] = {
            "groups": groups_meta,
            "codec_version": 1,
        }

        return EncodeResult(
            codec=type(self).__name__,
            meta=meta,
            buffers=buffers,
        )

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """Rebuild the `InferenceData` as xarray Datasets over SHM views."""
        meta = payload["meta"]
        specs = payload["buffers"]
        groups_meta = meta["groups"]
        groups: dict[str, xr.Dataset] = {}

        for group_name, g_meta in groups_meta.items():
            data_vars = {}
            coords = {}

            # Rebuild coords
            for cname, cmeta in g_meta["coords"].items():
                kind = cmeta["kind"]
                if kind == "array":
                    spec = specs[cmeta["buffer_idx"]]
                    # NOTE(review): the ShmArray view is used after the
                    # get_buf context exits — presumably the pool keeps the
                    # mapping alive; confirm the pool's lifetime contract.
                    with get_buf(spec) as (block, _):
                        arr = ShmArray.from_block(
                            block, shape=cmeta["shape"], dtype=np.dtype(cmeta["dtype"])
                        )
                        coords[cname] = (tuple(cmeta["dims"]), arr)
                elif kind == "pickle":
                    values = pickle.loads(cmeta["payload"])
                    coords[cname] = (tuple(cmeta["dims"]), values)
                else:
                    raise ValueError(f"Unknown coord kind: {kind!r}")

            # Rebuild data_vars
            for vname, vmeta in g_meta["data_vars"].items():
                spec = specs[vmeta["buffer_idx"]]
                with get_buf(spec) as (block, _):
                    arr = ShmArray.from_block(
                        block, vmeta["shape"], dtype=np.dtype(vmeta["dtype"])
                    )
                    data_vars[vname] = (tuple(vmeta["dims"]), arr)

            ds = xr.Dataset(
                data_vars=data_vars,
                coords=coords,
            )
            groups[group_name] = ds

        # Construct InferenceData from datasets
        idata = az.InferenceData(**groups, warn_on_custom_groups=False)
        return idata

Functions

can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    return isinstance(obj, az.InferenceData)
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: az.InferenceData, shm_pool: Any) -> EncodeResult:
    """
    Serialize an ArviZ ``InferenceData`` into SHM-backed buffers plus metadata.

    Numeric coords and all data variables are copied into shared memory;
    non-numeric coords are pickled inline into the metadata dict.

    Parameters
    ----------
    obj : az.InferenceData
        The inference data to encode.
    shm_pool : Any
        Pool used for SHM allocation (``open_slab`` / ``alloc`` / ``seal_slab``).

    Returns
    -------
    EncodeResult
        Codec name, per-group layout metadata, and the list of SHM buffer refs.
    """
    buffers: list[ShmRef] = []
    groups_meta: dict[str, Any] = {}
    total_bytes = 0

    # -- Pass 1: estimate total SHM bytes needed -----------------------
    # NOTE(review): estimate sums raw nbytes only; assumes ShmArray.to_shm
    # below adds no padding/alignment overhead — TODO confirm.
    estimated = 0
    for group_name in obj.groups():
        ds: xr.Dataset = getattr(obj, group_name)
        for coord in ds.coords.values():
            values = np.asarray(coord.values)
            if values.dtype.kind in "iufb":
                estimated += values.nbytes
        for da in ds.data_vars.values():
            estimated += np.asarray(da.data).nbytes

    # -- Open slab (all allocs below go into one SHM block) -----------
    if estimated > 0:
        shm_pool.open_slab(estimated)

    try:
        # Walk each group: posterior, posterior_predictive, etc.
        for group_name in obj.groups():
            ds: xr.Dataset = getattr(obj, group_name)
            g_meta: dict[str, Any] = {
                "data_vars": {},
                "coords": {},
            }

            # COORDS: generally smaller, but can be arrays.
            for cname, coord in ds.coords.items():
                values = np.asarray(coord.values)
                if values.dtype.kind in "iufb":  # numeric-ish
                    # Copy C-order bytes straight into a freshly allocated block.
                    data = values.tobytes(order="C")
                    nbytes = len(data)
                    block = shm_pool.alloc(nbytes)
                    block.shm.buf[block.offset : block.offset + nbytes] = data

                    buffer_idx = len(buffers)
                    buffers.append(block.to_ref())
                    total_bytes += nbytes

                    g_meta["coords"][cname] = {
                        "kind": "array",
                        "buffer_idx": buffer_idx,
                        "dtype": str(values.dtype),
                        "shape": list(values.shape),
                        "dims": list(coord.dims),
                        "nbytes": nbytes,
                    }
                else:
                    # Non-numeric / object coords: keep them small & pickle in meta.
                    g_meta["coords"][cname] = {
                        "kind": "pickle",
                        "dims": list(coord.dims),
                        "payload": pickle.dumps(
                            coord.values, protocol=pickle.HIGHEST_PROTOCOL
                        ),
                    }

            # DATA VARS: main heavy arrays
            for vname, da in ds.data_vars.items():
                arr = np.asarray(da.data)
                _, spec, dtype, shape, order = ShmArray.to_shm(arr, shm_pool)
                # Per-variable temp; the outer `meta` dict is built after the loop.
                meta = {"dtype": dtype, "shape": shape, "order": order}
                nbytes = spec["content_size"]

                buffer_idx = len(buffers)
                buffers.append(spec)
                total_bytes += nbytes

                g_meta["data_vars"][vname] = {
                    "buffer_idx": buffer_idx,
                    "dtype": str(meta["dtype"]),
                    "shape": list(meta["shape"]),
                    "dims": list(da.dims),
                    "nbytes": nbytes,
                }

            groups_meta[group_name] = g_meta
    finally:
        # Seal the slab even if encoding a group raised mid-way.
        if estimated > 0:
            shm_pool.seal_slab()

    meta: dict[str, Any] = {
        "groups": groups_meta,
        "codec_version": 1,
    }

    return EncodeResult(
        codec=type(self).__name__,
        meta=meta,
        buffers=buffers,
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    """
    Rebuild an ArviZ ``InferenceData`` from SHM buffers and layout metadata.

    Parameters
    ----------
    payload : PayloadRef
        Dict with ``meta`` (per-group layout) and ``buffers`` (SHM refs).
    get_buf : Callable[[ShmRef], GetBufContext]
        Context manager yielding ``(block, memoryview)`` for a buffer ref.

    Returns
    -------
    az.InferenceData
        One ``xr.Dataset`` per encoded group.
    """
    meta = payload["meta"]
    specs = payload["buffers"]
    groups_meta = meta["groups"]
    groups: dict[str, xr.Dataset] = {}

    for group_name, g_meta in groups_meta.items():
        data_vars = {}
        coords = {}

        # Rebuild coords
        for cname, cmeta in g_meta["coords"].items():
            kind = cmeta["kind"]
            if kind == "array":
                spec = specs[cmeta["buffer_idx"]]
                # NOTE(review): `arr` escapes the `with` block; assumes the
                # SHM mapping stays valid after get_buf exits — TODO confirm.
                with get_buf(spec) as (block, _):
                    arr = ShmArray.from_block(
                        block, shape=cmeta["shape"], dtype=np.dtype(cmeta["dtype"])
                    )
                    coords[cname] = (tuple(cmeta["dims"]), arr)
            elif kind == "pickle":
                # Non-numeric coords were pickled inline into the metadata.
                values = pickle.loads(cmeta["payload"])
                coords[cname] = (tuple(cmeta["dims"]), values)
            else:
                raise ValueError(f"Unknown coord kind: {kind!r}")

        # Rebuild data_vars
        for vname, vmeta in g_meta["data_vars"].items():
            spec = specs[vmeta["buffer_idx"]]
            with get_buf(spec) as (block, _):
                arr = ShmArray.from_block(
                    block, vmeta["shape"], dtype=np.dtype(vmeta["dtype"])
                )
                data_vars[vname] = (tuple(vmeta["dims"]), arr)

        ds = xr.Dataset(
            data_vars=data_vars,
            coords=coords,
        )
        groups[group_name] = ds

    # Construct InferenceData from datasets
    idata = az.InferenceData(**groups, warn_on_custom_groups=False)
    return idata

GenericDataClassCodec

Bases: Encoder

Generic codec for dataclasses (internal).

Encodes each init=True field by delegating to a CodecRegistry. Use skip_fields to exclude fields that must not cross the boundary.

Source code in brmspy/_session/codec/builtin.py
class GenericDataClassCodec(Encoder):
    """
    Generic codec for dataclasses (internal).

    Each `init=True` field is serialized by delegating to a
    [`CodecRegistry`][brmspy._session.codec.base.CodecRegistry]; fields named in
    `skip_fields` are omitted and never cross the boundary.
    """

    def __init__(
        self,
        cls: type[Any],
        registry: CodecRegistry,
        *,
        skip_fields: set[str] | None = None,
    ) -> None:
        if not is_dataclass(cls):
            raise TypeError(f"{cls!r} is not a dataclass")

        self._cls = cls
        self._registry = registry
        self.codec = f"dataclass::{cls.__module__}.{cls.__qualname__}"

        self._skip_fields = skip_fields or set()
        # Precompute the encodable fields: init=True and not skipped.
        self._field_names: list[str] = [
            f.name
            for f in dc_fields(cls)
            if f.init and f.name not in self._skip_fields
        ]

    def can_encode(self, obj: Any) -> bool:
        """True when *obj* is an instance of the wrapped dataclass."""
        return isinstance(obj, self._cls)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """Encode each configured field via the registry and merge their buffers."""
        merged: list[ShmRef] = []
        fields_meta: dict[str, Any] = {}

        for field_name in self._field_names:
            # The registry picks the encoder for the field's *runtime* type.
            sub = self._registry.encode(getattr(obj, field_name), shm_pool)

            fields_meta[field_name] = {
                "codec": sub.codec,
                "meta": sub.meta,
                "start": len(merged),
                "count": len(sub.buffers),
            }
            merged.extend(sub.buffers)

        return EncodeResult(
            codec=self.codec,
            meta={
                "module": self._cls.__module__,
                "qualname": self._cls.__qualname__,
                "fields": fields_meta,
            },
            buffers=merged,
        )

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """Decode every field sub-payload and rebuild the dataclass instance."""
        fields_meta: dict[str, Any] = payload["meta"]["fields"]
        kwargs: dict[str, Any] = {}

        assert len(args) > 0
        pool = args[0]

        specs = payload["buffers"]

        for field_name, fmeta in fields_meta.items():
            lo = fmeta["start"]
            # Slice the specs exactly as the buffers were laid out at encode time.
            subpayload: PayloadRef = {
                "codec": fmeta["codec"],
                "meta": fmeta["meta"],
                "buffers": specs[lo : lo + fmeta["count"]],
            }
            kwargs[field_name] = self._registry.decode(subpayload, pool)

        return self._cls(**kwargs)

Attributes

_cls = cls instance-attribute
_registry = registry instance-attribute
codec = f'dataclass::{cls.__module__}.{cls.__qualname__}' instance-attribute
_skip_fields = skip_fields or set() instance-attribute
_field_names = [] instance-attribute

Functions

__init__(cls, registry, *, skip_fields=None)
Source code in brmspy/_session/codec/builtin.py
def __init__(
    self,
    cls: type[Any],
    registry: CodecRegistry,
    *,
    skip_fields: set[str] | None = None,
) -> None:
    """
    Build a codec for dataclass *cls*.

    Parameters
    ----------
    cls : type
        Dataclass whose `init=True` fields will be encoded.
    registry : CodecRegistry
        Registry used to encode/decode individual field values.
    skip_fields : set[str], optional
        Field names to exclude from encoding entirely.

    Raises
    ------
    TypeError
        If `cls` is not a dataclass.
    """
    if not is_dataclass(cls):
        raise TypeError(f"{cls!r} is not a dataclass")

    self._cls = cls
    self._registry = registry
    # Stable codec identifier derived from the class's import path.
    self.codec = f"dataclass::{cls.__module__}.{cls.__qualname__}"

    self._skip_fields = skip_fields or set()
    self._field_names: list[str] = []

    # Precompute which fields we actually encode
    for f in dc_fields(cls):
        if not f.init:
            continue
        if f.name in self._skip_fields:
            continue
        self._field_names.append(f.name)
can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    """Report whether *obj* is an instance of the dataclass this codec wraps."""
    return isinstance(obj, self._cls)
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """Encode each configured field via the registry and merge their buffers."""
    merged: list[ShmRef] = []
    fields_meta: dict[str, Any] = {}

    for field_name in self._field_names:
        # The registry selects the encoder based on the field's runtime type.
        sub = self._registry.encode(getattr(obj, field_name), shm_pool)

        fields_meta[field_name] = {
            "codec": sub.codec,
            "meta": sub.meta,
            "start": len(merged),
            "count": len(sub.buffers),
        }
        merged.extend(sub.buffers)

    return EncodeResult(
        codec=self.codec,
        meta={
            "module": self._cls.__module__,
            "qualname": self._cls.__qualname__,
            "fields": fields_meta,
        },
        buffers=merged,
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    """Decode every field sub-payload and rebuild the dataclass instance."""
    fields_meta: dict[str, Any] = payload["meta"]["fields"]
    kwargs: dict[str, Any] = {}

    assert len(args) > 0
    pool = args[0]

    specs = payload["buffers"]

    for field_name, fmeta in fields_meta.items():
        lo = fmeta["start"]
        # Slice the specs exactly as the buffers were laid out at encode time.
        subpayload: PayloadRef = {
            "codec": fmeta["codec"],
            "meta": fmeta["meta"],
            "buffers": specs[lo : lo + fmeta["count"]],
        }
        kwargs[field_name] = self._registry.decode(subpayload, pool)

    return self._cls(**kwargs)

Functions

log_warning(msg, method_name=None)

Log a warning message.

Parameters:

- `msg` (`str`, required): The warning message to log.
- `method_name` (`str`, optional; default `None`): The name of the method/function. If `None`, it is auto-detected from the call stack.
Source code in brmspy/helpers/log.py
def log_warning(msg: str, method_name: str | None = None):
    """
    Emit a warning-level log record via the shared `log` helper.

    Parameters
    ----------
    msg : str
        The warning message to log
    method_name : str, optional
        The name of the method/function. If None, will auto-detect from call stack.

    """
    log(msg, level=logging.WARNING, method_name=method_name)