Skip to content

dataclass

Dataclass codec registration (internal).

This module registers GenericDataClassCodec for the public dataclasses that may cross the main↔worker boundary (primarily result container types and formula DSL nodes).

The registry is populated at process startup via get_default_registry().

Attributes

_generics = [_all_types.RListVectorExtension] module-attribute

_classes = [t for name, t in (_all_types.__dict__.items()) if isinstance(t, type) and is_dataclass(t) and t not in _generics] module-attribute

Classes

CodecRegistry

Ordered registry of encoders used for IPC serialization.

Source code in brmspy/_session/codec/base.py
class CodecRegistry:
    """
    Ordered registry of encoders used for IPC serialization.

    Encoding walks the encoders in registration order and uses the first one
    whose `can_encode()` accepts the value. Decoding looks the encoder up by
    the codec name stored in the payload.
    """

    def __init__(self) -> None:
        # Codec name -> encoder; used by decode() and by the pickle fallback.
        self._by_codec: dict[str, Encoder] = {}
        # Encoders in registration order; encode() picks the first match.
        self._encoders: list[Encoder] = []

    def register(self, encoder: Encoder) -> None:
        """
        Register an encoder instance.

        Parameters
        ----------
        encoder : brmspy.types.session.Encoder
            Encoder to register. Its `codec` attribute is used as the key when present
            (and truthy), otherwise the class name is used. The resolved name is
            written back to `encoder.codec`.
        """
        codec_name = getattr(encoder, "codec", None) or type(encoder).__name__
        # Registering a second encoder under the same name replaces the lookup
        # entry, but both encoders stay in the ordered list.
        self._by_codec[codec_name] = encoder
        encoder.codec = codec_name  # type: ignore
        self._encoders.append(encoder)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Encode an object by selecting the first encoder that accepts it.

        Parameters
        ----------
        obj : Any
            Value to encode.
        shm_pool : Any
            SHM pool used by codecs for allocating buffers.

        Returns
        -------
        brmspy.types.session.EncodeResult

        Raises
        ------
        RuntimeError
            If no encoder accepts `obj` and no "PickleCodec" is registered.
        """
        for enc in self._encoders:
            if enc.can_encode(obj):
                res = enc.encode(obj, shm_pool)
                if not res.codec:
                    # Use the name the encoder is registered under (set by
                    # register()); the bare class name would not be found by
                    # decode() when the encoder declares a custom codec name.
                    res.codec = getattr(enc, "codec", None) or type(enc).__name__
                return res

        # fallback to pickle
        if "PickleCodec" not in self._by_codec:
            raise RuntimeError("No pickle codec registered")
        return self._by_codec["PickleCodec"].encode(obj, shm_pool)

    def decode(
        self,
        payload: PayloadRef,
        shm_pool: Any,
    ) -> Any:
        """
        Decode a payload using the codec it names.

        Parameters
        ----------
        payload : PayloadRef
            Encoded payload. Its `"codec"` entry selects the registered codec;
            the payload is passed unchanged to that codec's `decode()`.
        shm_pool : Any
            SHM pool used to attach the buffers the codec asks for (it is also
            passed on to the codec).

        Returns
        -------
        Any
            The value returned by the selected codec.

        Raises
        ------
        ValueError
            If the payload names a codec that is not registered.
        """
        codec = payload["codec"]
        if codec not in self._by_codec:
            raise ValueError(
                f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
            )

        # Non-temporary blocks handed out during this decode; they are tied to
        # the decoded value's lifetime below.
        buffers = []

        @contextmanager
        def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
            buf = shm_pool.attach(ref)
            memview = memoryview(buf.shm.buf)
            # Expose only the bytes that hold content, as a flat byte view.
            view = memview[: ref["content_size"]].cast("B")

            try:
                if not ref["temporary"]:
                    # non-temporary buffers are associated with columns / objects
                    buffers.append(buf)

                yield buf, view

            finally:
                # deterministic cleanup for temporary buffers
                if ref["temporary"]:
                    # IMPORTANT: release view before closing shm
                    try:
                        view.release()
                        memview.release()
                        shm_pool.gc(ref["name"])
                        buf.shm.close()
                    except Exception:
                        # Best-effort cleanup: ordinary failures are ignored,
                        # but KeyboardInterrupt / SystemExit still propagate.
                        pass

        value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
        self._attach_shm_lifetime(value, buffers)

        return value

    @classmethod
    def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
        """
        Keep SHM blocks alive as long as `obj` is alive.

        Does nothing when there are no blocks, or when `obj` is None or a
        bool/str/int/float. Failures to attach the finalizer are ignored.
        """
        if not shms:
            return
        if obj is None or isinstance(obj, (bool, str, int, float)):
            return

        try:
            # The finalizer's argument tuple holds references to the blocks
            # until `obj` is collected.
            weakref.finalize(obj, _noop, tuple(shms))
        except Exception:
            # e.g. objects that do not support weak references (TypeError).
            return

Attributes

_by_codec = {} instance-attribute
_encoders = [] instance-attribute

Functions

__init__()
Source code in brmspy/_session/codec/base.py
def __init__(self) -> None:
    """Create an empty registry with no encoders registered."""
    # Encoders in the order they were registered.
    self._encoders: list[Encoder] = []
    # Codec name -> encoder lookup table.
    self._by_codec: dict[str, Encoder] = {}
register(encoder)

Register an encoder instance.

Parameters:

Name Type Description Default
encoder Encoder

Encoder to register. Its codec attribute is used as the key when present, otherwise the class name is used.

required
Source code in brmspy/_session/codec/base.py
def register(self, encoder: Encoder) -> None:
    """
    Add an encoder to the registry.

    Parameters
    ----------
    encoder : brmspy.types.session.Encoder
        Encoder to add. A truthy `codec` attribute becomes its registry key;
        otherwise the encoder's class name is used. The chosen key is written
        back to `encoder.codec`, and the encoder is appended to the ordered
        encoder list.
    """
    key = getattr(encoder, "codec", None) or type(encoder).__name__
    self._by_codec[key] = encoder
    encoder.codec = key  # type: ignore
    self._encoders.append(encoder)
encode(obj, shm_pool)

Encode an object by selecting the first encoder that accepts it.

Parameters:

Name Type Description Default
obj Any

Value to encode.

required
shm_pool Any

SHM pool used by codecs for allocating buffers.

required

Returns:

Type Description
EncodeResult
Source code in brmspy/_session/codec/base.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """
    Encode an object by selecting the first encoder that accepts it.

    Parameters
    ----------
    obj : Any
        Value to encode.
    shm_pool : Any
        SHM pool used by codecs for allocating buffers.

    Returns
    -------
    brmspy.types.session.EncodeResult

    Raises
    ------
    RuntimeError
        If no encoder accepts `obj` and no "PickleCodec" is registered.
    """
    for enc in self._encoders:
        if enc.can_encode(obj):
            res = enc.encode(obj, shm_pool)
            if not res.codec:
                # Prefer the encoder's own `codec` name (the key it is
                # registered under) so the result can be decoded again; the
                # class name is only correct for encoders without one.
                res.codec = getattr(enc, "codec", None) or type(enc).__name__
            return res

    # fallback to pickle
    if "PickleCodec" not in self._by_codec:
        raise RuntimeError("No pickle codec registered")
    return self._by_codec["PickleCodec"].encode(obj, shm_pool)
decode(payload, shm_pool)

Decode a payload using a named codec.

Parameters:

Name Type Description Default
payload PayloadRef

Encoded payload. Its "codec" entry names the codec (as previously returned by encode()) that is used to decode it.

required
shm_pool Any

SHM pool (some codecs may attach additional buffers).

required

Returns:

Type Description
Any
Source code in brmspy/_session/codec/base.py
def decode(
    self,
    payload: PayloadRef,
    shm_pool: Any,
) -> Any:
    """
    Decode a payload using the codec it names.

    Parameters
    ----------
    payload : PayloadRef
        Encoded payload. Its `"codec"` entry selects the registered codec;
        the payload is passed unchanged to that codec's `decode()`.
    shm_pool : Any
        SHM pool used to attach the buffers the codec asks for (it is also
        passed on to the codec).

    Returns
    -------
    Any
        The value returned by the selected codec.

    Raises
    ------
    ValueError
        If the payload names a codec that is not registered.
    """
    codec = payload["codec"]
    if codec not in self._by_codec:
        raise ValueError(
            f"Unknown codec: {codec}, available: {list(self._by_codec.keys())}"
        )

    # Non-temporary blocks handed out during this decode; they are tied to
    # the decoded value's lifetime below.
    buffers = []

    @contextmanager
    def get_buf(ref: ShmRef) -> Iterator[tuple[ShmBlock, memoryview]]:
        buf = shm_pool.attach(ref)
        memview = memoryview(buf.shm.buf)
        # Expose only the bytes that hold content, as a flat byte view.
        view = memview[: ref["content_size"]].cast("B")

        try:
            if not ref["temporary"]:
                # non-temporary buffers are associated with columns / objects
                buffers.append(buf)

            yield buf, view

        finally:
            # deterministic cleanup for temporary buffers
            if ref["temporary"]:
                # IMPORTANT: release view before closing shm
                try:
                    view.release()
                    memview.release()
                    shm_pool.gc(ref["name"])
                    buf.shm.close()
                except Exception:
                    # Best-effort cleanup: ordinary failures are ignored,
                    # but KeyboardInterrupt / SystemExit still propagate.
                    pass

    value = self._by_codec[codec].decode(payload, get_buf, shm_pool)
    self._attach_shm_lifetime(value, buffers)

    return value
_attach_shm_lifetime(obj, shms) classmethod

Keep SHM blocks alive as long as obj is alive.

Source code in brmspy/_session/codec/base.py
@classmethod
def _attach_shm_lifetime(cls, obj: Any, shms: list[ShmBlock]) -> None:
    """
    Keep SHM blocks alive as long as `obj` is alive.

    Does nothing when there are no blocks, or when `obj` is None or a
    bool/str/int/float. Failures to attach the finalizer are ignored.
    """
    if not shms:
        return
    if obj is None or isinstance(obj, (bool, str, int, float)):
        return

    try:
        # The finalizer's argument tuple holds references to the blocks
        # until `obj` is collected.
        weakref.finalize(obj, _noop, tuple(shms))
    except Exception:
        # e.g. objects that do not support weak references (TypeError).
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        return

GenericDataClassCodec

Bases: Encoder

Generic codec for dataclasses (internal).

Encodes each init=True field by delegating to a CodecRegistry. Use skip_fields to exclude fields that must not cross the boundary.

Source code in brmspy/_session/codec/builtin.py
class GenericDataClassCodec(Encoder):
    """
    Generic codec for dataclasses (internal).

    Encodes each `init=True` field by delegating to a
    [`CodecRegistry`][brmspy._session.codec.base.CodecRegistry]. Use `skip_fields` to exclude
    fields that must not cross the boundary.
    """

    def __init__(
        self,
        cls: type[Any],
        registry: CodecRegistry,
        *,
        skip_fields: set[str] | None = None,
    ) -> None:
        """
        Parameters
        ----------
        cls : type
            Dataclass type handled by this codec.
        registry : CodecRegistry
            Registry used to encode and decode the individual field values.
        skip_fields : set[str] or None
            Names of fields that are not encoded.

        Raises
        ------
        TypeError
            If `cls` is not a dataclass.
        """
        if not is_dataclass(cls):
            raise TypeError(f"{cls!r} is not a dataclass")

        self._cls = cls
        self._registry = registry
        # Codec name is unique per dataclass: module path plus qualified name.
        self.codec = f"dataclass::{cls.__module__}.{cls.__qualname__}"

        self._skip_fields = skip_fields or set()

        # Precompute which fields we actually encode: only constructor
        # (init=True) fields, since decode() rebuilds via `cls(**kwargs)`.
        self._field_names: list[str] = [
            f.name
            for f in dc_fields(cls)
            if f.init and f.name not in self._skip_fields
        ]

    def can_encode(self, obj: Any) -> bool:
        """Return True when `obj` is an instance of the wrapped dataclass."""
        return isinstance(obj, self._cls)

    def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
        """
        Encode every selected field of `obj` through the registry.

        The buffers of all fields are concatenated into one list; each field's
        metadata records its codec, its own meta, and the `start`/`count`
        slice of that list which belongs to it.
        """
        buffers: list[ShmRef] = []
        fields_meta: dict[str, Any] = {}

        for field_name in self._field_names:
            value = getattr(obj, field_name)

            # Delegate to registry; chooses right encoder for the actual *runtime* type
            res = self._registry.encode(value, shm_pool)

            fields_meta[field_name] = {
                "codec": res.codec,
                "meta": res.meta,
                # Position of this field's buffers inside the combined list.
                "start": len(buffers),
                "count": len(res.buffers),
            }

            buffers.extend(res.buffers)

        meta: dict[str, Any] = {
            "module": self._cls.__module__,
            "qualname": self._cls.__qualname__,
            "fields": fields_meta,
        }

        return EncodeResult(
            codec=self.codec,
            meta=meta,
            buffers=buffers,
        )

    def decode(
        self,
        payload: PayloadRef,
        get_buf: Callable[[ShmRef], GetBufContext],
        *args,
    ) -> Any:
        """
        Rebuild a dataclass instance from an encoded payload.

        Parameters
        ----------
        payload : PayloadRef
            Payload produced by `encode()`; `payload["meta"]["fields"]` describes
            each field and `payload["buffers"]` holds the combined buffer refs.
        get_buf : Callable
            Not used here; every field is decoded through the registry.
        *args
            The first extra positional argument must be the SHM pool, which is
            forwarded to the registry for each field.

        Returns
        -------
        Any
            `cls(**fields)` built from the decoded field values.

        Raises
        ------
        TypeError
            If the SHM pool argument is missing.
        """
        # Explicit check instead of `assert`, which is stripped under `-O`.
        if not args:
            raise TypeError(
                "GenericDataClassCodec.decode() requires the SHM pool as an "
                "extra positional argument"
            )
        pool = args[0]

        meta = payload["meta"]
        fields_meta: dict[str, Any] = meta["fields"]
        kwargs: dict[str, Any] = {}

        specs = payload["buffers"]

        for field_name, fmeta in fields_meta.items():
            start = fmeta["start"]
            count = fmeta["count"]

            # Hand each field only its own slice of the combined buffer refs,
            # mirroring the start/count recorded by encode().
            subpayload: PayloadRef = {
                "codec": fmeta["codec"],
                "meta": fmeta["meta"],
                "buffers": specs[start : start + count],
            }

            kwargs[field_name] = self._registry.decode(subpayload, pool)

        return self._cls(**kwargs)

Attributes

_cls = cls instance-attribute
_registry = registry instance-attribute
codec = f'dataclass::{cls.__module__}.{cls.__qualname__}' instance-attribute
_skip_fields = skip_fields or set() instance-attribute
_field_names = [] instance-attribute

Functions

__init__(cls, registry, *, skip_fields=None)
Source code in brmspy/_session/codec/builtin.py
def __init__(
    self,
    cls: type[Any],
    registry: CodecRegistry,
    *,
    skip_fields: set[str] | None = None,
) -> None:
    """
    Set up a codec for one dataclass type.

    Parameters
    ----------
    cls : type
        Dataclass type handled by this codec.
    registry : CodecRegistry
        Registry stored for encoding/decoding field values.
    skip_fields : set[str] or None
        Names of fields to leave out of the encoded field list.

    Raises
    ------
    TypeError
        If `cls` is not a dataclass.
    """
    if not is_dataclass(cls):
        raise TypeError(f"{cls!r} is not a dataclass")

    self._cls = cls
    self._registry = registry
    # Codec name is derived from the dataclass's module and qualified name.
    self.codec = f"dataclass::{cls.__module__}.{cls.__qualname__}"

    self._skip_fields = skip_fields or set()

    # Only constructor (init=True) fields that were not explicitly skipped
    # take part in encoding.
    self._field_names: list[str] = [
        spec.name
        for spec in dc_fields(cls)
        if spec.init and spec.name not in self._skip_fields
    ]
can_encode(obj)
Source code in brmspy/_session/codec/builtin.py
def can_encode(self, obj: Any) -> bool:
    """Tell whether `obj` is an instance of the dataclass this codec wraps."""
    handled_type = self._cls
    return isinstance(obj, handled_type)
encode(obj, shm_pool)
Source code in brmspy/_session/codec/builtin.py
def encode(self, obj: Any, shm_pool: Any) -> EncodeResult:
    """
    Encode every selected field of `obj` through the registry.

    All field buffers are gathered into one combined list. For each field the
    metadata stores its codec, its own meta, and the `start`/`count` slice of
    the combined list that belongs to it.
    """
    combined: list[ShmRef] = []
    per_field: dict[str, Any] = {}

    for name in self._field_names:
        # The registry picks the encoder from the value's runtime type.
        encoded = self._registry.encode(getattr(obj, name), shm_pool)
        own_buffers = encoded.buffers

        per_field[name] = {
            "codec": encoded.codec,
            "meta": encoded.meta,
            "start": len(combined),
            "count": len(own_buffers),
        }
        combined.extend(own_buffers)

    target = self._cls
    return EncodeResult(
        codec=self.codec,
        meta={
            "module": target.__module__,
            "qualname": target.__qualname__,
            "fields": per_field,
        },
        buffers=combined,
    )
decode(payload, get_buf, *args)
Source code in brmspy/_session/codec/builtin.py
def decode(
    self,
    payload: PayloadRef,
    get_buf: Callable[[ShmRef], GetBufContext],
    *args,
) -> Any:
    """
    Rebuild a dataclass instance from an encoded payload.

    Parameters
    ----------
    payload : PayloadRef
        Encoded payload; `payload["meta"]["fields"]` describes each field
        (codec, meta, start, count) and `payload["buffers"]` holds the
        combined buffer refs.
    get_buf : Callable
        Not used here; every field is decoded through the registry.
    *args
        The first extra positional argument must be the SHM pool, which is
        forwarded to the registry for each field.

    Returns
    -------
    Any
        `cls(**fields)` built from the decoded field values.

    Raises
    ------
    TypeError
        If the SHM pool argument is missing.
    """
    # Explicit check instead of `assert`, which is stripped under `-O`.
    if not args:
        raise TypeError(
            "GenericDataClassCodec.decode() requires the SHM pool as an "
            "extra positional argument"
        )
    pool = args[0]

    meta = payload["meta"]
    fields_meta: dict[str, Any] = meta["fields"]
    kwargs: dict[str, Any] = {}

    specs = payload["buffers"]

    for field_name, fmeta in fields_meta.items():
        start = fmeta["start"]
        count = fmeta["count"]

        # Hand each field only its own slice of the combined buffer refs,
        # using the start/count recorded in its metadata.
        subpayload: PayloadRef = {
            "codec": fmeta["codec"],
            "meta": fmeta["meta"],
            "buffers": specs[start : start + count],
        }

        kwargs[field_name] = self._registry.decode(subpayload, pool)

    return self._cls(**kwargs)

Functions

register_dataclasses(registry)

Register codecs for known dataclass types.

Parameters:

Name Type Description Default
registry CodecRegistry

Registry to populate.

required
Source code in brmspy/_session/codec/dataclass.py
def register_dataclasses(registry: CodecRegistry) -> None:
    """
    Add one generic dataclass codec per known dataclass type to `registry`.

    Parameters
    ----------
    registry : brmspy._session.codec.base.CodecRegistry
        Registry to populate. Each codec also receives this registry so it can
        delegate the encoding of individual field values to it.
    """
    for dataclass_type in _classes:
        registry.register(
            GenericDataClassCodec(cls=dataclass_type, registry=registry)
        )