
transport

Shared-memory transport utilities (internal).

RModuleSession uses shared memory to move large payloads between the main process and the worker. The parent allocates blocks and passes only small references (name, size, offset, ...) over the Pipe. The worker (or the main process, during decode) attaches to each block by name to access its buffer.

This module implements the concrete ShmPool used by the session layer.
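A minimal sketch of the name-based hand-off described above, run in a single process for simplicity. It is not brmspy's code. The `round_trip` helper is hypothetical, and it creates the block with `SharedMemory(create=True)` instead of going through a `SharedMemoryManager`. The dict it sends only mimics a ShmRef.

```python
from multiprocessing import Pipe
from multiprocessing.shared_memory import SharedMemory


def round_trip(payload: bytes) -> bytes:
    """Hypothetical helper: move `payload` through shared memory, sending only a reference."""
    sender, receiver = Pipe()
    # Sending side: create a block and copy the payload in. (brmspy allocates via a
    # SharedMemoryManager; plain SharedMemory keeps this sketch single-process.)
    shm = SharedMemory(create=True, size=len(payload))
    try:
        shm.buf[: len(payload)] = payload
        # Only this small dict is pickled and sent; the payload stays in shared memory.
        sender.send({"name": shm.name, "size": shm.size, "content_size": len(payload)})

        # Receiving side: attach by name and copy out the used bytes.
        ref = receiver.recv()
        attached = SharedMemory(name=ref["name"])
        try:
            return bytes(attached.buf[: ref["content_size"]])
        finally:
            attached.close()
    finally:
        shm.close()
        shm.unlink()  # the creator destroys the block once both sides are done
        sender.close()
        receiver.close()
```

The receiver reads `content_size` bytes rather than `shm.size`, because the platform may round the allocated size up.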

Classes

ShmRef

Bases: TypedDict

Reference to a shared-memory block sent over IPC.

Attributes:

Name          Type  Description
name          str   Shared memory block name (as assigned by SharedMemoryManager).
size          int   Allocated block size in bytes.
content_size  int   Number of bytes actually used (may be smaller than size).
temporary     bool  Whether this buffer can be garbage-collected immediately after use, or must instead be attached to the object it is reconstructed into.
offset        int   Byte offset within the SHM block (for slab sub-allocations).

Notes

Codecs may store a logical payload smaller than size. In that case, the codec metadata must include the logical nbytes/length so that decoders can slice the buffer appropriately.
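The note above can be illustrated with a small sketch. It writes three float64 values into a block that is deliberately larger than the payload and records the logical byte length in codec metadata. The decoder then reads back only that many bytes. The `encode_floats`/`decode_floats` helpers and the metadata keys are hypothetical. brmspy's real codecs are not shown here.

```python
import struct
from multiprocessing.shared_memory import SharedMemory


def encode_floats(values: list[float], shm: SharedMemory, offset: int) -> dict:
    """Hypothetical codec: write little-endian float64 values at `offset`, return metadata."""
    struct.pack_into(f"<{len(values)}d", shm.buf, offset, *values)
    return {"offset": offset, "nbytes": 8 * len(values)}


def decode_floats(shm: SharedMemory, meta: dict) -> list[float]:
    """Read back exactly meta["nbytes"] bytes; whatever lies past them is ignored."""
    count = meta["nbytes"] // 8
    return list(struct.unpack_from(f"<{count}d", shm.buf, meta["offset"]))


def demo(values: list[float]) -> tuple[list[float], dict]:
    # The block is deliberately larger than the payload (the platform may round it up further).
    shm = SharedMemory(create=True, size=4096)
    try:
        shm.buf[:] = b"\xff" * shm.size  # junk in the unused tail
        meta = encode_floats(values, shm, offset=64)
        return decode_floats(shm, meta), meta
    finally:
        shm.close()
        shm.unlink()
```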

Source code in brmspy/types/shm.py
class ShmRef(TypedDict):
    """
    Reference to a shared-memory block sent over IPC.

    Attributes
    ----------
    name : str
        Shared memory block name (as assigned by `SharedMemoryManager`).
    size : int
        Allocated block size in bytes.
    content_size : int
        Number of bytes actually used (may be smaller than `size`).
    temporary : bool
        Whether this buffer can be garbage-collected immediately after use, or
        must instead be attached to the object it is reconstructed into.
    offset : int
        Byte offset within the SHM block (for slab sub-allocations).

    Notes
    -----
    Codecs may store a logical payload smaller than `size`. In that case, the
    codec metadata must include the logical `nbytes`/length so that decoders can
    slice the buffer appropriately.
    """

    name: str
    size: int
    content_size: int
    temporary: bool
    offset: int

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
temporary instance-attribute
offset instance-attribute

ShmBlock dataclass

Attached shared-memory block (name/size + live SharedMemory handle).

Notes

This object owns a SharedMemory handle and must be closed when no longer needed. In brmspy this is managed by a ShmPool implementation.

Source code in brmspy/types/shm.py
@dataclass
class ShmBlock:
    """
    Attached shared-memory block (name/size + live `SharedMemory` handle).

    Notes
    -----
    This object owns a `SharedMemory` handle and must be closed when no longer
    needed. In brmspy this is managed by a `ShmPool` implementation.
    """

    name: str
    size: int
    content_size: int
    shm: SharedMemory
    temporary: bool
    offset: int = 0

    def to_ref(self) -> ShmRef:
        return {
            "name": self.name,
            "size": self.size,
            "content_size": self.content_size,
            "temporary": self.temporary,
            "offset": self.offset,
        }
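to_ref() drops the live SharedMemory handle and keeps only plain fields. As a result, the message that crosses the Pipe stays tiny regardless of block size. The sketch below assumes a local copy of the ShmRef shape and a made-up segment name (`psm_example`). `Connection.send()` pickles its argument, so the pickled length approximates the bytes sent per reference.

```python
import pickle
from typing import TypedDict


class ShmRef(TypedDict):
    """Local copy of the ShmRef fields documented above (the sketch does not import brmspy)."""

    name: str
    size: int
    content_size: int
    temporary: bool
    offset: int


def wire_size(ref: ShmRef) -> int:
    """Approximate bytes sent per reference: Connection.send() pickles its argument."""
    return len(pickle.dumps(ref))


# "psm_example" is a made-up segment name; the sizes are illustrative only.
small = ShmRef(name="psm_example", size=4096, content_size=100, temporary=True, offset=0)
large = ShmRef(name="psm_example", size=1 << 30, content_size=1 << 30, temporary=False, offset=128)
print(wire_size(small), wire_size(large))
```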

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
shm instance-attribute
temporary instance-attribute
offset = 0 class-attribute instance-attribute

Functions

to_ref()
__init__(name, size, content_size, shm, temporary, offset=0)

_ShmPool

Minimal interface for allocating and attaching shared-memory blocks.

The concrete implementation lives in brmspy._session.transport.ShmPool and tracks blocks so they can be closed on teardown.

Source code in brmspy/types/shm.py
class ShmPool:
    """
    Minimal interface for allocating and attaching shared-memory blocks.

    The concrete implementation lives in
    [`brmspy._session.transport.ShmPool`][brmspy._session.transport.ShmPool] and tracks
    blocks so they can be closed on teardown.
    """

    def __init__(self, manager: SharedMemoryManager) -> None:
        """
        Create a pool bound to an existing `SharedMemoryManager`.

        Parameters
        ----------
        manager : multiprocessing.managers.SharedMemoryManager
            Manager used to allocate blocks.
        """
        ...

    def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
        """
        Allocate a new shared-memory block.

        Parameters
        ----------
        size : int
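            Size in bytes.
        temporary : bool, default False
            True if the buffer can be released right after use instead of
            being attached to the object reconstructed from it.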

        Returns
        -------
        ShmBlock
            Newly allocated block.
        """
        ...

    def attach(self, ref: ShmRef) -> ShmBlock:
        """
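        Attach to an existing shared-memory block by name.

        Parameters
        ----------
        ref : ShmRef
            Reference received over IPC (name, sizes, offset, temporary flag).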

        Returns
        -------
        ShmBlock
            Attached block.
        """
        ...

    def close_all(self) -> None:
        """
        Close all tracked shared-memory handles owned by this pool.

        Returns
        -------
        None
        """
        ...

    def gc(self, name: str | None = None) -> None: ...

Functions

__init__(manager)

Create a pool bound to an existing SharedMemoryManager.

Parameters:

Name     Type                 Description                       Default
manager  SharedMemoryManager  Manager used to allocate blocks.  required
alloc(size, temporary=False)

Allocate a new shared-memory block.

Parameters:

Name       Type  Description                                          Default
size       int   Size in bytes.                                       required
temporary  bool  True if the buffer can be released right after use.  False

Returns:

Type      Description
ShmBlock  Newly allocated block.

attach(ref)

Attach to an existing shared-memory block by name.

Returns:

Type      Description
ShmBlock  Attached block.

close_all()

Close all tracked shared-memory handles owned by this pool.

Returns:

Type Description
None
gc(name=None)

ShmPool

Bases: ShmPool

Concrete shared-memory pool implementation with slab allocation.

When a slab is active (via open_slab/seal_slab), alloc() bump-allocates sub-regions from a single large SharedMemory block, drastically reducing the number of file descriptors for workloads that encode many small arrays (e.g. InferenceData with hundreds of data variables).
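The bump allocation can be modelled without any shared memory. The sketch below uses a hypothetical `SlabPlanner` that replays the arithmetic of `_alloc_from_slab`. It rounds the offset up to a 64-byte boundary, and when a request does not fit, it rolls over to a new slab of `max(size * 2, 4 MiB)` bytes. It assumes each slab holds exactly the requested capacity; a real `SharedMemory` may round its size up to the page size.

```python
from dataclasses import dataclass, field

ALIGN = 64  # ShmPool._ALIGN: cache-line alignment for sub-allocations
MIN_SLAB = 4 * 1024 * 1024  # floor used by _alloc_from_slab when it rolls over


@dataclass
class SlabPlanner:
    """Hypothetical, arithmetic-only model of ShmPool's slab bump allocator."""

    capacity: int
    slab_index: int = 0
    offset: int = 0
    # One (slab_index, offset, size) tuple per allocation.
    placements: list[tuple[int, int, int]] = field(default_factory=list)

    def alloc(self, size: int) -> tuple[int, int]:
        # Round the bump pointer up to the next multiple of ALIGN (a power of two).
        aligned = (self.offset + ALIGN - 1) & ~(ALIGN - 1)
        if aligned + size > self.capacity:
            # Request does not fit: move to a fresh slab sized max(2 * size, 4 MiB).
            self.slab_index += 1
            self.capacity = max(size * 2, MIN_SLAB)
            aligned = 0
        self.offset = aligned + size
        self.placements.append((self.slab_index, aligned, size))
        return self.slab_index, aligned
```

With a 256-byte slab, allocations of 10, 100, and 1 bytes land at offsets 0, 64, and 192. A following 100-byte request no longer fits, so it starts a new slab at offset 0 and leaves the tail of the old slab unused.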

The _blocks dict holds references to SHM buffers only TEMPORARILY: it is cleaned up before each 'responding to main' or 'sending new message to worker' step. This lets the processing in between rely on the buffers not being garbage-collected.

After reconstructing an object from a SHM buffer, it is the CodecRegistry's role to take over the reference by creating a weakref between the reconstructed object and the buffer (or skipping this if the object is temporary).
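The sketch below shows one way this hand-over could work. It assumes the registry ties the handle's lifetime to the reconstructed object with `weakref.finalize`; the actual CodecRegistry mechanism may differ. `FakeHandle`, `Reconstructed`, and `take_over` are hypothetical stand-ins, and the fake handle keeps the sketch free of real buffer bookkeeping.

```python
import weakref


class FakeHandle:
    """Stand-in for a SharedMemory handle that records whether close() ran."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


class Reconstructed:
    """Hypothetical decoded object backed by a shared-memory buffer."""

    def __init__(self, handle: FakeHandle) -> None:
        self.handle = handle


def take_over(obj: object, handle: FakeHandle, temporary: bool) -> None:
    """Assumed registry step: close `handle` when `obj` is collected, unless it is temporary."""
    if temporary:
        return  # temporary buffers are left for the pool's gc() to close
    # The callback references only the handle, never `obj`, so it cannot keep `obj` alive.
    weakref.finalize(obj, handle.close)
```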

Source code in brmspy/_session/transport.py
class ShmPool(_ShmPool):
    """
    Concrete shared-memory pool implementation with slab allocation.

    When a slab is active (via `open_slab`/`seal_slab`), `alloc()` bump-allocates
    sub-regions from a single large SharedMemory block, drastically reducing the
    number of file descriptors for workloads that encode many small arrays
    (e.g. InferenceData with hundreds of data variables).

    `_blocks` holds references to SHM buffers only TEMPORARILY: it is cleaned
    up before each 'responding to main' or 'sending new message to worker'
    step. This lets the processing in between rely on the buffers not being
    garbage-collected.

    After reconstructing an object from a SHM buffer, it is the CodecRegistry's
    role to take over the reference by creating a weakref between the
    reconstructed object and the buffer (or skipping this if the object is
    temporary).
    """

    _ALIGN = 64  # cache-line alignment for sub-allocations

    def __init__(self, manager: SharedMemoryManager) -> None:
        self._manager = manager
        self._blocks: dict[str, ShmBlock] = {}
        # Slab state
        self._slab: SharedMemory | None = None
        self._slab_offset: int = 0
        self._slab_capacity: int = 0

    # -- slab API ----------------------------------------------------------

    def open_slab(self, capacity: int) -> None:
        """Pre-allocate a slab of at least *capacity* bytes.

        While a slab is open, `alloc()` bump-allocates from it instead of
        creating individual SharedMemory segments.  Call `seal_slab()` when
        the batch of allocations is complete.
        """
        self._slab = self._manager.SharedMemory(size=capacity)
        self._slab_offset = 0
        self._slab_capacity = self._slab.size
        # Track the slab itself so gc()/close_all() can manage it
        slab_block = ShmBlock(
            name=self._slab.name,
            size=self._slab.size,
            content_size=self._slab.size,
            shm=self._slab,
            temporary=False,
            offset=0,
        )
        self._blocks[self._slab.name] = slab_block

    def seal_slab(self) -> None:
        """Finalize the current slab (no more sub-allocations)."""
        self._slab = None
        self._slab_offset = 0
        self._slab_capacity = 0

    # -- allocation --------------------------------------------------------

    def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
        if self._slab is not None:
            return self._alloc_from_slab(size, temporary)
        # No active slab — individual SharedMemory segment
        shm = self._manager.SharedMemory(size=size)
        block = ShmBlock(
            name=shm.name,
            size=shm.size,
            shm=shm,
            content_size=size,
            temporary=temporary,
            offset=0,
        )
        self._blocks[block.name] = block
        return block

    def _alloc_from_slab(self, size: int, temporary: bool) -> ShmBlock:
        align = self._ALIGN
        aligned = (self._slab_offset + align - 1) & ~(align - 1)

        if aligned + size > self._slab_capacity:
            # Current slab is full: open a new one that fits this request (at least 4 MiB)
            self.seal_slab()
            self.open_slab(max(size * 2, 4 * 1024 * 1024))
            aligned = 0

        block = ShmBlock(
            name=self._slab.name,
            size=size,
            content_size=size,
            shm=self._slab,
            temporary=temporary,
            offset=aligned,
        )
        self._slab_offset = aligned + size
        # Sub-blocks are NOT tracked in _blocks — the slab itself is tracked
        return block

    # -- attach / cleanup --------------------------------------------------

    def attach(self, ref: ShmRef) -> ShmBlock:
        name = ref["name"]
        offset = ref.get("offset", 0)
        # Reuse existing mapping for this SHM segment
        existing = self._blocks.get(name)
        if existing is not None:
            shm = existing.shm
        else:
            shm = SharedMemory(name=name)

        block = ShmBlock(
            name=name,
            size=ref["size"],
            shm=shm,
            content_size=ref["content_size"],
            temporary=ref["temporary"],
            offset=offset,
        )
        # Track the base mapping (only if new)
        if existing is None:
            self._blocks[name] = block
        return block

    def close_all(self) -> None:
        for block in self._blocks.values():
            block.shm.close()
        self._blocks.clear()
        self.seal_slab()

    def gc(self, name: str | None = None):
        if name is not None:
            b = self._blocks.pop(name, None)
            if b is not None:
                b.shm.close()
            return

        for key in list(self._blocks.keys()):
            b = self._blocks[key]
            if b.temporary:
                b.shm.close()
            del self._blocks[key]

Attributes

_ALIGN = 64 class-attribute instance-attribute
_manager = manager instance-attribute
_blocks = {} instance-attribute
_slab = None instance-attribute
_slab_offset = 0 instance-attribute
_slab_capacity = 0 instance-attribute

Functions

__init__(manager)
open_slab(capacity)

Pre-allocate a slab of at least capacity bytes.

While a slab is open, alloc() bump-allocates from it instead of creating individual SharedMemory segments. Call seal_slab() when the batch of allocations is complete.
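When the sizes of a batch are known up front, the capacity passed to open_slab can be computed with the same alignment rule so the whole batch fits in one slab without a rollover. `slab_bytes_needed` is a hypothetical helper, not part of brmspy.

```python
ALIGN = 64  # matches ShmPool._ALIGN


def slab_bytes_needed(sizes: list[int], align: int = ALIGN) -> int:
    """Hypothetical helper: smallest slab capacity that fits `sizes` without rolling over.

    Replays the bump-pointer rule of ShmPool._alloc_from_slab: each allocation
    starts at the running offset rounded up to a multiple of `align`.
    """
    offset = 0
    for size in sizes:
        aligned = (offset + align - 1) & ~(align - 1)
        offset = aligned + size
    return offset
```

A caller would then open the slab with that capacity, allocate each item, and call seal_slab() in a `finally`. `SharedMemory` rejects a size of 0, so an empty batch should skip the slab entirely.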

seal_slab()

Finalize the current slab (no more sub-allocations).

alloc(size, temporary=False)
_alloc_from_slab(size, temporary)
attach(ref)
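attach() reuses the mapping already tracked for a segment name. As a result, many sub-block refs pointing into one slab open only one handle (one file descriptor on POSIX). The sketch below reproduces that idea in isolation with a hypothetical `MappingCache`. Unlike ShmPool, it keeps every handle until close_all() and has no notion of temporary blocks.

```python
from multiprocessing.shared_memory import SharedMemory


class MappingCache:
    """Hypothetical cache: open each SHM segment once, however many refs point into it."""

    def __init__(self) -> None:
        self._handles: dict[str, SharedMemory] = {}

    def attach(self, ref: dict) -> tuple[SharedMemory, int, int]:
        handle = self._handles.get(ref["name"])
        if handle is None:
            # First ref into this segment: create the single shared mapping.
            handle = SharedMemory(name=ref["name"])
            self._handles[ref["name"]] = handle
        return handle, ref.get("offset", 0), ref["content_size"]

    def read(self, ref: dict) -> bytes:
        handle, offset, n = self.attach(ref)
        return bytes(handle.buf[offset : offset + n])

    def close_all(self) -> None:
        for handle in self._handles.values():
            handle.close()
        self._handles.clear()
```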
close_all()
gc(name=None)
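gc() has two modes. With a name, it closes and forgets that one tracked block. Without a name, it closes every temporary block and stops tracking all of them, which leaves non-temporary buffers to whatever was reconstructed from them. The model below uses a hypothetical `Tracked` stand-in for ShmBlock, so the behaviour can be checked without shared memory.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Tracked:
    """Hypothetical stand-in for ShmBlock carrying only what gc() looks at."""

    temporary: bool
    closed: bool = False

    def close(self) -> None:
        self.closed = True


def gc_blocks(blocks: dict[str, Tracked], name: Optional[str] = None) -> None:
    """Model of ShmPool.gc() over a dict of tracked blocks."""
    if name is not None:
        # Targeted release: close and forget one block, if it is tracked.
        block = blocks.pop(name, None)
        if block is not None:
            block.close()
        return
    # Full sweep: close temporary blocks, and stop tracking every block.
    for key in list(blocks):
        if blocks[key].temporary:
            blocks[key].close()
        del blocks[key]
```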