Skip to content

_vectors

Attributes

PyObject = Union[dict, list, str, float, int, np.dtype, None, Any, pd.DataFrame, pd.Series, np.ndarray, az.InferenceData, xr.DataArray, xr.Dataset] module-attribute

Union of common Python-side objects produced by R→Python conversion.

This is intentionally broad: brmspy frequently returns standard scientific Python types (NumPy/pandas/xarray/ArviZ), plus plain dict/list primitives.

Note

Avoid adding Any here unless absolutely necessary; it defeats the purpose of having this alias.

Classes

ShmArray

Bases: ndarray

NumPy array view backed by a shared-memory block.

Attributes:

Name Type Description
_shm_metadata ShmRef

Reference to the shared-memory block backing the array data.

Notes

This is a view over SharedMemory.buf. Closing/unlinking the underlying shared memory while the array is still in use will lead to undefined behavior.

Source code in brmspy/types/shm_extensions.py
class ShmArray(np.ndarray):
    """
    NumPy array view backed by a shared-memory block.

    Attributes
    ----------
    _shm_metadata : ShmRef
        Reference to the shared-memory block backing the array data. It is
        assigned by `from_block` for non-object dtypes.

    Notes
    -----
    This is a *view* over `SharedMemory.buf`. Closing/unlinking the underlying
    shared memory while the array is still in use will lead to undefined
    behavior.
    """

    _shm_metadata: ShmRef  # for type checkers

    @classmethod
    def from_metadata(
        cls, meta: ShmArrayMetadata | dict[str, Any], block: ShmBlock
    ) -> np.ndarray:
        """
        Rebuild an array from a metadata record and an attached block.

        Parameters
        ----------
        meta : ShmArrayMetadata or dict
            Mapping providing the `"dtype"`, `"shape"` and `"order"` entries.
        block : ShmBlock
            Attached shared-memory block holding the array bytes.

        Returns
        -------
        numpy.ndarray
            Whatever `from_block` returns for these arguments.
        """
        dtype = np.dtype(meta["dtype"])
        shape = tuple(meta["shape"])
        order = meta["order"]

        return ShmArray.from_block(block=block, shape=shape, dtype=dtype, order=order)

    @classmethod
    def from_block(
        cls, block: ShmBlock, shape: tuple[int, ...], dtype: np.dtype, **kwargs
    ) -> Union["ShmArray", np.ndarray]:
        """
        Create an array view backed by an existing shared-memory block.

        Parameters
        ----------
        block : ShmBlock
            Attached shared-memory block.
        shape : tuple[int, ...]
            Desired array shape.
        dtype : numpy.dtype
            NumPy dtype of the array.
        **kwargs
            Only `order` is read (default `"F"`). `"non-contiguous"`, as
            reported by `array_order`, is treated as `"C"`.

        Returns
        -------
        ShmArray or numpy.ndarray
            For non-object dtypes, a `ShmArray` view into the shared-memory
            buffer. For object dtype, the array obtained by unpickling the
            block content (a copy, not a view).
        """
        is_object = np.dtype(dtype) == np.dtype("O")

        if not is_object:
            order = kwargs.get("order", "F")
            if order == "non-contiguous":
                # `to_shm` serialises such arrays as a C-ordered copy, and
                # `np.ndarray` itself only understands "C" / "F".
                order = "C"

            if block.shm.buf:
                view = memoryview(block.shm.buf)
                view = view[: block.content_size]
            else:
                # Falsy (missing or zero-length) buffer: let NumPy allocate.
                view = None
            base = np.ndarray(
                shape=shape,
                dtype=dtype,
                buffer=view,
                order=order,
            )
            obj = base.view(ShmArray)
            obj._shm_metadata = block.to_ref()
        else:
            assert block.shm.buf
            view = memoryview(block.shm.buf)
            view = view[: block.content_size]
            payload = bytes(view)
            # SECURITY: unpickling executes arbitrary code; the block content
            # must only ever be written by a trusted peer.
            obj = pickle.loads(payload)
            assert isinstance(obj, np.ndarray)

        return obj

    @classmethod
    def array_order(cls, a: np.ndarray) -> Literal["C", "F", "non-contiguous"]:
        """
        Determine how an array can be reconstructed from a raw buffer.

        Returns `"C"` for C-contiguous arrays, `"F"` for Fortran-contiguous arrays,
        otherwise `"non-contiguous"` (meaning: bytes were obtained by forcing
        a contiguous copy during encoding).
        """
        if a.flags["C_CONTIGUOUS"]:
            return "C"
        if a.flags["F_CONTIGUOUS"]:
            return "F"
        return "non-contiguous"

    @classmethod
    def is_string_object(cls, a: np.ndarray, sample: int = 1000):
        """
        Check whether an object-dtype array appears to hold only strings.

        At most `sample` leading elements (in flat iteration order) are
        inspected; `None` entries are ignored. Arrays whose dtype is not
        object always yield `False`.
        """
        if np.dtype(a.dtype) != np.dtype("O"):
            return False
        it = a.flat
        for _ in range(min(sample, a.size)):
            v = next(it, None)
            if v is not None and not isinstance(v, str):
                return False
        return True

    @classmethod
    def to_shm(
        cls, obj: np.ndarray | pd.Series | list, shm_pool: Any
    ) -> tuple[np.ndarray | None, ShmRef, str, list[int], str]:
        """
        Place array data into shared memory and describe how to rebuild it.

        Parameters
        ----------
        obj : numpy.ndarray, pandas.Series or list
            Data to store. For a categorical Series only the integer codes
            are stored. Anything that is not an ndarray or Series goes
            through `np.asarray`.
        shm_pool : Any
            Object exposing `alloc(nbytes, temporary=...)` that returns a
            block with `shm.buf` and `to_ref()`.

        Returns
        -------
        tuple
            `(arr_modified, ref, dtype, shape, order)`. `arr_modified` is the
            `"U"`-converted array when a string-like object array had to be
            converted, otherwise `None`. `ref` is the block reference, and
            `dtype`, `shape`, `order` describe the stored array.

        Notes
        -----
        An existing `ShmArray` is not copied; its stored reference is reused.
        Raw bytes are written in the layout named by `order`, so a
        Fortran-ordered array is written column-major and a later
        `from_block(..., order="F")` reads it back unchanged.
        """
        if isinstance(obj, pd.Series):
            if isinstance(obj.dtype, pd.CategoricalDtype):
                arr = obj.cat.codes.to_numpy(copy=False)
            else:
                arr = obj.to_numpy(copy=False)
        elif not isinstance(obj, np.ndarray):
            arr = np.asarray(obj)
        else:
            arr = obj

        is_object = np.dtype(arr.dtype) == np.dtype("O")
        is_string = cls.is_string_object(arr)

        arr_modified = None
        if isinstance(arr, ShmArray):
            ref = arr._shm_metadata
        else:
            temporary = False
            if is_object and not is_string:
                # Arbitrary Python objects: no raw layout, pickle the array.
                data = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
                temporary = True
            else:
                if is_string:
                    arr = arr.astype("U")
                    arr_modified = arr
                # Serialise in the same layout that is reported to the
                # reader; "non-contiguous" falls back to a C-ordered copy.
                layout = "F" if cls.array_order(arr) == "F" else "C"
                data = arr.tobytes(order=layout)

            nbytes = len(data)

            # Ask for exactly nbytes; OS may round up internally, that's fine.
            block = shm_pool.alloc(nbytes, temporary=temporary)
            block.shm.buf[:nbytes] = data
            ref = block.to_ref()

        dtype = str(arr.dtype)
        shape = list(arr.shape)
        order = cls.array_order(arr)

        return arr_modified, ref, dtype, shape, order

Attributes

_shm_metadata instance-attribute

Functions

from_metadata(meta, block) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def from_metadata(
    cls, meta: ShmArrayMetadata | dict[str, Any], block: ShmBlock
) -> np.ndarray:
    """
    Rebuild an array from a metadata record and an attached block.

    The `"dtype"`, `"shape"` and `"order"` entries of `meta` are read and
    forwarded, together with `block`, to `ShmArray.from_block`.
    """
    return ShmArray.from_block(
        block=block,
        dtype=np.dtype(meta["dtype"]),
        shape=tuple(meta["shape"]),
        order=meta["order"],
    )
from_block(block, shape, dtype, **kwargs) classmethod

Create an array view backed by an existing shared-memory block.

Parameters:

Name Type Description Default
block ShmBlock

Attached shared-memory block.

required
shape tuple[int, ...]

Desired array shape.

required
dtype dtype

NumPy dtype of the array.

required
**kwargs

Optional keyword arguments. Only order is read: it selects the memory layout used to interpret the buffer and defaults to "F".

{}

Returns:

Type Description
ShmArray

Array view into the shared-memory buffer.

Source code in brmspy/types/shm_extensions.py
@classmethod
def from_block(
    cls, block: ShmBlock, shape: tuple[int, ...], dtype: np.dtype, **kwargs
) -> Union["ShmArray", np.ndarray]:
    """
    Create an array view backed by an existing shared-memory block.

    Parameters
    ----------
    block : ShmBlock
        Attached shared-memory block.
    shape : tuple[int, ...]
        Desired array shape.
    dtype : numpy.dtype
        NumPy dtype of the array.
    **kwargs
        Only `order` is read (default `"F"`). `"non-contiguous"`, as
        reported by `array_order`, is treated as `"C"`.

    Returns
    -------
    ShmArray or numpy.ndarray
        For non-object dtypes, a `ShmArray` view into the shared-memory
        buffer. For object dtype, the array obtained by unpickling the
        block content (a copy, not a view).
    """
    is_object = np.dtype(dtype) == np.dtype("O")

    if not is_object:
        order = kwargs.get("order", "F")
        if order == "non-contiguous":
            # Such arrays are serialised as a forced C-ordered copy, and
            # `np.ndarray` itself only understands "C" / "F".
            order = "C"

        if block.shm.buf:
            view = memoryview(block.shm.buf)
            view = view[: block.content_size]
        else:
            # Falsy (missing or zero-length) buffer: let NumPy allocate.
            view = None
        base = np.ndarray(
            shape=shape,
            dtype=dtype,
            buffer=view,
            order=order,
        )
        obj = base.view(ShmArray)
        obj._shm_metadata = block.to_ref()
    else:
        assert block.shm.buf
        view = memoryview(block.shm.buf)
        view = view[: block.content_size]
        payload = bytes(view)
        # SECURITY: unpickling executes arbitrary code; the block content
        # must only ever be written by a trusted peer.
        obj = pickle.loads(payload)
        assert isinstance(obj, np.ndarray)

    return obj
array_order(a) classmethod

Determine how an array can be reconstructed from a raw buffer.

Returns "C" for C-contiguous arrays, "F" for Fortran-contiguous arrays, otherwise "non-contiguous" (meaning: bytes were obtained by forcing a contiguous copy during encoding).

Source code in brmspy/types/shm_extensions.py
@classmethod
def array_order(cls, a: np.ndarray) -> Literal["C", "F", "non-contiguous"]:
    """
    Classify the memory layout of `a` for later reconstruction.

    The C-contiguity flag is checked first, so an array that is both C- and
    Fortran-contiguous is reported as `"C"`. A Fortran-contiguous array gives
    `"F"`; anything else gives `"non-contiguous"` (meaning: bytes were
    obtained by forcing a contiguous copy during encoding).
    """
    layout_flags = a.flags
    for flag_name, label in (("C_CONTIGUOUS", "C"), ("F_CONTIGUOUS", "F")):
        if layout_flags[flag_name]:
            return label
    return "non-contiguous"
is_string_object(a, sample=1000) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def is_string_object(cls, a: np.ndarray, sample: int = 1000):
    """
    Check whether an object-dtype array appears to hold only strings.

    At most `sample` leading elements (in flat iteration order) are
    inspected, and `None` entries are ignored. Arrays whose dtype is not
    object always yield `False`.
    """
    if np.dtype(a.dtype) != np.dtype("O"):
        return False

    limit = min(sample, a.size)
    # zip() stops as soon as the range is exhausted, which caps the scan.
    return all(
        value is None or isinstance(value, str)
        for _, value in zip(range(limit), a.flat)
    )
to_shm(obj, shm_pool) classmethod
Source code in brmspy/types/shm_extensions.py
@classmethod
def to_shm(
    cls, obj: np.ndarray | pd.Series | list, shm_pool: Any
) -> tuple[np.ndarray | None, ShmRef, str, list[int], str]:
    """
    Place array data into shared memory and describe how to rebuild it.

    Parameters
    ----------
    obj : numpy.ndarray, pandas.Series or list
        Data to store. For a categorical Series only the integer codes are
        stored. Anything that is not an ndarray or Series goes through
        `np.asarray`.
    shm_pool : Any
        Object exposing `alloc(nbytes, temporary=...)` that returns a block
        with `shm.buf` and `to_ref()`.

    Returns
    -------
    tuple
        `(arr_modified, ref, dtype, shape, order)`. `arr_modified` is the
        `"U"`-converted array when a string-like object array had to be
        converted, otherwise `None`. `ref` is the block reference, and
        `dtype`, `shape`, `order` describe the stored array.

    Notes
    -----
    An existing `ShmArray` is not copied; its stored reference is reused.
    Raw bytes are written in the layout named by `order`, so a
    Fortran-ordered array is written column-major and a reader that applies
    the reported `order` gets the same elements back.
    """
    if isinstance(obj, pd.Series):
        if isinstance(obj.dtype, pd.CategoricalDtype):
            arr = obj.cat.codes.to_numpy(copy=False)
        else:
            arr = obj.to_numpy(copy=False)
    elif not isinstance(obj, np.ndarray):
        arr = np.asarray(obj)
    else:
        arr = obj

    is_object = np.dtype(arr.dtype) == np.dtype("O")
    is_string = cls.is_string_object(arr)

    arr_modified = None
    if isinstance(arr, ShmArray):
        ref = arr._shm_metadata
    else:
        temporary = False
        if is_object and not is_string:
            # Arbitrary Python objects: no raw layout, pickle the array.
            data = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
            temporary = True
        else:
            if is_string:
                arr = arr.astype("U")
                arr_modified = arr
            # Serialise in the same layout that is reported to the reader;
            # "non-contiguous" falls back to a C-ordered copy.
            layout = "F" if cls.array_order(arr) == "F" else "C"
            data = arr.tobytes(order=layout)

        nbytes = len(data)

        # Ask for exactly nbytes; OS may round up internally, that's fine.
        block = shm_pool.alloc(nbytes, temporary=temporary)
        block.shm.buf[:nbytes] = data
        ref = block.to_ref()

    dtype = str(arr.dtype)
    shape = list(arr.shape)
    order = cls.array_order(arr)

    return arr_modified, ref, dtype, shape, order

ShmPool

Minimal interface for allocating and attaching shared-memory blocks.

The concrete implementation lives in brmspy._session.transport.ShmPool and tracks blocks so they can be closed on teardown.

Source code in brmspy/types/shm.py
class ShmPool:
    """
    Minimal interface for allocating and attaching shared-memory blocks.

    The concrete implementation lives in
    [`brmspy._session.transport.ShmPool`][brmspy._session.transport.ShmPool] and tracks
    blocks so they can be closed on teardown.

    Every method body in this class is a placeholder (`...`).
    """

    def __init__(self, manager: SharedMemoryManager) -> None:
        """
        Create a pool bound to an existing `SharedMemoryManager`.

        Parameters
        ----------
        manager : multiprocessing.managers.SharedMemoryManager
            Manager used to allocate blocks.
        """
        ...

    def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
        """
        Allocate a new shared-memory block.

        Parameters
        ----------
        size : int
            Size in bytes.
        temporary : bool, default False
            NOTE(review): semantics are not defined in this interface;
            presumably marks the block as short-lived — confirm against the
            concrete implementation.

        Returns
        -------
        ShmBlock
            Newly allocated block.
        """
        ...

    def attach(self, ref: ShmRef) -> ShmBlock:
        """
        Attach to an existing shared-memory block by name.

        Parameters
        ----------
        ref : ShmRef
            Reference identifying the block to attach to.

        Returns
        -------
        ShmBlock
            Attached block.
        """
        ...

    def close_all(self) -> None:
        """
        Close all tracked shared-memory handles owned by this pool.

        Returns
        -------
        None
        """
        ...

    # NOTE(review): undocumented in the interface. Presumably releases tracked
    # blocks, optionally only the one called `name` — confirm against the
    # concrete implementation.
    def gc(self, name: str | None = None) -> None: ...

Functions

__init__(manager)

Create a pool bound to an existing SharedMemoryManager.

Parameters:

Name Type Description Default
manager SharedMemoryManager

Manager used to allocate blocks.

required
Source code in brmspy/types/shm.py
def __init__(self, manager: SharedMemoryManager) -> None:
    """
    Create a pool bound to an existing `SharedMemoryManager`.

    This is the interface declaration only; the body is a placeholder.

    Parameters
    ----------
    manager : multiprocessing.managers.SharedMemoryManager
        Manager used to allocate blocks.
    """
    ...
alloc(size, temporary=False)

Allocate a new shared-memory block.

Parameters:

Name Type Description Default
size int

Size in bytes.

required

Returns:

Type Description
ShmBlock

Newly allocated block.

Source code in brmspy/types/shm.py
def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
    """
    Allocate a new shared-memory block.

    Parameters
    ----------
    size : int
        Size in bytes.
    temporary : bool, default False
        NOTE(review): semantics are not defined in this interface;
        presumably marks the block as short-lived — confirm against the
        concrete implementation.

    Returns
    -------
    ShmBlock
        Newly allocated block.
    """
    ...
attach(ref)

Attach to an existing shared-memory block by name.

Returns:

Type Description
ShmBlock

Attached block.

Source code in brmspy/types/shm.py
def attach(self, ref: ShmRef) -> ShmBlock:
    """
    Attach to an existing shared-memory block by name.

    Parameters
    ----------
    ref : ShmRef
        Reference identifying the block to attach to.

    Returns
    -------
    ShmBlock
        Attached block.
    """
    ...
close_all()

Close all tracked shared-memory handles owned by this pool.

Returns:

Type Description
None
Source code in brmspy/types/shm.py
def close_all(self) -> None:
    """
    Close all tracked shared-memory handles owned by this pool.

    This is the interface declaration only; the body is a placeholder.

    Returns
    -------
    None
    """
    ...
gc(name=None)
Source code in brmspy/types/shm.py
# NOTE(review): undocumented in the interface. Presumably releases tracked
# blocks, optionally only the one called `name` — confirm against the
# concrete implementation.
def gc(self, name: str | None = None) -> None: ...

Functions

_get_rvector_types(obj)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _get_rvector_types(obj: Any) -> tuple[None | str, None | int]:
    """
    Read the NumPy type string and element size advertised by an R vector.

    Returns
    -------
    tuple
        `(typestr, itemsize)` taken from `obj._NP_TYPESTR` and
        `obj._R_SIZEOF_ELT`. `(None, None)` is returned when `obj` is not a
        `SexpVectorWithNumpyInterface` or when either value is falsy.
    """
    if isinstance(obj, SexpVectorWithNumpyInterface):
        typestr, elt_size = obj._NP_TYPESTR, obj._R_SIZEOF_ELT
        if typestr and elt_size:
            return typestr, elt_size

    return None, None

_get_rvector_memview(obj)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _get_rvector_memview(
    obj: Any,
) -> tuple[SexpVectorWithNumpyInterface | None, memoryview | None]:
    """
    Best-effort access to the raw memory of an R vector.

    Returns
    -------
    tuple
        `(obj, view)` where `view` is the result of `obj.memoryview()`.
        `(None, None)` is returned when `obj` is not both a
        `SexpVectorWithNumpyInterface` and a `SexpVector`, when it has no
        `memoryview` attribute, or when obtaining the view raises.
    """
    # Explicit check instead of `assert`, so the guard survives `python -O`.
    if not (
        isinstance(obj, SexpVectorWithNumpyInterface) and isinstance(obj, SexpVector)
    ):
        return None, None

    try:
        make_view = getattr(obj, "memoryview", None)
        if make_view is None:
            return None, None
        return obj, make_view()
    except Exception:
        # Deliberately best-effort: callers fall back to another conversion.
        # `Exception` (not a bare `except`) lets KeyboardInterrupt and
        # SystemExit propagate.
        return None, None

_r2py_listvector(obj, shm=None)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _r2py_listvector(
    obj: "ListVector", shm: ShmPool | None = None
) -> dict[str, PyObject] | list[PyObject]:
    """
    Convert an R list into a Python dict (named) or list (unnamed).

    Parameters
    ----------
    obj : ListVector
        R list to convert. Each element is converted with `r_to_py`.
    shm : ShmPool or None
        Accepted for signature compatibility; not used by this function.
        NOTE(review): nested elements are converted without it — confirm
        whether `r_to_py` should receive the pool.

    Returns
    -------
    dict or list
        A list when the R list has no non-empty names. Otherwise a dict keyed
        by element name. Unnamed entries of a partly named list are keyed by
        their 1-based position as a string, so they neither collide with each
        other nor lose their value. For duplicated names the first entry
        wins, matching R's by-name lookup.
    """
    from ._dispatch import r_to_py

    names = list(obj.names) if obj.names is not NULL else None

    # Unnamed → list
    if not names or not any(n is not NULL and n != "" for n in names):
        return [r_to_py(el) for el in obj]

    # Named list → dict. Elements are taken positionally rather than looked
    # up by name, so entries with an empty name still resolve to their value.
    result: dict[str, PyObject] = {}
    for position, (name, el) in enumerate(zip(names, obj), start=1):
        has_name = name is not NULL and name not in (None, "")
        key = str(name) if has_name else str(position)
        if key in result:
            continue
        result[key] = r_to_py(el)
    return result

_fallback_rvector_iter(obj)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _fallback_rvector_iter(obj):
    """
    Convert an R vector element by element, without shared memory.

    A `FactorVector` is turned into a NumPy array and handed to
    `_to_pandas_factor`. Any other vector is returned as a list whose items
    were converted one at a time with rpy2's default converter.
    """
    from rpy2.robjects import FactorVector, default_converter
    from rpy2.robjects.conversion import localconverter

    if isinstance(obj, FactorVector):
        return _to_pandas_factor(np.array(obj), obj)

    with localconverter(default_converter) as converter:
        return [converter.rpy2py(item) for item in obj]

_to_pandas_factor(arr, obj_r)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _to_pandas_factor(arr: Any, obj_r: "FactorVector"):
    """
    Build a `pandas.Categorical` from R factor codes.

    Parameters
    ----------
    arr : array-like
        Factor codes as stored by R. Non-ndarray input goes through
        `np.asarray`. When `arr` is already an int32 ndarray it is modified
        in place, because the cast is requested with `copy=False`.
    obj_r : FactorVector
        The R factor; supplies the `levels` slot and the `rclass` used to
        decide whether the result is ordered.
    """
    import rpy2.robjects as ro

    codes = arr if isinstance(arr, np.ndarray) else np.asarray(arr)

    # R codes are 1-based with NA_INTEGER (a negative sentinel) for missing;
    # pandas wants 0-based codes with -1 for missing. The mask is taken
    # before shifting so the sentinel itself is never decremented (which
    # would overflow int32).
    is_code = codes > 0
    codes = codes.astype(np.int32, copy=False)
    codes[is_code] -= 1
    codes[~is_code] = -1

    levels = list(cast(ro.ListVector, obj_r.do_slot("levels")))
    return pd.Categorical.from_codes(
        codes,
        categories=cast(pd.Index, levels),
        ordered="ordered" in obj_r.rclass,
    )

_r2py_vector(obj, shm=None, allow_scalar=True)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _r2py_vector(
    obj: "Vector", shm: ShmPool | None = None, allow_scalar: bool | None = True
) -> PyObject:
    """
    Convert an atomic R vector to a Python object.

    Parameters
    ----------
    obj : Vector
        R vector; must not be a `ListVector`.
    shm : ShmPool or None
        Pool used to allocate the block that receives the vector bytes. When
        falsy, the element-wise fallback is used instead.
    allow_scalar : bool or None
        When truthy, a length-1 vector is returned as the default rpy2
        conversion of its single element.

    Returns
    -------
    PyObject
        A scalar, the result of `_fallback_rvector_iter`, a `ShmArray`, or
        (for factors) the result of `_to_pandas_factor`.

    Raises
    ------
    RuntimeError
        If the vector's raw buffer size differs from `itemsize * length`.
    """
    import rpy2.robjects as ro
    from rpy2.robjects import default_converter
    from rpy2.robjects.conversion import localconverter

    assert not isinstance(obj, ro.ListVector)

    if allow_scalar:
        untyped = cast(Any, obj)
        # length 1 → scalar, via the default R→Python conversion
        if untyped.__len__ and len(untyped) == 1:
            with localconverter(default_converter) as converter:
                return converter.rpy2py(obj[0])

    typestr, elt_size = _get_rvector_types(obj)
    np_vector, raw = _get_rvector_memview(obj)

    # Anything missing (type info, pool, raw view) → element-wise fallback
    if not (typestr and elt_size and shm and np_vector and raw):
        return _fallback_rvector_iter(obj)

    # NumPy-convertible path
    count = len(np_vector)
    total_bytes = elt_size * count
    dtype = np.dtype(typestr)

    if raw.nbytes != total_bytes:
        raise RuntimeError(f"R vector bytes={raw.nbytes}, expected={total_bytes}")

    # One allocation, then one bulk copy R → shm with no intermediate ndarray
    block = shm.alloc(total_bytes)
    assert block.shm.buf
    block.shm.buf[:total_bytes] = raw.cast("B")

    result = ShmArray.from_block(block=block, shape=(count,), dtype=dtype)
    if isinstance(obj, ro.FactorVector):
        return _to_pandas_factor(result, obj)
    return result

_py2r_list(obj)

Source code in brmspy/helpers/_rpy2/_converters/_vectors.py
def _py2r_list(obj: list | tuple) -> Sexp:
    """
    Convert a Python list or tuple to an R object.

    The first matching rule wins:

    1. Empty input → empty `ListVector`.
    2. All elements are mappings → `ListVector` named `"1"`, `"2"`, ...,
       each element converted with `py_to_r`.
    3. All `str` → `StrVector`.
    4. All `bool` → `BoolVector`.
    5. All integers (no bools) that fit R's 32-bit integer type →
       `IntVector`.
    6. All numeric (ints, floats, bools) → `FloatVector`. Integer lists
       containing a value outside the 32-bit range land here.
    7. Anything else → rpy2's default + pandas + numpy converters.

    Parameters
    ----------
    obj : list or tuple
        Sequence to convert.

    Returns
    -------
    Sexp
        The resulting R object.
    """
    import rpy2.robjects as ro
    from rpy2.robjects import default_converter, numpy2ri, pandas2ri
    from rpy2.robjects.conversion import localconverter

    if not obj:
        return ro.ListVector({})

    if isinstance(obj, tuple):
        obj = list(obj)

    from ._dispatch import py_to_r

    if all(isinstance(el, Mapping) for el in obj):
        # R lists are usually named or indexed; use 1-based index names
        converted = {str(i + 1): py_to_r(el) for i, el in enumerate(obj)}
        return ro.ListVector(converted)

    # Homogeneous scalar lists → atomic R vectors (c(...))
    # Strings
    if all(isinstance(el, str) for el in obj):
        return ro.StrVector(list(obj))

    # Booleans
    if all(isinstance(el, bool) for el in obj):
        return ro.BoolVector(list(obj))

    # Integers (avoid treating bools as ints). R integers are 32-bit and the
    # smallest int32 value is reserved for NA, so values outside the usable
    # range fall through to the double-vector branch below.
    r_int_max = 2**31 - 1
    if all(
        isinstance(el, (int, np.integer))
        and not isinstance(el, bool)
        and -r_int_max <= el <= r_int_max
        for el in obj
    ):
        return ro.IntVector(list(obj))

    # Numeric (mix of ints/floats/bools) → R "numeric" (double) vector
    if all(isinstance(el, (int, float, np.integer, np.floating, bool)) for el in obj):
        return ro.FloatVector([float(el) for el in obj])

    # mixed / other lists: let rpy2 decide (vectors, lists, etc.)
    with localconverter(
        default_converter + pandas2ri.converter + numpy2ri.converter
    ) as cv:
        return cv.py2rpy(obj)