Skip to content

base

Classes

Encoder

Bases: Protocol

Protocol implemented by codecs in the session codec registry.

Source code in brmspy/types/session.py
@runtime_checkable
class Encoder(Protocol):
    """
    Structural interface for codecs held by the session codec registry.

    Any object that provides these three methods satisfies the protocol; no
    inheritance is required.
    """

    def can_encode(self, obj: Any) -> bool:
        """Report whether this codec is able to encode `obj`."""

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """Encode `obj`; `shm_pool` is passed along for buffer allocation."""

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args: Any,
    ) -> Any:
        """Rebuild a value from `payload`, resolving buffer refs via `get_buf`."""

Functions

can_encode(obj)
Source code in brmspy/types/session.py
def can_encode(self, obj: Any) -> bool:
    """Report whether this codec is able to encode `obj`."""
encode(obj, shm_pool)
Source code in brmspy/types/session.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """Encode `obj`; `shm_pool` is passed along for buffer allocation."""
decode(payload, get_buf, *args)
Source code in brmspy/types/session.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args: Any,
) -> Any:
    """Rebuild a value from `payload`, resolving buffer refs via `get_buf`."""

EncodeResult dataclass

Result of encoding a Python value for IPC transfer.

Attributes:

Name Type Description
codec str

Codec identifier.

meta dict[str, Any]

JSON-serializable metadata required for decoding.

buffers list[ShmRef]

Shared-memory blocks backing the encoded payload.

Source code in brmspy/types/session.py
@dataclass
class EncodeResult:
    """
    Outcome of encoding a Python value for transfer over IPC.

    Attributes
    ----------
    codec : str
        Identifier of the codec that produced this result.
    meta : dict[str, Any]
        JSON-serializable metadata the decoder needs.
    buffers : list[ShmRef]
        References to the shared-memory blocks that back the payload.
    """

    # Codec identifier.
    codec: str
    # Decoder metadata (JSON-serializable).
    meta: dict[str, Any]
    # Shared-memory block references.
    buffers: list[ShmRef]

Attributes

codec instance-attribute
meta instance-attribute
buffers instance-attribute

Functions

__init__(codec, meta, buffers)

PayloadRef

Bases: TypedDict

Encoded argument/result payload sent over the control pipe.

A payload is:

  • codec: the codec identifier used by the registry
  • meta: JSON-serializable metadata needed to reconstruct the value
  • buffers: shared-memory buffer references backing the payload
Source code in brmspy/types/session.py
class PayloadRef(TypedDict):
    """
    Encoded argument/result payload sent over the control pipe.

    Keys:

    - `codec`: identifier of the codec, as used by the registry
    - `meta`: JSON-serializable metadata required to rebuild the value
    - `buffers`: references to the shared-memory buffers behind the payload
    """

    # Codec identifier.
    codec: str
    # Metadata needed for reconstruction.
    meta: dict[str, Any]
    # Shared-memory buffer references.
    buffers: list[ShmRef]

Attributes

codec instance-attribute
meta instance-attribute
buffers instance-attribute

ShmBlock dataclass

Attached shared-memory block (name/size + live SharedMemory handle).

Notes

This object owns a SharedMemory handle and must be closed when no longer needed. In brmspy this is managed by a ShmPool implementation.

Source code in brmspy/types/shm.py
@dataclass
class ShmBlock:
    """
    A shared-memory block that is currently attached.

    Carries the block's name and sizes together with the live `SharedMemory`
    handle.

    Notes
    -----
    The instance owns its `SharedMemory` handle, which has to be closed once it
    is no longer needed. Within brmspy a `ShmPool` implementation takes care of
    that.
    """

    name: str
    size: int
    content_size: int
    shm: SharedMemory
    temporary: bool

    def to_ref(self) -> ShmRef:
        """Return the plain-dict reference for this block (without the handle)."""
        ref: ShmRef = {
            "name": self.name,
            "size": self.size,
            "content_size": self.content_size,
            "temporary": self.temporary,
        }
        return ref

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
shm instance-attribute
temporary instance-attribute

Functions

to_ref()
Source code in brmspy/types/shm.py
def to_ref(self) -> ShmRef:
    """Return the plain-dict reference for this block (without the handle)."""
    ref: ShmRef = {
        "name": self.name,
        "size": self.size,
        "content_size": self.content_size,
        "temporary": self.temporary,
    }
    return ref
__init__(name, size, content_size, shm, temporary)

ShmRef

Bases: TypedDict

Reference to a shared-memory block sent over IPC.

Attributes:

Name Type Description
name str

Shared memory block name (as assigned by SharedMemoryManager).

size int

Allocated block size in bytes.

content_size int

Number of bytes of the block that are actually in use.

temporary bool

Whether this buffer can be garbage-collected immediately after use, or must instead stay attached to the object it is constructed into.

Notes

Codecs may store a logical payload smaller than size. In that case, the codec metadata must include the logical nbytes/length so that decoders can slice the buffer appropriately.

Source code in brmspy/types/shm.py
class ShmRef(TypedDict):
    """
    Reference to a shared-memory block sent over IPC.

    Attributes
    ----------
    name : str
        Name of the shared memory block (as assigned by `SharedMemoryManager`).
    size : int
        Size of the allocated block in bytes.
    content_size : int
        Number of bytes of the block that are actually in use.
    temporary : bool
        Whether this buffer can be garbage-collected immediately after use, or
        must instead stay attached to the object it is constructed into.

    Notes
    -----
    A codec may store a logical payload that is smaller than `size`. When it
    does, the codec metadata must carry the logical `nbytes`/length so that a
    decoder can slice the buffer appropriately.
    """

    name: str
    size: int
    content_size: int
    temporary: bool

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
temporary instance-attribute

CodecRegistry

Ordered registry of encoders used for IPC serialization.

Source code in brmspy/_session/codec/base.py
class CodecRegistry:
    """
    Ordered registry of encoders used for IPC serialization.

    Encoding tries the registered encoders in registration order; decoding
    looks the codec up by the name stored in the payload.
    """

    def __init__(self) -> None:
        # Codec name -> encoder; used by `decode()` and the pickle fallback.
        self._by_codec: dict[str, Encoder] = {}
        # Registration order; `encode()` picks the first encoder that accepts.
        self._encoders: list[Encoder] = []

    def register(self, encoder: Encoder) -> None:
        """
        Register an encoder instance.

        Parameters
        ----------
        encoder : brmspy.types.session.Encoder
            Encoder to register. Its `codec` attribute is used as the key when present,
            otherwise the class name is used. The chosen key is written back to
            `encoder.codec`.
        """
        if hasattr(encoder, "codec") and encoder.codec:  # type: ignore
            codec_name = encoder.codec  # type: ignore
        else:
            codec_name = type(encoder).__name__
        self._by_codec[codec_name] = encoder
        encoder.codec = codec_name  # type: ignore
        self._encoders.append(encoder)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Encode an object by selecting the first encoder that accepts it.

        Parameters
        ----------
        obj : Any
            Value to encode.
        shm_pool : Any
            SHM pool used by codecs for allocating buffers.

        Returns
        -------
        brmspy.types.session.EncodeResult
            Result of the first accepting encoder, or of the `"PickleCodec"`
            encoder when no registered encoder accepts `obj`.

        Raises
        ------
        RuntimeError
            If no encoder accepts `obj` and no `"PickleCodec"` is registered.
        """
        for enc in self._encoders:
            if enc.can_encode(obj):
                res = enc.encode(obj, shm_pool)
                if not res.codec:
                    # Use the name the encoder is registered under (set by
                    # `register()`), so that `decode()` can look it up again.
                    # The class name is only a last resort.
                    res.codec = getattr(enc, "codec", None) or type(enc).__name__
                return res

        # fallback to pickle
        if "PickleCodec" not in self._by_codec:
            raise RuntimeError("No pickle codec registered")
        return self._by_codec["PickleCodec"].encode(obj, shm_pool)

    def decode(
        self,
        payload: PayloadRef,
        shm_pool: Any,
    ) -> Any:
        """
        Decode a payload using the codec it names.

        Parameters
        ----------
        payload : brmspy.types.session.PayloadRef
            Encoded payload. `payload["codec"]` selects the registered codec;
            the whole payload is handed to that codec's `decode()`.
        shm_pool : Any
            SHM pool used to attach the referenced buffers (`attach(ref)`) and
            to release temporary ones (`gc(name)`). It is also forwarded to the
            codec.

        Returns
        -------
        Any
            The decoded value.

        Raises
        ------
        ValueError
            If the payload names a codec that is not registered.
        """
        codec = payload["codec"]
        if codec not in self._by_codec:
            raise ValueError(
                f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
            )

        # Non-temporary blocks attached while decoding; tied to the decoded
        # value's lifetime below.
        buffers = []

        @contextmanager
        def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
            buf = shm_pool.attach(ref)
            memview = memoryview(buf.shm.buf)
            # Expose only the used part of the block, as unsigned bytes.
            view = memview[: ref["content_size"]].cast("B")

            try:
                if not ref["temporary"]:
                    # non-temporary buffers are associated with columns / objects
                    buffers.append(buf)

                yield buf, view

            finally:
                # deterministic cleanup for temporary buffers
                if ref["temporary"]:
                    # IMPORTANT: release view before closing shm
                    try:
                        view.release()
                        memview.release()
                        shm_pool.gc(ref["name"])
                        buf.shm.close()
                    except Exception:
                        # Best-effort cleanup: ordinary errors are ignored, but
                        # KeyboardInterrupt/SystemExit are allowed to propagate.
                        pass

        value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
        self._attach_shm_lifetime(value, buffers)

        return value

    @classmethod
    def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
        """
        Keep SHM blocks alive as long as `obj` is alive.

        A `weakref.finalize` registered on `obj` holds the blocks as arguments.
        Nothing is attached when `shms` is empty, when `obj` is `None` or a
        `bool`/`str`/`int`/`float`, or when `obj` does not support weak
        references.
        """
        if not shms:
            return
        if obj is None or isinstance(obj, (bool, str, int, float)):
            return

        try:
            weakref.finalize(obj, _noop, tuple(shms))
        except TypeError:
            # `obj` cannot be weakly referenced; the blocks are not pinned.
            return

Attributes

_by_codec = {} instance-attribute
_encoders = [] instance-attribute

Functions

__init__()
Source code in brmspy/_session/codec/base.py
def __init__(self) -> None:
    """Start out with no registered encoders."""
    # Encoders in registration order.
    self._encoders: list[Encoder] = []
    # Codec name -> encoder lookup.
    self._by_codec: dict[str, Encoder] = {}
register(encoder)

Register an encoder instance.

Parameters:

Name Type Description Default
encoder Encoder

Encoder to register. Its codec attribute is used as the key when present, otherwise the class name is used.

required
Source code in brmspy/_session/codec/base.py
def register(self, encoder: Encoder) -> None:
    """
    Add an encoder instance to the registry.

    Parameters
    ----------
    encoder : brmspy.types.session.Encoder
        Encoder to add. A non-empty `codec` attribute becomes the registry key;
        without one the encoder's class name is used. The chosen key is written
        back to `encoder.codec`.
    """
    declared = getattr(encoder, "codec", None)  # type: ignore
    key = declared if declared else type(encoder).__name__
    self._by_codec[key] = encoder
    encoder.codec = key  # type: ignore
    self._encoders.append(encoder)
encode(obj, shm_pool)

Encode an object by selecting the first encoder that accepts it.

Parameters:

Name Type Description Default
obj Any

Value to encode.

required
shm_pool Any

SHM pool used by codecs for allocating buffers.

required

Returns:

Type Description
EncodeResult
Source code in brmspy/_session/codec/base.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """
    Encode an object by selecting the first encoder that accepts it.

    Parameters
    ----------
    obj : Any
        Value to encode.
    shm_pool : Any
        SHM pool used by codecs for allocating buffers.

    Returns
    -------
    brmspy.types.session.EncodeResult
        Result of the first accepting encoder, or of the `"PickleCodec"`
        encoder when no registered encoder accepts `obj`.

    Raises
    ------
    RuntimeError
        If no encoder accepts `obj` and no `"PickleCodec"` is registered.
    """
    for enc in self._encoders:
        if enc.can_encode(obj):
            res = enc.encode(obj, shm_pool)
            if not res.codec:
                # Prefer the encoder's `codec` attribute (the registry key)
                # over the class name, so the payload can be decoded again.
                res.codec = getattr(enc, "codec", None) or type(enc).__name__
            return res

    # fallback to pickle
    if "PickleCodec" not in self._by_codec:
        raise RuntimeError("No pickle codec registered")
    return self._by_codec["PickleCodec"].encode(obj, shm_pool)
decode(payload, shm_pool)

Decode a payload using a named codec.

Parameters:

Name Type Description Default
payload PayloadRef

Encoded payload. Its codec entry is the codec identifier previously returned by encode(); its meta and buffers entries carry the codec metadata and the shared-memory buffer references.

required
shm_pool Any

SHM pool (some codecs may attach additional buffers).

required

Returns:

Type Description
Any
Source code in brmspy/_session/codec/base.py
def decode(
    self,
    payload: PayloadRef,
    shm_pool: Any,
) -> Any:
    """
    Decode a payload using the codec it names.

    Parameters
    ----------
    payload : brmspy.types.session.PayloadRef
        Encoded payload. `payload["codec"]` selects the registered codec; the
        whole payload is handed to that codec's `decode()`.
    shm_pool : Any
        SHM pool used to attach the referenced buffers (`attach(ref)`) and to
        release temporary ones (`gc(name)`). It is also forwarded to the codec.

    Returns
    -------
    Any
        The decoded value.

    Raises
    ------
    ValueError
        If the payload names a codec that is not registered.
    """
    codec = payload["codec"]
    if codec not in self._by_codec:
        raise ValueError(
            f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
        )

    # Non-temporary blocks attached while decoding; tied to the decoded
    # value's lifetime below.
    buffers = []

    @contextmanager
    def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
        buf = shm_pool.attach(ref)
        memview = memoryview(buf.shm.buf)
        # Expose only the used part of the block, as unsigned bytes.
        view = memview[: ref["content_size"]].cast("B")

        try:
            if not ref["temporary"]:
                # non-temporary buffers are associated with columns / objects
                buffers.append(buf)

            yield buf, view

        finally:
            # deterministic cleanup for temporary buffers
            if ref["temporary"]:
                # IMPORTANT: release view before closing shm
                try:
                    view.release()
                    memview.release()
                    shm_pool.gc(ref["name"])
                    buf.shm.close()
                except Exception:
                    # Best-effort cleanup: ordinary errors are ignored, but
                    # KeyboardInterrupt/SystemExit are allowed to propagate.
                    pass

    value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
    self._attach_shm_lifetime(value, buffers)

    return value
_attach_shm_lifetime(obj, shms) classmethod

Keep SHM blocks alive as long as obj is alive.

Source code in brmspy/_session/codec/base.py
@classmethod
def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
    """
    Keep SHM blocks alive as long as `obj` is alive.

    A `weakref.finalize` registered on `obj` holds the blocks as arguments.
    Nothing is attached when `shms` is empty, when `obj` is `None` or a
    `bool`/`str`/`int`/`float`, or when `obj` does not support weak references.
    """
    if not shms:
        return
    if obj is None or isinstance(obj, (bool, str, int, float)):
        return

    try:
        weakref.finalize(obj, _noop, tuple(shms))
    except TypeError:
        # `obj` cannot be weakly referenced; the blocks are not pinned.
        return

Functions

_noop(_blocks)

Source code in brmspy/_session/codec/base.py
def _noop(_blocks):
    """Do nothing; `_blocks` is accepted and ignored."""
    return None