transport

Shared-memory transport utilities (internal).

RModuleSession uses shared memory to move large payloads between the main process and the worker. The parent allocates blocks and passes only (name, size) references over the Pipe. The worker (or the main process during decode) attaches by name to access the buffers.
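
The hand-off described above can be sketched in a single process. This is a minimal illustration, not brmspy's implementation: it allocates with the standard-library `SharedMemory` directly instead of a `SharedMemoryManager`, and both pipe ends live in one process so the example stays self-contained. The payload and variable names are assumptions made for the example.

```python
from multiprocessing import Pipe
from multiprocessing.shared_memory import SharedMemory

payload = b"large payload bytes"

# "Parent" side: allocate a block and copy the payload into it.
owner = SharedMemory(create=True, size=len(payload))
owner.buf[: len(payload)] = payload

# Only the small reference dict crosses the pipe, never the payload itself.
parent_conn, worker_conn = Pipe()
parent_conn.send(
    {
        "name": owner.name,
        "size": owner.size,
        "content_size": len(payload),
        "temporary": True,
    }
)

# "Worker" side: attach by name and copy out the used prefix of the buffer.
ref = worker_conn.recv()
attached = SharedMemory(name=ref["name"])
received = bytes(attached.buf[: ref["content_size"]])

# Close both handles, then remove the segment (done here by hand because
# no manager owns it in this sketch).
attached.close()
owner.close()
owner.unlink()
parent_conn.close()
worker_conn.close()
```

The reference stays a few dozen bytes regardless of payload size, which is the point of sending it instead of the data.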

This module implements the concrete ShmPool used by the session layer.

Classes

ShmRef

Bases: TypedDict

Reference to a shared-memory block sent over IPC.

Attributes:

Name Type Description
name str

Shared memory block name (as assigned by SharedMemoryManager).

size int

Allocated block size in bytes.

content_size int

Number of bytes actually used by the payload.

temporary bool

Whether this buffer can be garbage-collected immediately after use, or whether it should stay attached to the object it is constructed into.

Notes

Codecs may store a logical payload smaller than size. In that case, the codec metadata must include the logical nbytes/length so that decoders can slice the buffer appropriately.
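
As a rough sketch of why the logical length matters: the block below over-allocates a buffer, writes a short payload, and slices on read. `write_payload` and `read_payload` are hypothetical helpers invented for this example, and `content_size` is assumed to carry the logical length; a real codec may keep that length in its own metadata instead.

```python
from multiprocessing.shared_memory import SharedMemory


def write_payload(shm: SharedMemory, payload: bytes) -> dict:
    """Hypothetical helper: copy payload into shm and describe it."""
    shm.buf[: len(payload)] = payload
    return {
        "name": shm.name,
        "size": shm.size,  # allocated size, may exceed the payload
        "content_size": len(payload),  # logical length used for slicing
        "temporary": True,
    }


def read_payload(ref: dict) -> bytes:
    """Hypothetical helper: attach by name and return only the used bytes."""
    shm = SharedMemory(name=ref["name"])
    try:
        return bytes(shm.buf[: ref["content_size"]])
    finally:
        shm.close()


block = SharedMemory(create=True, size=64)  # deliberately larger than needed
ref = write_payload(block, b"hello")
decoded = read_payload(ref)

block.close()
block.unlink()
```

Reading `shm.buf` without the slice would return the whole allocation, including whatever bytes follow the payload.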

Source code in brmspy/types/shm.py
class ShmRef(TypedDict):
    """
    Reference to a shared-memory block sent over IPC.

    Attributes
    ----------
    name : str
        Shared memory block name (as assigned by `SharedMemoryManager`).
    size : int
        Allocated block size in bytes.
    content_size : int
        Number of bytes actually used by the payload.
    temporary : bool
        Whether this buffer can be garbage-collected immediately after use, or
        whether it should stay attached to the object it is constructed into.

    Notes
    -----
    Codecs may store a logical payload smaller than `size`. In that case, the
    codec metadata must include the logical `nbytes`/length so that decoders can
    slice the buffer appropriately.
    """

    name: str
    size: int
    content_size: int
    temporary: bool

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
temporary instance-attribute

ShmBlock dataclass

Attached shared-memory block (name/size + live SharedMemory handle).

Notes

This object owns a SharedMemory handle and must be closed when no longer needed. In brmspy this is managed by a ShmPool implementation.
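
One way to make sure a handle is closed is to scope it with `contextlib.closing`. This is only a sketch of the ownership rule using plain standard-library `SharedMemory` objects; in brmspy the pool is responsible for closing, and the variable names here are assumptions.

```python
from contextlib import closing
from multiprocessing.shared_memory import SharedMemory

# Stands in for a block that a manager would normally allocate.
owner = SharedMemory(create=True, size=8)

# closing() calls handle.close() on exit, even if the body raises.
with closing(SharedMemory(name=owner.name)) as handle:
    handle.buf[:2] = b"ok"

# The write is visible through the other handle to the same block.
seen_by_owner = bytes(owner.buf[:2])

owner.close()
owner.unlink()
```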

Source code in brmspy/types/shm.py
@dataclass
class ShmBlock:
    """
    Attached shared-memory block (name/size + live `SharedMemory` handle).

    Notes
    -----
    This object owns a `SharedMemory` handle and must be closed when no longer
    needed. In brmspy this is managed by a `ShmPool` implementation.
    """

    name: str
    size: int
    content_size: int
    shm: SharedMemory
    temporary: bool

    def to_ref(self) -> ShmRef:
        return {
            "name": self.name,
            "size": self.size,
            "content_size": self.content_size,
            "temporary": self.temporary,
        }

Attributes

name instance-attribute
size instance-attribute
content_size instance-attribute
shm instance-attribute
temporary instance-attribute

Functions

to_ref()
Source code in brmspy/types/shm.py
def to_ref(self) -> ShmRef:
    return {
        "name": self.name,
        "size": self.size,
        "content_size": self.content_size,
        "temporary": self.temporary,
    }
__init__(name, size, content_size, shm, temporary)

_ShmPool

Minimal interface for allocating and attaching shared-memory blocks.

The concrete implementation lives in brmspy._session.transport.ShmPool and tracks blocks so they can be closed on teardown.

Source code in brmspy/types/shm.py
class ShmPool:
    """
    Minimal interface for allocating and attaching shared-memory blocks.

    The concrete implementation lives in
    [`brmspy._session.transport.ShmPool`][brmspy._session.transport.ShmPool] and tracks
    blocks so they can be closed on teardown.
    """

    def __init__(self, manager: SharedMemoryManager) -> None:
        """
        Create a pool bound to an existing `SharedMemoryManager`.

        Parameters
        ----------
        manager : multiprocessing.managers.SharedMemoryManager
            Manager used to allocate blocks.
        """
        ...

    def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
        """
        Allocate a new shared-memory block.

        Parameters
        ----------
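        size : int
            Size in bytes.
        temporary : bool, default False
            Stored on the returned block as its `temporary` flag
            (see `ShmRef.temporary`).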

        Returns
        -------
        ShmBlock
            Newly allocated block.
        """
        ...

    def attach(self, ref: ShmRef) -> ShmBlock:
        """
        Attach to an existing shared-memory block by name.

        Parameters
        ----------
        ref : ShmRef
            Reference to the block to attach to.

        Returns
        -------
        ShmBlock
            Attached block.
        """
        ...

    def close_all(self) -> None:
        """
        Close all tracked shared-memory handles owned by this pool.

        Returns
        -------
        None
        """
        ...

    def gc(self, name: str | None = None) -> None: ...

Functions

__init__(manager)

Create a pool bound to an existing SharedMemoryManager.

Parameters:

Name Type Description Default
manager SharedMemoryManager

Manager used to allocate blocks.

required
Source code in brmspy/types/shm.py
def __init__(self, manager: SharedMemoryManager) -> None:
    """
    Create a pool bound to an existing `SharedMemoryManager`.

    Parameters
    ----------
    manager : multiprocessing.managers.SharedMemoryManager
        Manager used to allocate blocks.
    """
    ...
alloc(size, temporary=False)

Allocate a new shared-memory block.

Parameters:

Name Type Description Default
size int

Size in bytes.

required
temporary bool

Stored on the returned block as its temporary flag (see ShmRef.temporary).

False

Returns:

Type Description
ShmBlock

Newly allocated block.

Source code in brmspy/types/shm.py
def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
    """
    Allocate a new shared-memory block.

    Parameters
    ----------
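    size : int
        Size in bytes.
    temporary : bool, default False
        Stored on the returned block as its `temporary` flag
        (see `ShmRef.temporary`).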

    Returns
    -------
    ShmBlock
        Newly allocated block.
    """
    ...
attach(ref)

Attach to an existing shared-memory block by name.

Parameters:

Name Type Description Default
ref ShmRef

Reference to the block to attach to.

required

Returns:

Type Description
ShmBlock

Attached block.

Source code in brmspy/types/shm.py
def attach(self, ref: ShmRef) -> ShmBlock:
    """
    Attach to an existing shared-memory block by name.

    Parameters
    ----------
    ref : ShmRef
        Reference to the block to attach to.

    Returns
    -------
    ShmBlock
        Attached block.
    """
    ...
close_all()

Close all tracked shared-memory handles owned by this pool.

Returns:

Type Description
None
Source code in brmspy/types/shm.py
def close_all(self) -> None:
    """
    Close all tracked shared-memory handles owned by this pool.

    Returns
    -------
    None
    """
    ...
gc(name=None)
Source code in brmspy/types/shm.py
def gc(self, name: str | None = None) -> None: ...

ShmPool

Bases: ShmPool

Concrete shared-memory pool implementation that temporarily tracks attached blocks.

The _blocks dict keeps references to shm buffers only temporarily and is cleaned up before each response to the main process and before each new message sent to the worker. This lets intermediate processing rely on the buffers not being garbage-collected.

After an object is reconstructed from a shm buffer, it is the CodecRegistry's role to take over the reference by setting up a weakref between the reconstructed object and the buffer (or to skip this if the object is temporary).

This helps ensure that a minimal number of shm buffers are actively mapped and that garbage collection can release file descriptors that are no longer needed.
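
The hand-over to the reconstructed object can be pictured with `weakref.finalize`. The sketch below is an assumption about the general technique, not the CodecRegistry's actual code: `Decoded`, `_release`, and `released` are hypothetical names, and the block is created with plain `SharedMemory` rather than through a manager.

```python
import gc
import weakref
from multiprocessing.shared_memory import SharedMemory


class Decoded:
    """Hypothetical stand-in for an object rebuilt on top of a shm buffer."""

    def __init__(self, view: memoryview) -> None:
        self.view = view  # zero-copy view into the shared block


released = []  # names of blocks whose handles were closed, for illustration


def _release(shm: SharedMemory, view: memoryview) -> None:
    # The exported view must be released before the handle can be closed.
    view.release()
    shm.close()
    released.append(shm.name)


shm = SharedMemory(create=True, size=16)
shm.buf[:4] = b"data"
name = shm.name

obj = Decoded(shm.buf[:4])
# The callback holds the handle and the view, but not `obj` itself,
# so it cannot keep `obj` alive.
finalizer = weakref.finalize(obj, _release, shm, obj.view)

first_bytes = bytes(obj.view)
del obj  # in CPython, dropping the last reference runs the finalizer
gc.collect()  # covers implementations without reference counting
still_alive = finalizer.alive

shm.unlink()  # no manager owns the segment in this sketch
```

With this arrangement the mapping lives exactly as long as the object that reads from it, so the pool can drop its own reference as soon as decoding finishes.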

Source code in brmspy/_session/transport.py
class ShmPool(_ShmPool):
    """
    Concrete shared-memory pool implementation that temporarily tracks attached blocks.

    The `_blocks` dict keeps references to shm buffers only temporarily and is
    cleaned up before each response to the main process and before each new
    message sent to the worker. This lets intermediate processing rely on the
    buffers not being garbage-collected.

    After an object is reconstructed from a shm buffer, it is the CodecRegistry's
    role to take over the reference by setting up a weakref between the
    reconstructed object and the buffer (or to skip this if the object is
    temporary).

    This helps ensure that a minimal number of shm buffers are actively mapped
    and that garbage collection can release file descriptors that are no longer
    needed.
    """

    def __init__(self, manager: SharedMemoryManager) -> None:
        self._manager = manager
        self._blocks: dict[str, ShmBlock] = {}

    def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
        # Allocate through the manager so the segment is tracked and unlinked
        # when the manager shuts down.
        shm = self._manager.SharedMemory(size=size)
        block = ShmBlock(
            name=shm.name,
            size=shm.size,
            shm=shm,
            content_size=size,
            temporary=temporary,
        )
        self._blocks[block.name] = block
        return block

    def attach(self, ref: ShmRef) -> ShmBlock:
        if ref["name"] in self._blocks:
            return self._blocks[ref["name"]]
        shm = SharedMemory(name=ref["name"])
        block = ShmBlock(
            name=ref["name"],
            size=ref["size"],
            shm=shm,
            content_size=ref["content_size"],
            temporary=ref["temporary"],
        )
        self._blocks[ref["name"]] = block
        return block

    def close_all(self) -> None:
        for block in self._blocks.values():
            block.shm.close()
        self._blocks.clear()

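    def gc(self, name: str | None = None) -> None:
        # With a name: stop tracking that one block and close its handle.
        if name is not None:
            b = self._blocks.pop(name, None)
            if b is not None:
                b.shm.close()
            return

        # Without a name: stop tracking every block, but close only the
        # temporary ones; other handles stay open for whoever still holds them.
        for key in list(self._blocks.keys()):
            b = self._blocks[key]
            if b.temporary:
                b.shm.close()
            del self._blocks[key]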

Attributes

_manager = manager instance-attribute
_blocks = {} instance-attribute

Functions

__init__(manager)
Source code in brmspy/_session/transport.py
def __init__(self, manager: SharedMemoryManager) -> None:
    self._manager = manager
    self._blocks: dict[str, ShmBlock] = {}
alloc(size, temporary=False)
Source code in brmspy/_session/transport.py
def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
    # Allocate through the manager so the segment is tracked and unlinked
    # when the manager shuts down.
    shm = self._manager.SharedMemory(size=size)
    block = ShmBlock(
        name=shm.name,
        size=shm.size,
        shm=shm,
        content_size=size,
        temporary=temporary,
    )
    self._blocks[block.name] = block
    return block
attach(ref)
Source code in brmspy/_session/transport.py
def attach(self, ref: ShmRef) -> ShmBlock:
    if ref["name"] in self._blocks:
        return self._blocks[ref["name"]]
    shm = SharedMemory(name=ref["name"])
    block = ShmBlock(
        name=ref["name"],
        size=ref["size"],
        shm=shm,
        content_size=ref["content_size"],
        temporary=ref["temporary"],
    )
    self._blocks[ref["name"]] = block
    return block
close_all()
Source code in brmspy/_session/transport.py
def close_all(self) -> None:
    for block in self._blocks.values():
        block.shm.close()
    self._blocks.clear()
gc(name=None)
Source code in brmspy/_session/transport.py
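def gc(self, name: str | None = None) -> None:
    # With a name: stop tracking that one block and close its handle.
    if name is not None:
        b = self._blocks.pop(name, None)
        if b is not None:
            b.shm.close()
        return

    # Without a name: stop tracking every block, but close only the
    # temporary ones; other handles stay open for whoever still holds them.
    for key in list(self._blocks.keys()):
        b = self._blocks[key]
        if b.temporary:
            b.shm.close()
        del self._blocks[key]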