Skip to content

_arrays

Attributes

PyObject = Union[dict, list, str, float, int, np.dtype, None, Any, pd.DataFrame, pd.Series, np.ndarray, az.InferenceData, xr.DataArray, xr.Dataset] module-attribute

Union of common Python-side objects produced by R→Python conversion.

This is intentionally broad: brmspy frequently returns standard scientific Python types (NumPy/pandas/xarray/ArviZ), plus plain dict/list primitives.

Note

Avoid adding Any here unless absolutely necessary; it defeats the purpose of having this alias.

Classes

ShmPool

Minimal interface for allocating and attaching shared-memory blocks.

The concrete implementation lives in brmspy._session.transport.ShmPool and tracks blocks so they can be closed on teardown.

Source code in brmspy/types/shm.py
class ShmPool:
    """
    Minimal interface for allocating and attaching shared-memory blocks.

    The concrete implementation lives in
    [`brmspy._session.transport.ShmPool`][brmspy._session.transport.ShmPool] and tracks
    blocks so they can be closed on teardown.

    Notes
    -----
    Every method body in this class is `...`; it only declares the interface.
    """

    def __init__(self, manager: SharedMemoryManager) -> None:
        """
        Create a pool bound to an existing `SharedMemoryManager`.

        Parameters
        ----------
        manager : multiprocessing.managers.SharedMemoryManager
            Manager used to allocate blocks.
        """
        ...

    def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
        """
        Allocate a new shared-memory block.

        Parameters
        ----------
        size : int
            Size in bytes.
        temporary : bool, default False
            NOTE(review): presumably marks the block as short-lived; the exact
            lifetime semantics are defined by the concrete pool - confirm there.

        Returns
        -------
        ShmBlock
            Newly allocated block.
        """
        ...

    def attach(self, ref: ShmRef) -> ShmBlock:
        """
        Attach to an existing shared-memory block by name.

        Parameters
        ----------
        ref : ShmRef
            Reference identifying the existing block.

        Returns
        -------
        ShmBlock
            Attached block.
        """
        ...

    def close_all(self) -> None:
        """
        Close all tracked shared-memory handles owned by this pool.

        Returns
        -------
        None
        """
        ...

    # NOTE(review): undocumented in the interface. Presumably releases tracked
    # blocks, optionally restricted to the block called `name` - confirm
    # against the concrete implementation.
    def gc(self, name: str | None = None) -> None: ...

Functions

__init__(manager)

Create a pool bound to an existing SharedMemoryManager.

Parameters:

Name Type Description Default
manager SharedMemoryManager

Manager used to allocate blocks.

required
Source code in brmspy/types/shm.py
def __init__(self, manager: SharedMemoryManager) -> None:
    """
    Create a pool bound to an existing `SharedMemoryManager`.

    Parameters
    ----------
    manager : multiprocessing.managers.SharedMemoryManager
        Manager used to allocate blocks.

    Notes
    -----
    Interface stub: the body is intentionally `...`.
    """
    ...
alloc(size, temporary=False)

Allocate a new shared-memory block.

Parameters:

Name Type Description Default
size int

Size in bytes.

required

Returns:

Type Description
ShmBlock

Newly allocated block.

Source code in brmspy/types/shm.py
def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
    """
    Allocate a new shared-memory block.

    Parameters
    ----------
    size : int
        Size in bytes.
    temporary : bool, default False
        NOTE(review): presumably marks the block as short-lived; the exact
        lifetime semantics are defined by the concrete pool - confirm there.

    Returns
    -------
    ShmBlock
        Newly allocated block.
    """
    ...
attach(ref)

Attach to an existing shared-memory block by name.

Returns:

Type Description
ShmBlock

Attached block.

Source code in brmspy/types/shm.py
def attach(self, ref: ShmRef) -> ShmBlock:
    """
    Attach to an existing shared-memory block by name.

    Parameters
    ----------
    ref : ShmRef
        Reference identifying the existing block.

    Returns
    -------
    ShmBlock
        Attached block.
    """
    ...
close_all()

Close all tracked shared-memory handles owned by this pool.

Returns:

Type Description
None
Source code in brmspy/types/shm.py
def close_all(self) -> None:
    """
    Close all tracked shared-memory handles owned by this pool.

    Returns
    -------
    None

    Notes
    -----
    Interface stub: the body is intentionally `...`.
    """
    ...
gc(name=None)
Source code in brmspy/types/shm.py
# NOTE(review): undocumented in the interface. Presumably releases tracked
# blocks, optionally restricted to the block called `name` - confirm against
# the concrete implementation.
def gc(self, name: str | None = None) -> None: ...

ShmArray

Bases: ndarray

NumPy array view backed by a shared-memory block.

Attributes:

Name Type Description
_shm_metadata ShmRef

Reference to the shared-memory block backing the array data.

Notes

This is a view over SharedMemory.buf. Closing/unlinking the underlying shared memory while the array is still in use will lead to undefined behavior.

Source code in brmspy/types/shm_extensions.py
class ShmArray(np.ndarray):
    """
    NumPy array view backed by a shared-memory block.

    Attributes
    ----------
    _shm_metadata : ShmRef
        Reference to the shared-memory block backing the array data. It is
        assigned by `from_block`; this class defines no `__array_finalize__`,
        so arrays derived from such a view (slices, transposes, arithmetic
        results) are not given the attribute.

    Notes
    -----
    This is a *view* over `SharedMemory.buf`. Closing/unlinking the underlying
    shared memory while the array is still in use will lead to undefined
    behavior.
    """

    _shm_metadata: ShmRef  # for type checkers

    @classmethod
    def from_metadata(
        cls, meta: ShmArrayMetadata | dict[str, Any], block: ShmBlock
    ) -> np.ndarray:
        """
        Rebuild an array from its serialized description and backing block.

        Parameters
        ----------
        meta : ShmArrayMetadata or dict
            Mapping providing the keys ``"dtype"``, ``"shape"`` and ``"order"``.
        block : ShmBlock
            Attached shared-memory block holding the array bytes.

        Returns
        -------
        numpy.ndarray
            Whatever `from_block` returns for these arguments.
        """
        return ShmArray.from_block(
            block=block,
            dtype=np.dtype(meta["dtype"]),
            shape=tuple(meta["shape"]),
            order=meta["order"],
        )

    @classmethod
    def from_block(
        cls, block: ShmBlock, shape: tuple[int, ...], dtype: np.dtype, **kwargs
    ) -> Union["ShmArray", np.ndarray]:
        """
        Create an array backed by (or decoded from) a shared-memory block.

        Parameters
        ----------
        block : ShmBlock
            Attached shared-memory block.
        shape : tuple[int, ...]
            Desired array shape.
        dtype : numpy.dtype
            NumPy dtype of the array.
        **kwargs
            Only ``order`` is read. ``"C"`` or ``"F"`` selects the memory
            layout used to interpret the buffer (default ``"F"``). The value
            ``"non-contiguous"`` produced by `array_order` is treated as
            ``"C"``. Ignored for object dtype.

        Returns
        -------
        ShmArray or numpy.ndarray
            For non-object dtypes, a view over the first ``block.content_size``
            bytes of the buffer, tagged with ``block.to_ref()``. For object
            dtype, the array unpickled from those bytes.
        """
        if np.dtype(dtype) == np.dtype("O"):
            assert block.shm.buf
            payload = bytes(memoryview(block.shm.buf)[: block.content_size])
            # NOTE(security): pickle.loads can execute arbitrary code; the block
            # content must come from a trusted writer.
            obj = pickle.loads(payload)
            assert isinstance(obj, np.ndarray)
            return obj

        order = kwargs.get("order", "F")
        if order == "non-contiguous":
            # np.ndarray only understands "C"/"F"; per `array_order`, bytes of
            # a non-contiguous source come from a forced contiguous copy.
            order = "C"

        if block.shm.buf:
            view = memoryview(block.shm.buf)[: block.content_size]
        else:
            view = None

        base = np.ndarray(shape=shape, dtype=dtype, buffer=view, order=order)
        obj = base.view(ShmArray)
        obj._shm_metadata = block.to_ref()
        return obj

    @classmethod
    def array_order(cls, a: np.ndarray) -> Literal["C", "F", "non-contiguous"]:
        """
        Classify the memory layout of `a` for later reconstruction.

        C-contiguity is checked first, so an array that is both C- and
        Fortran-contiguous is reported as ``"C"``. ``"non-contiguous"`` means
        the bytes were obtained by forcing a contiguous copy during encoding.
        """
        layout = a.flags
        if layout.c_contiguous:
            return "C"
        elif layout.f_contiguous:
            return "F"
        else:
            return "non-contiguous"

    @classmethod
    def is_string_object(cls, a: np.ndarray, sample: int = 1000):
        """
        Report whether an object array appears to hold only strings (or None).

        At most `sample` leading elements (in flat order) are inspected.
        Returns False for any non-object dtype, and True when no inspected
        element contradicts the assumption (including empty arrays).
        """
        if not (np.dtype(a.dtype) == np.dtype("O")):
            return False

        limit = min(sample, a.size)
        for seen, item in enumerate(a.flat):
            if seen >= limit:
                break
            if item is None or isinstance(item, str):
                continue
            return False
        return True

    @classmethod
    def to_shm(
        cls, obj: np.ndarray | pd.Series | list, shm_pool: Any
    ) -> tuple[np.ndarray | None, ShmRef, str, list[int], str]:
        """
        Place array-like data in shared memory, or reuse its existing block.

        Parameters
        ----------
        obj : numpy.ndarray, pandas.Series or list
            Data to publish. Categorical series are stored as integer codes.
        shm_pool : Any
            Pool exposing ``alloc(size, temporary=...)``.

        Returns
        -------
        tuple
            ``(arr_modified, ref, dtype, shape, order)``. ``arr_modified`` is
            the fixed-width unicode array created for string object arrays
            (``None`` otherwise); ``ref`` references the block; ``dtype``,
            ``shape`` and ``order`` describe the stored array.
        """
        if isinstance(obj, pd.Series):
            if isinstance(obj.dtype, pd.CategoricalDtype):
                arr = obj.cat.codes.to_numpy(copy=False)
            else:
                arr = obj.to_numpy(copy=False)
        elif isinstance(obj, np.ndarray):
            arr = obj
        else:
            arr = np.asarray(obj)

        is_object = np.dtype(arr.dtype) == np.dtype("O")
        is_string = cls.is_string_object(arr)

        arr_modified = None
        # Only arrays tagged by `from_block` map onto a whole block; derived
        # ShmArray views carry no reference and are copied like plain arrays.
        if isinstance(arr, ShmArray) and hasattr(arr, "_shm_metadata"):
            ref = arr._shm_metadata
        else:
            temporary = False
            if is_object and not is_string:
                data = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
                temporary = True
            else:
                if is_string:
                    arr = arr.astype("U")
                    arr_modified = arr
                # Write the bytes in the layout that is reported back, so that
                # decoding with the returned `order` restores the same array.
                byte_order = "F" if cls.array_order(arr) == "F" else "C"
                data = arr.tobytes(order=byte_order)

            nbytes = len(data)

            # Ask for exactly nbytes; OS may round up internally, that's fine.
            block = shm_pool.alloc(nbytes, temporary=temporary)
            block.shm.buf[:nbytes] = data
            ref = block.to_ref()

        return (
            arr_modified,
            ref,
            str(arr.dtype),
            list(arr.shape),
            cls.array_order(arr),
        )

Attributes

_shm_metadata instance-attribute

Functions

from_metadata(meta, block) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def from_metadata(
    cls, meta: ShmArrayMetadata | dict[str, Any], block: ShmBlock
) -> np.ndarray:
    """
    Rebuild an array from its serialized description and backing block.

    Parameters
    ----------
    meta : ShmArrayMetadata or dict
        Mapping providing the keys ``"dtype"``, ``"shape"`` and ``"order"``.
    block : ShmBlock
        Attached shared-memory block holding the array bytes.

    Returns
    -------
    numpy.ndarray
        Whatever `ShmArray.from_block` returns for these arguments.
    """
    return ShmArray.from_block(
        block=block,
        dtype=np.dtype(meta["dtype"]),
        shape=tuple(meta["shape"]),
        order=meta["order"],
    )
from_block(block, shape, dtype, **kwargs) classmethod

Create an array view backed by an existing shared-memory block.

Parameters:

Name Type Description Default
block ShmBlock

Attached shared-memory block.

required
shape tuple[int, ...]

Desired array shape.

required
dtype dtype

NumPy dtype of the array.

required
**kwargs

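Optional keyword arguments. Only `order` is read: it selects the memory layout ("C" or "F", default "F") used to interpret the buffer for non-object dtypes.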

{}

Returns:

Type Description
ShmArray

Array view into the shared-memory buffer.

Source code in brmspy/types/shm_extensions.py
@classmethod
def from_block(
    cls, block: ShmBlock, shape: tuple[int, ...], dtype: np.dtype, **kwargs
) -> Union["ShmArray", np.ndarray]:
    """
    Create an array backed by (or decoded from) a shared-memory block.

    Parameters
    ----------
    block : ShmBlock
        Attached shared-memory block.
    shape : tuple[int, ...]
        Desired array shape.
    dtype : numpy.dtype
        NumPy dtype of the array.
    **kwargs
        Only ``order`` is read. ``"C"`` or ``"F"`` selects the memory
        layout used to interpret the buffer (default ``"F"``). The value
        ``"non-contiguous"`` produced by `array_order` is treated as
        ``"C"``. Ignored for object dtype.

    Returns
    -------
    ShmArray or numpy.ndarray
        For non-object dtypes, a view over the first ``block.content_size``
        bytes of the buffer, tagged with ``block.to_ref()``. For object
        dtype, the array unpickled from those bytes.
    """
    if np.dtype(dtype) == np.dtype("O"):
        assert block.shm.buf
        payload = bytes(memoryview(block.shm.buf)[: block.content_size])
        # NOTE(security): pickle.loads can execute arbitrary code; the block
        # content must come from a trusted writer.
        obj = pickle.loads(payload)
        assert isinstance(obj, np.ndarray)
        return obj

    order = kwargs.get("order", "F")
    if order == "non-contiguous":
        # np.ndarray only understands "C"/"F"; per `array_order`, bytes of a
        # non-contiguous source come from a forced contiguous copy.
        order = "C"

    if block.shm.buf:
        view = memoryview(block.shm.buf)[: block.content_size]
    else:
        view = None

    base = np.ndarray(shape=shape, dtype=dtype, buffer=view, order=order)
    obj = base.view(ShmArray)
    obj._shm_metadata = block.to_ref()
    return obj
array_order(a) classmethod

Determine how an array can be reconstructed from a raw buffer.

Returns "C" for C-contiguous arrays, "F" for Fortran-contiguous arrays, otherwise "non-contiguous" (meaning: bytes were obtained by forcing a contiguous copy during encoding).

Source code in brmspy/types/shm_extensions.py
@classmethod
def array_order(cls, a: np.ndarray) -> Literal["C", "F", "non-contiguous"]:
    """
    Classify the memory layout of `a` for later reconstruction.

    C-contiguity is checked first, so an array that is both C- and
    Fortran-contiguous is reported as ``"C"``. ``"non-contiguous"`` means
    the bytes were obtained by forcing a contiguous copy during encoding.
    """
    layout = a.flags
    if layout.c_contiguous:
        return "C"
    elif layout.f_contiguous:
        return "F"
    else:
        return "non-contiguous"
is_string_object(a, sample=1000) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def is_string_object(cls, a: np.ndarray, sample: int = 1000):
    """
    Report whether an object array appears to hold only strings (or None).

    At most `sample` leading elements (in flat order) are inspected.
    Returns False for any non-object dtype, and True when no inspected
    element contradicts the assumption (including empty arrays).
    """
    if not (np.dtype(a.dtype) == np.dtype("O")):
        return False

    limit = min(sample, a.size)
    for seen, item in enumerate(a.flat):
        if seen >= limit:
            break
        if item is None or isinstance(item, str):
            continue
        return False
    return True
to_shm(obj, shm_pool) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def to_shm(
    cls, obj: np.ndarray | pd.Series | list, shm_pool: Any
) -> tuple[np.ndarray | None, ShmRef, str, list[int], str]:
    """
    Place array-like data in shared memory, or reuse its existing block.

    Parameters
    ----------
    obj : numpy.ndarray, pandas.Series or list
        Data to publish. Categorical series are stored as integer codes.
    shm_pool : Any
        Pool exposing ``alloc(size, temporary=...)``.

    Returns
    -------
    tuple
        ``(arr_modified, ref, dtype, shape, order)``. ``arr_modified`` is
        the fixed-width unicode array created for string object arrays
        (``None`` otherwise); ``ref`` references the block; ``dtype``,
        ``shape`` and ``order`` describe the stored array.
    """
    if isinstance(obj, pd.Series):
        if isinstance(obj.dtype, pd.CategoricalDtype):
            arr = obj.cat.codes.to_numpy(copy=False)
        else:
            arr = obj.to_numpy(copy=False)
    elif isinstance(obj, np.ndarray):
        arr = obj
    else:
        arr = np.asarray(obj)

    is_object = np.dtype(arr.dtype) == np.dtype("O")
    is_string = cls.is_string_object(arr)

    arr_modified = None
    # Only arrays tagged with a block reference map onto a whole block;
    # ShmArray views lacking `_shm_metadata` are copied like plain arrays.
    if isinstance(arr, ShmArray) and hasattr(arr, "_shm_metadata"):
        ref = arr._shm_metadata
    else:
        temporary = False
        if is_object and not is_string:
            data = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
            temporary = True
        else:
            if is_string:
                arr = arr.astype("U")
                arr_modified = arr
            # Write the bytes in the layout that is reported back, so that
            # decoding with the returned `order` restores the same array.
            byte_order = "F" if cls.array_order(arr) == "F" else "C"
            data = arr.tobytes(order=byte_order)

        nbytes = len(data)

        # Ask for exactly nbytes; OS may round up internally, that's fine.
        block = shm_pool.alloc(nbytes, temporary=temporary)
        block.shm.buf[:nbytes] = data
        ref = block.to_ref()

    return (
        arr_modified,
        ref,
        str(arr.dtype),
        list(arr.shape),
        cls.array_order(arr),
    )

ShmDataFrameColumns

Bases: DataFrame

pandas DataFrame backed by per-column shared-memory blocks (numeric only).

Attributes:

Name Type Description
_shm_metadata dict[str, ShmSeriesMetadata]

Mapping from column name to the metadata required to reconstruct that column.

Source code in brmspy/types/shm_extensions.py
class ShmDataFrameColumns(pd.DataFrame):
    """
    pandas DataFrame backed by per-column shared-memory blocks (numeric only).

    Attributes
    ----------
    _shm_metadata : dict[str, ShmSeriesMetadata]
        Mapping from column name to the metadata required to reconstruct
        that column from its shared-memory block.
    """

    _metadata = ["_shm_metadata"]
    _shm_metadata: dict[str, ShmSeriesMetadata]

    @property
    def _constructor(self):
        # We INTENTIONALLY do not return ShmDataFrameColumns:
        # whenever the dataframe is reindexed or sliced we want to get rid of
        # all _shm_metadata, as otherwise we will have immediate problems with
        # buffer alignment
        return pd.DataFrame

    @classmethod
    def _create_col_metadata(
        cls, series: pd.Series, block: ShmRef, arr: np.ndarray | None = None, **params
    ) -> ShmSeriesMetadata:
        """
        Describe how `series` is stored in `block` so it can be rebuilt later.

        Parameters
        ----------
        series : pandas.Series
            Column being described; supplies the name and pandas dtype.
        block : ShmRef
            Reference to the block that holds the column data.
        arr : numpy.ndarray, optional
            Array actually written to the block, when it differs from
            ``series.to_numpy()``. Ignored for categorical columns.
        **params
            Extra dtype parameters to record; dtype-specific entries are added.

        Raises
        ------
        Exception
            If the stored array has object dtype yet looks like strings.
        """
        pd_dtype = series.dtype

        # Pick the ndarray that sits in shared memory; only its dtype is kept.
        if isinstance(pd_dtype, pd.CategoricalDtype):
            params["categories"] = pd_dtype.categories.to_numpy(dtype=object).tolist()
            params["ordered"] = bool(pd_dtype.ordered)
            params["categories_dtype"] = pd_dtype.categories.dtype.name

            # Categoricals are stored as integer codes (-1 means missing).
            stored = series.cat.codes.to_numpy(copy=False)
        elif arr is not None:
            assert isinstance(arr, np.ndarray)
            stored = arr
        else:
            stored = series.to_numpy(copy=False)

        # Parameters needed to rebuild parametrized pandas dtypes.
        if isinstance(pd_dtype, pd.PeriodDtype):
            params["freq"] = str(pd_dtype.freq)

        if isinstance(pd_dtype, pd.IntervalDtype):
            params["subtype"] = np.dtype(pd_dtype.subtype).str
            params["closed"] = str(pd_dtype.closed)  # type: ignore[attr-defined]

        meta: ShmSeriesMetadata = {
            "name": series.name,
            "np_dtype": str(stored.dtype),
            "pd_dtype": str(pd_dtype.name),
            "block": block,
            "params": params,
        }

        # Sanity check. If this goes wrong, it will be frustrating to debug
        stored_as_object = np.dtype(meta["np_dtype"]) == np.dtype("O")
        if stored_as_object and ShmArray.is_string_object(stored, sample=25):
            raise Exception(f"{series.name} column is string, but stored as object!")

        return meta

    def _set_col_raw(self, col: str, value) -> None:
        """Assign `value` to column `col` via plain `pd.DataFrame.__setitem__`."""
        # bypass our __setitem__
        pd.DataFrame.__setitem__(self, col, value)

    def _set_shm_metadata(self, meta: dict[str, ShmSeriesMetadata]):
        """Replace the per-column shared-memory metadata mapping."""
        self._shm_metadata = meta

    @classmethod
    def _put_col_in_shm(
        cls, df: "ShmDataFrameColumns", col: str, shm_pool: Any, replace=False
    ):
        """
        Make sure column `col` of `df` is stored in shared memory and registered.

        Parameters
        ----------
        df : ShmDataFrameColumns
            Frame owning the column.
        col : str
            Column label.
        shm_pool : Any
            Pool handed to `ShmArray.to_shm`.
        replace : bool, default False
            When false, a column whose values are already a tagged `ShmArray`
            is left untouched.
        """
        vals = df[col].to_numpy(copy=False)
        if isinstance(vals, ShmArray) and hasattr(vals, "_shm_metadata") and not replace:
            # Already backed by shared memory and not being replaced.
            return
        if not isinstance(vals, np.ndarray):
            print(
                f"Failed to update shm metadata for column '{col}' dtype {vals.dtype}"
            )
            return

        # A frame that never received a registry has no `_shm_metadata`
        # attribute yet; start with an empty one instead of failing.
        registry = getattr(df, "_shm_metadata", None)
        if registry is None:
            registry = {}
            df._set_shm_metadata(registry)

        # Drop the old entry first so a failure below leaves no stale entry.
        registry.pop(col, None)
        arr_modified, ref, _dtype, _shape, _order = ShmArray.to_shm(df[col], shm_pool)

        if arr_modified is not None:
            # Only needed for string-object normalization; for numeric/codes it's None
            df._set_col_raw(
                col, pd.Series(arr_modified, index=df.index, name=col, copy=False)
            )
        registry[col] = cls._create_col_metadata(df[col], ref, arr_modified)

    @classmethod
    def _reconstruct_series(
        cls,
        meta: ShmSeriesMetadata,
        block: ShmBlock,
        nrows: int,
        index: list | None,
    ) -> pd.Series:
        """
        Rebuild one column from its metadata and shared-memory block.

        Parameters
        ----------
        meta : ShmSeriesMetadata
            Column description with "name", "np_dtype", "pd_dtype", "params".
        block : ShmBlock
            Attached block holding the column data.
        nrows : int
            Number of rows in the column.
        index : list or None
            Index labels for the resulting series.

        Raises
        ------
        ValueError
            If a categorical column has no stored categories.
        """
        col_name = str(meta["name"])
        np_dtype = np.dtype(meta["np_dtype"])
        pd_dtype = meta["pd_dtype"]
        params = meta["params"]

        arr = ShmArray.from_block(
            block=block, shape=(nrows,), dtype=np_dtype, order="C"
        )

        if pd_dtype == "category":
            cats = params.get("categories", None)
            ordered = bool(params.get("ordered", False))

            if cats is None:
                raise ValueError("category dtype requires params['categories']")

            cats_pd_dtype = params.get("categories_dtype")
            if cats_pd_dtype is None:
                cats_index = pd.Index(cats)
            else:
                cats_index = pd.Index(cats, dtype=str(cats_pd_dtype))

            cat_dtype = pd.CategoricalDtype(categories=cats_index, ordered=ordered)

            # arr holds the integer codes (pandas uses -1 for missing).
            cat = pd.Categorical.from_codes(cast(Sequence[int], arr), dtype=cat_dtype)
            return pd.Series(cat, name=col_name, index=index)

        # tz-aware datetimes: expects arr to be int64 ns timestamps.
        # Without a stored tz, fall through to the plain path.
        if pd_dtype.startswith("datetime64[ns,"):
            tz = params.get("tz")
            if tz:
                dt = pd.to_datetime(arr, unit="ns", utc=True).tz_convert(tz)
                return pd.Series(dt, name=col_name, index=index)

        return pd.Series(arr, name=col_name, index=index)

    def __setitem__(self, key, value):
        """
        Assign like `DataFrame.__setitem__`, then sync affected columns to SHM.

        A hashable `key` is treated as a single column label. Unhashable keys
        (list of labels, boolean mask, ...) cannot be tested for membership
        in the column index; after such an assignment the listed labels, or
        every column for non-list keys, are re-registered.
        """
        try:
            is_existing = key in self.columns
        except TypeError:
            # Index membership hashes the key; lists and masks are unhashable.
            is_existing = None

        super().__setitem__(key, value)

        if is_existing is None:
            targets = key if isinstance(key, list) else list(self.columns)
            for label in targets:
                try:
                    present = label in self.columns
                except TypeError:
                    continue
                if present:
                    self._on_column_replaced(label)
        elif is_existing:
            self._on_column_replaced(key)
        else:
            self._on_column_added(key)

    def _on_column_added(self, col: str):
        """Store a newly added column in SHM when a pool is available."""
        from brmspy._singleton._shm_singleton import _get_shm

        pool = _get_shm()
        if pool:
            self._put_col_in_shm(self, col, pool)

    def _on_column_replaced(self, col: str):
        """Re-store a replaced column in SHM when a pool is available."""
        from brmspy._singleton._shm_singleton import _get_shm

        pool = _get_shm()
        if pool:
            self._put_col_in_shm(self, col, pool, replace=True)

Attributes

_metadata = ['_shm_metadata'] class-attribute instance-attribute
_shm_metadata instance-attribute
_constructor property

Functions

_create_col_metadata(series, block, arr=None, **params) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def _create_col_metadata(
    cls, series: pd.Series, block: ShmRef, arr: np.ndarray | None = None, **params
) -> ShmSeriesMetadata:
    """
    Describe how `series` is stored in `block` so it can be rebuilt later.

    Parameters
    ----------
    series : pandas.Series
        Column being described; supplies the name and pandas dtype.
    block : ShmRef
        Reference to the block that holds the column data.
    arr : numpy.ndarray, optional
        Array actually written to the block, when it differs from
        ``series.to_numpy()``. Ignored for categorical columns.
    **params
        Extra dtype parameters to record; dtype-specific entries are added.

    Raises
    ------
    Exception
        If the stored array has object dtype yet looks like strings.
    """
    pd_dtype = series.dtype

    # Pick the ndarray that sits in shared memory; only its dtype is kept.
    if isinstance(pd_dtype, pd.CategoricalDtype):
        params["categories"] = pd_dtype.categories.to_numpy(dtype=object).tolist()
        params["ordered"] = bool(pd_dtype.ordered)
        params["categories_dtype"] = pd_dtype.categories.dtype.name

        # Categoricals are stored as integer codes (-1 means missing).
        stored = series.cat.codes.to_numpy(copy=False)
    elif arr is not None:
        assert isinstance(arr, np.ndarray)
        stored = arr
    else:
        stored = series.to_numpy(copy=False)

    # Parameters needed to rebuild parametrized pandas dtypes.
    if isinstance(pd_dtype, pd.PeriodDtype):
        params["freq"] = str(pd_dtype.freq)

    if isinstance(pd_dtype, pd.IntervalDtype):
        params["subtype"] = np.dtype(pd_dtype.subtype).str
        params["closed"] = str(pd_dtype.closed)  # type: ignore[attr-defined]

    meta: ShmSeriesMetadata = {
        "name": series.name,
        "np_dtype": str(stored.dtype),
        "pd_dtype": str(pd_dtype.name),
        "block": block,
        "params": params,
    }

    # Sanity check. If this goes wrong, it will be frustrating to debug
    stored_as_object = np.dtype(meta["np_dtype"]) == np.dtype("O")
    if stored_as_object and ShmArray.is_string_object(stored, sample=25):
        raise Exception(f"{series.name} column is string, but stored as object!")

    return meta
_set_col_raw(col, value)
Source code in brmspy/types/shm_extensions.py
def _set_col_raw(self, col: str, value) -> None:
    """Assign `value` to column `col` via plain `pd.DataFrame.__setitem__`."""
    # bypass our __setitem__
    pd.DataFrame.__setitem__(self, col, value)
_set_shm_metadata(meta)
Source code in brmspy/types/shm_extensions.py
def _set_shm_metadata(self, meta: dict[str, ShmSeriesMetadata]):
    """Replace the per-column shared-memory metadata mapping."""
    self._shm_metadata = meta
_put_col_in_shm(df, col, shm_pool, replace=False) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def _put_col_in_shm(
    cls, df: "ShmDataFrameColumns", col: str, shm_pool: Any, replace=False
):
    """
    Make sure column `col` of `df` is stored in shared memory and registered.

    Parameters
    ----------
    df : ShmDataFrameColumns
        Frame owning the column.
    col : str
        Column label.
    shm_pool : Any
        Pool handed to `ShmArray.to_shm`.
    replace : bool, default False
        When false, a column whose values are already a tagged `ShmArray`
        is left untouched.
    """
    vals = df[col].to_numpy(copy=False)
    if isinstance(vals, ShmArray) and hasattr(vals, "_shm_metadata") and not replace:
        # Already backed by shared memory and not being replaced.
        return
    if not isinstance(vals, np.ndarray):
        print(
            f"Failed to update shm metadata for column '{col}' dtype {vals.dtype}"
        )
        return

    # A frame that never received a registry has no `_shm_metadata`
    # attribute yet; start with an empty one instead of failing.
    registry = getattr(df, "_shm_metadata", None)
    if registry is None:
        registry = {}
        df._set_shm_metadata(registry)

    # Drop the old entry first so a failure below leaves no stale entry.
    registry.pop(col, None)
    arr_modified, ref, _dtype, _shape, _order = ShmArray.to_shm(df[col], shm_pool)

    if arr_modified is not None:
        # Only needed for string-object normalization; for numeric/codes it's None
        df._set_col_raw(
            col, pd.Series(arr_modified, index=df.index, name=col, copy=False)
        )
    registry[col] = cls._create_col_metadata(df[col], ref, arr_modified)
_reconstruct_series(meta, block, nrows, index) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def _reconstruct_series(
    cls,
    meta: ShmSeriesMetadata,
    block: ShmBlock,
    nrows: int,
    index: list | None,
) -> pd.Series:
    """
    Rebuild one column from its metadata and shared-memory block.

    Parameters
    ----------
    meta : ShmSeriesMetadata
        Column description with "name", "np_dtype", "pd_dtype", "params".
    block : ShmBlock
        Attached block holding the column data.
    nrows : int
        Number of rows in the column.
    index : list or None
        Index labels for the resulting series.

    Raises
    ------
    ValueError
        If a categorical column has no stored categories.
    """
    col_name = str(meta["name"])
    np_dtype = np.dtype(meta["np_dtype"])
    pd_dtype = meta["pd_dtype"]
    params = meta["params"]

    arr = ShmArray.from_block(
        block=block, shape=(nrows,), dtype=np_dtype, order="C"
    )

    if pd_dtype == "category":
        cats = params.get("categories", None)
        ordered = bool(params.get("ordered", False))

        if cats is None:
            raise ValueError("category dtype requires params['categories']")

        cats_pd_dtype = params.get("categories_dtype")
        if cats_pd_dtype is None:
            cats_index = pd.Index(cats)
        else:
            cats_index = pd.Index(cats, dtype=str(cats_pd_dtype))

        cat_dtype = pd.CategoricalDtype(categories=cats_index, ordered=ordered)

        # arr holds the integer codes (pandas uses -1 for missing).
        cat = pd.Categorical.from_codes(cast(Sequence[int], arr), dtype=cat_dtype)
        return pd.Series(cat, name=col_name, index=index)

    # tz-aware datetimes: expects arr to be int64 ns timestamps.
    # Without a stored tz, fall through to the plain path.
    if pd_dtype.startswith("datetime64[ns,"):
        tz = params.get("tz")
        if tz:
            dt = pd.to_datetime(arr, unit="ns", utc=True).tz_convert(tz)
            return pd.Series(dt, name=col_name, index=index)

    return pd.Series(arr, name=col_name, index=index)
__setitem__(key, value)
Source code in brmspy/types/shm_extensions.py
def __setitem__(self, key, value):
    """
    Assign like `DataFrame.__setitem__`, then sync affected columns to SHM.

    A hashable `key` is treated as a single column label. Unhashable keys
    (list of labels, boolean mask, ...) cannot be tested for membership in
    the column index; after such an assignment the listed labels, or every
    column for non-list keys, are re-registered.
    """
    try:
        is_existing = key in self.columns
    except TypeError:
        # Index membership hashes the key; lists and masks are unhashable.
        is_existing = None

    super().__setitem__(key, value)

    if is_existing is None:
        targets = key if isinstance(key, list) else list(self.columns)
        for label in targets:
            try:
                present = label in self.columns
            except TypeError:
                continue
            if present:
                self._on_column_replaced(label)
    elif is_existing:
        self._on_column_replaced(key)
    else:
        self._on_column_added(key)
_on_column_added(col)
Source code in brmspy/types/shm_extensions.py
def _on_column_added(self, col: str):
    """Store a newly added column in SHM when a pool is available."""
    from brmspy._singleton._shm_singleton import _get_shm

    pool = _get_shm()
    if pool:
        self._put_col_in_shm(self, col, pool)
_on_column_replaced(col)
Source code in brmspy/types/shm_extensions.py
def _on_column_replaced(self, col: str):
    """Re-store a replaced column in SHM when a pool is available."""
    from brmspy._singleton._shm_singleton import _get_shm

    pool = _get_shm()
    if pool:
        self._put_col_in_shm(self, col, pool, replace=True)

ShmDataFrameSimple

Bases: DataFrame

pandas DataFrame backed by a single shared-memory block (numeric only).

Attributes:

Name Type Description
_shm_metadata ShmRef

Reference to the shared-memory block backing the DataFrame's values.

Source code in brmspy/types/shm_extensions.py
class ShmDataFrameSimple(pd.DataFrame):
    """
    pandas DataFrame backed by a single shared-memory block (numeric only).

    Attributes
    ----------
    _shm_metadata : ShmRef
        Reference to the shared-memory block backing the DataFrame's values.
    """

    _metadata = ["_shm_metadata"]
    _shm_metadata: ShmRef

    @classmethod
    def from_block(
        cls,
        block: ShmBlock,
        nrows: int,
        ncols: int,
        columns: list[Any] | None,
        index: list[Any] | None,
        dtype: str | np.dtype,
    ) -> "ShmDataFrameSimple":
        """
        Build a DataFrame whose values live in one shared-memory block.

        The block is read as an ``(ncols, nrows)`` array using
        `ShmArray.from_block`'s default layout and transposed to
        ``(nrows, ncols)`` before being handed to pandas.

        Parameters
        ----------
        block : ShmBlock
            Attached shared-memory block containing a contiguous 2D numeric matrix.
        nrows, ncols : int
            DataFrame shape.
        columns, index : list[Any] or None
            Column/index labels.
        dtype : str or numpy.dtype
            Dtype of the matrix stored in the block.

        Returns
        -------
        ShmDataFrameSimple
            Frame tagged with ``block.to_ref()``.
        """
        matrix = ShmArray.from_block(
            shape=(ncols, nrows), dtype=np.dtype(dtype), block=block
        )
        frame = ShmDataFrameSimple(data=matrix.T, index=index, columns=columns)
        frame._set_shm_metadata(block.to_ref())
        return frame

    def _set_shm_metadata(self, meta: ShmRef):
        """Record the reference to the backing shared-memory block."""
        self._shm_metadata = meta

Attributes

_metadata = ['_shm_metadata'] class-attribute instance-attribute
_shm_metadata instance-attribute

Functions

from_block(block, nrows, ncols, columns, index, dtype) classmethod

Construct a DataFrame backed by a single SHM block.

Parameters:

Name Type Description Default
block ShmBlock

Attached shared-memory block containing a contiguous 2D numeric matrix.

required
nrows int

DataFrame shape.

required
ncols int

DataFrame shape.

required
columns list[Any] or None

Column/index labels.

required
index list[Any] or None

Column/index labels.

required
dtype str or dtype

Dtype of the matrix stored in the block.

required

Returns:

Type Description
ShmDataFrameSimple
Source code in brmspy/types/shm_extensions.py
@classmethod
def from_block(
    cls,
    block: ShmBlock,
    nrows: int,
    ncols: int,
    columns: list[Any] | None,
    index: list[Any] | None,
    dtype: str | np.dtype,
) -> "ShmDataFrameSimple":
    """
    Build a DataFrame whose values live in one shared-memory block.

    The block is read as an ``(ncols, nrows)`` array using
    `ShmArray.from_block`'s default layout and transposed to
    ``(nrows, ncols)`` before being handed to pandas.

    Parameters
    ----------
    block : ShmBlock
        Attached shared-memory block containing a contiguous 2D numeric matrix.
    nrows, ncols : int
        DataFrame shape.
    columns, index : list[Any] or None
        Column/index labels.
    dtype : str or numpy.dtype
        Dtype of the matrix stored in the block.

    Returns
    -------
    ShmDataFrameSimple
        Frame tagged with ``block.to_ref()``.
    """
    matrix = ShmArray.from_block(
        shape=(ncols, nrows), dtype=np.dtype(dtype), block=block
    )
    frame = ShmDataFrameSimple(data=matrix.T, index=index, columns=columns)
    frame._set_shm_metadata(block.to_ref())
    return frame
_set_shm_metadata(meta)
Source code in brmspy/types/shm_extensions.py
def _set_shm_metadata(self, meta: ShmRef):
    """Record the reference to the backing shared-memory block."""
    self._shm_metadata = meta

Functions

_get_rvector_memview(obj)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _get_rvector_memview(
    obj: Any,
) -> tuple[SexpVectorWithNumpyInterface | None, memoryview | None]:
    """
    Return `obj` and a memoryview over its data, if it can provide one.

    Best-effort helper: `(None, None)` is returned when `obj` is not both a
    `SexpVectorWithNumpyInterface` and a `SexpVector`, when it has no
    `memoryview` attribute, or when obtaining the view raises.

    Returns
    -------
    tuple
        ``(obj, view)`` on success, otherwise ``(None, None)``.
    """
    try:
        # Explicit checks instead of `assert`, which is stripped under -O.
        if not (
            isinstance(obj, SexpVectorWithNumpyInterface)
            and isinstance(obj, SexpVector)
        ):
            return None, None
        if not hasattr(obj, "memoryview"):
            return None, None
        return obj, cast(Any, obj).memoryview()
    except Exception:
        # Any ordinary failure means "no view available"; KeyboardInterrupt
        # and SystemExit are deliberately allowed to propagate.
        return None, None

_get_rvector_types(obj)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _get_rvector_types(obj: Any) -> tuple[None | str, None | int]:
    """
    Read the NumPy type string and element size advertised by an R vector.

    Returns ``(obj._NP_TYPESTR, obj._R_SIZEOF_ELT)`` when `obj` is a
    `SexpVectorWithNumpyInterface` and both values are truthy, otherwise
    ``(None, None)``.
    """
    if isinstance(obj, SexpVectorWithNumpyInterface):
        typestr = obj._NP_TYPESTR
        elt_size = obj._R_SIZEOF_ELT
        if typestr and elt_size:
            return typestr, elt_size

    return None, None

_r2py_vector(obj, shm=None, allow_scalar=True)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _r2py_vector(
    obj: "Vector", shm: ShmPool | None = None, allow_scalar: bool | None = True
) -> PyObject:
    """
    Convert an R vector to a Python object, copying into shared memory when possible.

    Parameters
    ----------
    obj : Vector
        R vector to convert. Must not be a ``ListVector``.
    shm : ShmPool or None
        Pool used to allocate the destination block. When falsy, the
        iterative fallback conversion is used.
    allow_scalar : bool or None
        When truthy, a length-1 vector is converted to its single element via
        rpy2's default converter instead of an array.

    Returns
    -------
    PyObject
        A scalar (length-1 input with ``allow_scalar``), the result of
        ``_fallback_rvector_iter`` (no SHM / no raw buffer), the result of
        ``_to_pandas_factor`` (factor input), or a ``ShmArray`` of shape ``(N,)``.

    Raises
    ------
    RuntimeError
        If the raw buffer size does not equal ``itemsize * len(vector)``.
    """
    import rpy2.robjects as ro
    from rpy2.robjects import default_converter
    from rpy2.robjects.conversion import localconverter

    assert not isinstance(obj, ro.ListVector)

    if allow_scalar:
        obj_any = cast(Any, obj)
        # length 1 → scalar.
        # `hasattr` rather than the bare `obj_any.__len__`: a bound method is
        # always truthy, and the attribute access itself raises when missing.
        if hasattr(obj_any, "__len__") and len(obj_any) == 1:
            # Try default R→Python conversion
            with localconverter(default_converter) as cv:
                py = cv.rpy2py(obj[0])
            return py

    is_factor = isinstance(obj, ro.FactorVector)

    dtypestr, itemsize = _get_rvector_types(obj)
    rvecnp, src = _get_rvector_memview(obj)

    # fallback: any missing ingredient (dtype info, pool, raw buffer) means the
    # bulk copy cannot be done. A zero-length memoryview is falsy, so empty
    # vectors presumably take this path too.
    if not dtypestr or not itemsize or not shm or not rvecnp or not src:
        return _fallback_rvector_iter(obj)

    # numpy convertible
    N = len(rvecnp)
    expected_bytes = itemsize * N
    dtype = np.dtype(dtypestr)

    # Guard against copying a buffer whose size disagrees with the declared
    # element size and length.
    if src.nbytes != expected_bytes:
        raise RuntimeError(f"R vector bytes={src.nbytes}, expected={expected_bytes}")

    # Allocate shm once
    block = shm.alloc(expected_bytes)
    assert block.shm.buf

    # Single bulk copy: R → shm, no intermediate ndarray
    src_bytes = src.cast("B")
    block.shm.buf[:expected_bytes] = src_bytes

    arr = ShmArray.from_block(block=block, shape=(N,), dtype=dtype)
    if is_factor:
        return _to_pandas_factor(arr, obj)

    return arr

log_warning(msg, method_name=None)

Log a warning message.

Parameters:

Name Type Description Default
msg str

The warning message to log

required
method_name str

The name of the method/function. If None, will auto-detect from call stack.

None
Source code in brmspy/helpers/log.py
def log_warning(msg: str, method_name: str | None = None):
    """
    Emit a message at WARNING level.

    Thin convenience wrapper that forwards to ``log`` with
    ``level=logging.WARNING``.

    Parameters
    ----------
    msg : str
        Text of the warning.
    method_name : str, optional
        Name of the originating method/function, passed through to ``log``
        unchanged. When None, ``log`` is expected to infer it from the call
        stack.
    """
    log(msg, level=logging.WARNING, method_name=method_name)

_rmatrix_info(obj)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _rmatrix_info(obj: "Matrix") -> tuple[int, int, list[str] | None, list[str] | None]:
    """
    Extract shape and dimension labels from an R matrix.

    Returns
    -------
    tuple
        ``(nrow, ncol, rownames, colnames)``. Each label entry is a list of
        ``str`` or None when the matrix has no such names (R ``NULL``).
    """
    nrow, ncol = obj.dim

    # Labels are stringified element-wise; absent names stay None.
    colnames = list(map(str, obj.colnames)) if obj.colnames != NULL else None
    rownames = list(map(str, obj.rownames)) if obj.rownames != NULL else None

    return nrow, ncol, rownames, colnames

_rmatrix_to_py_default(obj)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _rmatrix_to_py_default(obj: "Matrix") -> pd.DataFrame | np.ndarray:
    """
    Convert an R matrix without shared memory.

    Returns
    -------
    numpy.ndarray
        When the matrix carries neither row nor column names.
    pandas.DataFrame
        Otherwise, labelled with the matrix's row/column names. If a
        ``_obs_id_`` column (or, failing that, an ``obs_id`` column) exists
        and has no duplicated values, it becomes the index.
    """
    _nrow, _ncol, row_labels, col_labels = _rmatrix_info(obj)
    values = np.array(obj)

    if not (row_labels or col_labels):
        return values

    frame = pd.DataFrame(data=values, columns=col_labels, index=row_labels)

    # First candidate that exists and is unique wins; `_obs_id_` has priority.
    for key in ("_obs_id_", "obs_id"):
        if key in frame.columns and not frame[key].duplicated().any():
            frame.index = frame[key]
            break
    return frame

_rmatrix_to_py(obj, shm=None)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _rmatrix_to_py(
    obj: "Matrix", shm: ShmPool | None = None
) -> pd.DataFrame | np.ndarray | ShmArray | ShmDataFrameSimple:
    """
    Convert a 2-D R matrix, copying its data into shared memory when a pool is given.

    Parameters
    ----------
    obj : Matrix
        R matrix; must have exactly two dimensions.
    shm : ShmPool or None
        Pool used to allocate the destination block.

    Returns
    -------
    numpy.ndarray
        When ``shm`` is None (plain ``np.array(obj)``, labels are not applied).
    pandas.DataFrame or numpy.ndarray
        Result of ``_rmatrix_to_py_default`` when dtype information or a raw
        buffer is unavailable.
    ShmArray
        SHM-backed array for matrices without row/column names.
    ShmDataFrameSimple
        SHM-backed frame for labelled matrices; a unique ``_obs_id_`` (else
        ``obs_id``) column is promoted to the index.

    Raises
    ------
    Exception
        If the matrix does not have exactly two dimensions.
    RuntimeError
        If the raw buffer size differs from ``nrow * ncol * itemsize``.
    """
    if len(obj.dim) != 2:
        raise Exception("Matrix with dims != 2. Unimplemented conversion")

    # Without a pool there is nowhere to copy to: hand back a plain ndarray.
    if shm is None:
        return np.array(obj)

    np_typestr, elt_size = _get_rvector_types(obj)
    if not (np_typestr and elt_size):
        return _rmatrix_to_py_default(obj)

    np_dtype = np.dtype(np_typestr)

    r_vec, raw = _get_rvector_memview(obj)
    if r_vec is None or raw is None:
        return _rmatrix_to_py_default(obj)

    n_rows, n_cols, row_labels, col_labels = _rmatrix_info(obj)
    n_bytes = n_rows * n_cols * elt_size

    # Sanity check on the raw view over R's data before the bulk copy.
    if raw.nbytes != n_bytes:
        raise RuntimeError(f"R matrix bytes={raw.nbytes}, expected={n_bytes}")

    # One allocation, one byte-level copy (R → shm), no intermediate ndarray.
    block = shm.alloc(n_bytes)
    assert block.shm.buf
    block.shm.buf[:n_bytes] = raw.cast("B")

    if not (row_labels or col_labels):
        # NOTE(review): R matrices are column-major; whether from_block reads
        # the buffer in that order is not visible from this function - confirm.
        return ShmArray.from_block(block=block, shape=(n_rows, n_cols), dtype=np_dtype)

    frame = ShmDataFrameSimple.from_block(
        block=block,
        nrows=n_rows,
        ncols=n_cols,
        columns=col_labels,
        index=row_labels,
        dtype=np_dtype,
    )

    # First candidate that exists and is unique wins; `_obs_id_` has priority.
    for key in ("_obs_id_", "obs_id"):
        if key in frame.columns and not frame[key].duplicated().any():
            frame.index = frame[key]
            break

    return frame

_r2py_matrix(obj, shm=None)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _r2py_matrix(obj: "Matrix", shm: ShmPool | None = None) -> PyObject:
    """
    Convert an R matrix to Python by delegating to ``_rmatrix_to_py``.

    Parameters
    ----------
    obj : Matrix
        R matrix to convert.
    shm : ShmPool or None
        Optional shared-memory pool, forwarded unchanged.
    """
    converted = _rmatrix_to_py(obj, shm)
    return converted

_r2py_dataframe_fallback(obj)

Fallback conversion for R data.frame -> pandas.DataFrame.

Notes

In some environments, rpy2/pandas2ri may convert R factors to their underlying integer codes instead of pandas.Categorical. Since brmspy relies on factors roundtripping as categoricals (and we have custom factor handling in _r2py_vector()), we patch factor columns explicitly here.

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _r2py_dataframe_fallback(obj: "DataFrame") -> PyObject:
    """
    Fallback conversion for R data.frame -> pandas.DataFrame.

    The frame is converted with ``pandas2ri``; afterwards every column that is
    an R factor is re-converted through ``_r2py_vector`` and written back, and
    the result is passed through ``_adjust_df_for_py``.

    Notes
    -----
    In some environments, rpy2/pandas2ri may convert R factors to their underlying
    integer codes instead of `pandas.Categorical`. Since brmspy relies on factors
    roundtripping as categoricals (and we have custom factor handling in
    [`_r2py_vector()`][brmspy.helpers._rpy2._converters._vectors._r2py_vector]),
    we patch factor columns explicitly here.

    Factor patching is best-effort: a failure is logged as a warning and never
    aborts the conversion. Failures are isolated per column so one bad column
    does not prevent the remaining factor columns from being patched.
    """
    import rpy2.robjects as ro
    from rpy2.robjects import pandas2ri
    from rpy2.robjects.conversion import localconverter

    with localconverter(pandas2ri.converter) as cv:
        df = cv.rpy2py(obj)

    # Ensure factor columns come back as pandas categoricals.
    # (Otherwise they can appear as int32 codes with NA_INTEGER sentinel values.)
    try:
        names = [] if obj.names is NULL else list(obj.names)
    except Exception as e:
        log_warning(f"Could not read data.frame names for factor patching: {e}")
        names = []

    for name in names:
        try:
            col_name = str(name)
            if col_name not in df.columns:
                continue

            col_r = obj.rx2(name)
            if isinstance(col_r, ro.FactorVector):
                cat = _r2py_vector(col_r, shm=None, allow_scalar=False)
                df[col_name] = pd.Series(
                    cast(np.ndarray, cat), index=df.index, name=col_name, copy=False
                )
        except Exception as e:
            # Best-effort: never let fallback conversion fail due to factor
            # patching, but surface the reason instead of hiding it.
            log_warning(f"Factor patching failed for column {name!r}: {e}")

    return _adjust_df_for_py(df)

_r2py_dataframe(obj, shm=None)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _r2py_dataframe(obj: "DataFrame", shm: ShmPool | None = None) -> PyObject:
    """
    Convert an R data.frame to a pandas DataFrame, column by column into shared memory.

    Parameters
    ----------
    obj : DataFrame
        R data.frame to convert.
    shm : ShmPool or None
        Pool used for per-column conversion. When falsy,
        ``_r2py_dataframe_fallback`` is used directly.

    Returns
    -------
    PyObject
        An empty ``pandas.DataFrame`` when the R frame has no row names, the
        ``ShmDataFrameColumns`` result (after ``_adjust_df_for_py`` unless it
        is empty), or the fallback conversion if anything raises on the way.
    """
    if not shm:
        return _r2py_dataframe_fallback(obj)

    try:
        from rpy2.robjects.pandas2ri import _flatten_dataframe

        rownames = list(obj.rownames)
        if not rownames:
            return pd.DataFrame({})

        # convert straight into ShmDataframeColumns.
        # Columns are keyed by position; `_flatten_dataframe` fills
        # `colnames_lst` with the matching names as it is consumed.
        colnames_lst = []
        converted = OrderedDict(
            (pos, _r2py_vector(col, shm, allow_scalar=False))
            for pos, col in enumerate(_flatten_dataframe(obj, colnames_lst))
        )

        res = ShmDataFrameColumns(converted)
        # A name given as a list of parts is joined with "."; others pass through.
        res.columns = tuple(
            ".".join(name) if isinstance(name, list) else name
            for name in colnames_lst
        )
        res.index = rownames
        res._set_shm_metadata({})

        if res.empty:
            return res

        for col in res.columns:
            # Side-effect: ShmDataFrameColumns overrides __setitem__/events;
            # assigning the same Series forces the "put column to SHM" path
            # DO NOT REMOVE!
            res[col] = res[col]
        res = _adjust_df_for_py(res)

        return res
    except Exception as e:
        # Any failure in the SHM path degrades to the plain conversion.
        log_warning(
            f"ShmDataFrameColumns conversion failed, falling back to default. Reason: {e}"
        )
        return _r2py_dataframe_fallback(obj)

_adjust_df_for_r(obj)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _adjust_df_for_r(obj: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``obj`` normalised for conversion to an R data.frame.

    The input frame is never modified; all adjustments are applied to a
    shallow copy.

    Adjustments
    -----------
    * Adds an ``_obs_id_`` column when absent: the index values for a
      single-level index, otherwise ``range(len(obj))``.
    * Categorical columns whose categories are not inferred as strings get
      their categories mapped through ``str``.
    * Integer columns that are not ``int32`` are cast to ``int32``.
    * Float columns that are not ``float64`` are cast to ``float64``.

    Parameters
    ----------
    obj : pandas.DataFrame
        Frame to prepare.

    Returns
    -------
    pandas.DataFrame
        The adjusted copy.
    """
    # Shallow copy: column insertions/replacements below land on the copy only,
    # so the caller's frame keeps its columns and dtypes, without duplicating
    # the underlying data up front.
    obj = obj.copy(deep=False)

    if "_obs_id_" not in obj.columns:
        if obj.index.nlevels == 1:
            obj["_obs_id_"] = obj.index
        else:
            # A MultiIndex cannot be stored in one column; use row positions.
            obj["_obs_id_"] = range(len(obj))

    for c in obj.columns:
        s = obj[c]
        if isinstance(s.dtype, pd.CategoricalDtype):
            cats = s.dtype.categories
            # needs string categories for factor conversion
            if cats.inferred_type != "string":
                obj[c] = s.cat.rename_categories(cats.map(str))

        elif pd.api.types.is_integer_dtype(s.dtype) and s.dtype != np.int32:
            obj[c] = s.astype("int32")
        elif pd.api.types.is_float_dtype(s.dtype) and s.dtype != np.float64:
            obj[c] = s.astype("float64")

    return obj

_adjust_df_for_py(df)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _adjust_df_for_py(df: pd.DataFrame) -> pd.DataFrame:
    """
    Promote the ``_obs_id_`` column to the index when it exists.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to adjust.

    Returns
    -------
    pandas.DataFrame
        ``df`` itself when it has no ``_obs_id_`` column; otherwise the result
        of ``set_index("_obs_id_", drop=True)``.
    """
    if "_obs_id_" not in df.columns:
        return df
    return df.set_index("_obs_id_", drop=True)

_py2r_dataframe(obj)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _py2r_dataframe(obj: pd.DataFrame) -> Sexp:
    """
    Convert a pandas DataFrame to an R object.

    The frame is first normalised by ``_adjust_df_for_r`` and then converted
    with the ``pandas2ri`` converter.

    Parameters
    ----------
    obj : pandas.DataFrame
        Frame to convert.

    Returns
    -------
    Sexp
        Result of ``py2rpy`` under the ``pandas2ri`` converter.
    """
    from rpy2.robjects import pandas2ri
    from rpy2.robjects.conversion import localconverter

    prepared = _adjust_df_for_r(obj)

    with localconverter(pandas2ri.converter) as cv:
        r_frame = cv.py2rpy(prepared)
    return r_frame

_py2r_numpy(obj)

Source code in brmspy/helpers/_rpy2/_converters/_arrays.py
def _py2r_numpy(obj: np.ndarray) -> Sexp:
    """
    Convert a NumPy array to an R object.

    Parameters
    ----------
    obj : numpy.ndarray
        Array to convert.

    Returns
    -------
    Sexp
        Result of ``py2rpy`` under the combined ``pandas2ri`` + ``numpy2ri``
        converter.
    """
    from rpy2.robjects import numpy2ri, pandas2ri
    from rpy2.robjects.conversion import localconverter

    combined = pandas2ri.converter + numpy2ri.converter
    with localconverter(combined) as cv:
        r_obj = cv.py2rpy(obj)
    return r_obj