session

Attributes

ctx = mp.get_context('spawn') module-attribute

_INTERNAL_ATTRS = {'_module', '_module_path', '_environment_conf', '_mgr', '_proc', '_conn', '_shm_pool', '_reg', '_closed', '_func_cache', '_call_remote', '_encode_arg', '_decode_result', '_active_ctx', 'add_contextmanager', 'restart', 'shutdown', 'environment_exists', 'environment_activate', '_run_test_by_name'} module-attribute

Classes

RSessionError

Bases: RuntimeError

Error raised when a worker call fails.

Parameters:

  • message (str, required) – Human-readable error message (often derived from R error messages).
  • remote_traceback (str or None, default None) – Best-effort traceback text from the worker process. For R errors this may be an R traceback string; for Python errors inside the worker it may be a Python traceback.
Notes

This exception type is designed to preserve the remote failure context while keeping the main process free of rpy2/R state.
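
For example, a minimal sketch of surfacing the remote context in user code (assuming a failing worker call such as brms.brm, as in the usage example further below):

try:
    brms.brm(...)
except RSessionError as err:
    # remote_traceback is best-effort and may be None
    if err.remote_traceback:
        print(err.remote_traceback)
    raise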

Source code in brmspy/types/errors.py
class RSessionError(RuntimeError):
    """
    Error raised when a worker call fails.

    Parameters
    ----------
    message : str
        Human-readable error message (often derived from R error messages).
    remote_traceback : str or None, default=None
        Best-effort traceback text from the worker process. For R errors this may
        be an R traceback string; for Python errors inside the worker it may be
        a Python traceback.

    Notes
    -----
    This exception type is designed to preserve the *remote* failure context
    while keeping the main process free of rpy2/R state.
    """

    def __init__(self, message: str, remote_traceback: str | None = None) -> None:
        super().__init__(message)
        self.remote_traceback = remote_traceback

    def __str__(self) -> str:
        """Return message plus the remote traceback (if available)."""
        base = super().__str__()
        if self.remote_traceback:
            return f"{base}\n\nRemote traceback:\n{self.remote_traceback}\n\n"
        return base

Attributes

remote_traceback = remote_traceback instance-attribute

Functions

__init__(message, remote_traceback=None)
Source code in brmspy/types/errors.py
def __init__(self, message: str, remote_traceback: str | None = None) -> None:
    super().__init__(message)
    self.remote_traceback = remote_traceback
__str__()

Return message plus the remote traceback (if available).

Source code in brmspy/types/errors.py
def __str__(self) -> str:
    """Return message plus the remote traceback (if available)."""
    base = super().__str__()
    if self.remote_traceback:
        return f"{base}\n\nRemote traceback:\n{self.remote_traceback}\n\n"
    return base

RWorkerCrashedError

Bases: RuntimeError

Raised when the R worker process crashes during an operation.

Parameters:

  • message (str, required) – Human-readable description of the failure.
  • recovered (bool, required) – Indicates whether a fresh worker session was successfully started.
      • True – The crash occurred, but automatic recovery succeeded. The failed operation did not complete, but the worker is now in a clean state. Callers may safely retry.
      • False – The crash occurred and automatic recovery failed. A usable worker session is not available. Callers should treat this as a hard failure and abort or escalate.
  • cause (BaseException, optional, default None) – The original exception that triggered the crash. Stored as __cause__ for chained exception inspection.
Usage

In user code or automated pipelines, you can distinguish between a recoverable and unrecoverable crash:

try:
    brms.brm(...)
except RWorkerCrashedError as err:
    if err.recovered:
        # Crash occurred, but a fresh worker is ready.
        # Safe to retry the operation once.
        brms.brm(...)
    else:
        # Worker could not be restarted.
        # Treat this as a hard failure.
        raise
Notes

All crashes automatically produce a new exception that wraps the original failure using Python's exception chaining (raise ... from cause). Inspect err.__cause__ for the underlying system error.
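
For instance, the underlying system error can be inspected through the chain (a BrokenPipeError from the dead pipe is one possibility, per _call_remote()):

try:
    brms.brm(...)
except RWorkerCrashedError as err:
    print(repr(err.__cause__))
    raise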

Source code in brmspy/types/errors.py
class RWorkerCrashedError(RuntimeError):
    """
    Raised when the R worker process crashes during an operation.

    Parameters
    ----------
    message : str
        Human-readable description of the failure.
    recovered : bool
        Indicates whether a fresh worker session was successfully started.

        * ``True``  – The crash occurred, but automatic recovery succeeded.
                      The failed operation did *not* complete, but the worker
                      is now in a clean state. Callers may safely retry.
        * ``False`` – The crash occurred and automatic recovery failed.
                      A usable worker session is not available. Callers should
                      treat this as a hard failure and abort or escalate.
    cause : BaseException, optional
        The original exception that triggered the crash. Stored as ``__cause__``
        for chained exception inspection.

    Usage
    -----
    In user code or automated pipelines, you can distinguish between a
    recoverable and unrecoverable crash:

    ```python
    try:
        brms.brm(...)
    except RWorkerCrashedError as err:
        if err.recovered:
            # Crash occurred, but a fresh worker is ready.
            # Safe to retry the operation once.
            brms.brm(...)
        else:
            # Worker could not be restarted.
            # Treat this as a hard failure.
            raise
    ```

    Notes
    -----
    All crashes automatically produce a new exception that wraps the original
    failure using Python's exception chaining (``raise ... from cause``).
    Inspect ``err.__cause__`` for the underlying system error.
    """

    def __init__(
        self, message: str, *, recovered: bool, cause: BaseException | None = None
    ):
        super().__init__(message)
        self.recovered = recovered
        self.__cause__ = cause

Attributes

recovered = recovered instance-attribute
__cause__ = cause instance-attribute

Functions

__init__(message, *, recovered, cause=None)
Source code in brmspy/types/errors.py
def __init__(
    self, message: str, *, recovered: bool, cause: BaseException | None = None
):
    super().__init__(message)
    self.recovered = recovered
    self.__cause__ = cause

EncodeResult dataclass

Result of encoding a Python value for IPC transfer.

Attributes:

  • codec (str) – Codec identifier.
  • meta (dict[str, Any]) – JSON-serializable metadata required for decoding.
  • buffers (list[ShmRef]) – Shared-memory blocks backing the encoded payload.
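
A hedged sketch of constructing one by hand (the codec id, meta keys, and buffer values are illustrative, not a fixed schema):

ref: ShmRef = {"name": "psm_abc123", "size": 4096, "content_size": 4000, "temporary": False}
enc = EncodeResult(codec="ndarray", meta={"dtype": "float64", "shape": [500]}, buffers=[ref])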

Source code in brmspy/types/session.py
@dataclass
class EncodeResult:
    """
    Result of encoding a Python value for IPC transfer.

    Attributes
    ----------
    codec : str
        Codec identifier.
    meta : dict[str, Any]
        JSON-serializable metadata required for decoding.
    buffers : list[ShmRef]
        Shared-memory blocks backing the encoded payload.
    """

    codec: str
    meta: dict[str, Any]
    buffers: list[ShmRef]

Attributes

codec instance-attribute
meta instance-attribute
buffers instance-attribute

Functions

__init__(codec, meta, buffers)

EnvironmentConfig dataclass

Worker environment configuration.

This configuration is applied in the worker before importing/using brms.

Parameters:

  • r_home (str or None, default None) – Override for R_HOME. If None, the worker will rely on system detection.
  • startup_scripts (list[str], default list()) – R code snippets executed in the worker after initialization.
  • environment_name (str, default 'default') – brmspy environment name (used to determine ~/.brmspy/environment/<name>/Rlib).
  • runtime_path (str or None, default None) – Path to a brmspy runtime bundle to activate in the worker.
  • env (dict[str, str], default dict()) – Extra environment variables applied when spawning the worker.
Source code in brmspy/types/session.py
@dataclass
class EnvironmentConfig:
    """
    Worker environment configuration.

    This configuration is applied in the worker before importing/using brms.

    Parameters
    ----------
    r_home : str or None
        Override for `R_HOME`. If None, the worker will rely on system detection.
    startup_scripts : list[str]
        R code snippets executed in the worker after initialization.
    environment_name : str
        brmspy environment name (used to determine `~/.brmspy/environment/<name>/Rlib`).
    runtime_path : str or None
        Path to a brmspy runtime bundle to activate in the worker.
    env : dict[str, str]
        Extra environment variables applied when spawning the worker.
    """

    r_home: None | str = None
    startup_scripts: list[str] = field(default_factory=list)
    environment_name: str = "default"
    runtime_path: None | str = None
    env: dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize configuration for persistence to JSON."""
        return {
            "environment_name": self.environment_name,
            "r_home": self.r_home,
            "startup_scripts": self.startup_scripts or [],
            "runtime_path": self.runtime_path,
            "env": self.env,
        }

    @classmethod
    def from_dict(cls, obj: dict[str, Any]) -> EnvironmentConfig:
        """Deserialize configuration from a JSON object."""
        return cls(
            r_home=obj["r_home"],
            startup_scripts=obj["startup_scripts"],
            environment_name=obj["environment_name"],
            runtime_path=obj["runtime_path"],
            env=obj["env"],
        )

    @classmethod
    def from_obj(
        cls, obj: None | dict[str, Any] | EnvironmentConfig
    ) -> EnvironmentConfig:
        """Normalize `None | dict | EnvironmentConfig` into an `EnvironmentConfig`."""
        if obj is None:
            return cls()
        if isinstance(obj, dict):
            return cls.from_dict(obj)
        return obj
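
For instance, a round-trip through the serialization helpers shown above (the R snippet is illustrative):

conf = EnvironmentConfig(environment_name="dev", startup_scripts=["options(mc.cores = 2)"])
restored = EnvironmentConfig.from_dict(conf.to_dict())
assert restored == conf  # dataclass equality compares all fields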

Attributes

r_home = None class-attribute instance-attribute
startup_scripts = field(default_factory=list) class-attribute instance-attribute
environment_name = 'default' class-attribute instance-attribute
runtime_path = None class-attribute instance-attribute
env = field(default_factory=dict) class-attribute instance-attribute

Functions

to_dict()

Serialize configuration for persistence to JSON.

Source code in brmspy/types/session.py
def to_dict(self) -> dict[str, Any]:
    """Serialize configuration for persistence to JSON."""
    return {
        "environment_name": self.environment_name,
        "r_home": self.r_home,
        "startup_scripts": self.startup_scripts or [],
        "runtime_path": self.runtime_path,
        "env": self.env,
    }
from_dict(obj) classmethod

Deserialize configuration from a JSON object.

Source code in brmspy/types/session.py
@classmethod
def from_dict(cls, obj: dict[str, Any]) -> EnvironmentConfig:
    """Deserialize configuration from a JSON object."""
    return cls(
        r_home=obj["r_home"],
        startup_scripts=obj["startup_scripts"],
        environment_name=obj["environment_name"],
        runtime_path=obj["runtime_path"],
        env=obj["env"],
    )
from_obj(obj) classmethod

Normalize None | dict | EnvironmentConfig into an EnvironmentConfig.

Source code in brmspy/types/session.py
@classmethod
def from_obj(
    cls, obj: None | dict[str, Any] | EnvironmentConfig
) -> EnvironmentConfig:
    """Normalize `None | dict | EnvironmentConfig` into an `EnvironmentConfig`."""
    if obj is None:
        return cls()
    if isinstance(obj, dict):
        return cls.from_dict(obj)
    return obj
__init__(r_home=None, startup_scripts=list(), environment_name='default', runtime_path=None, env=dict())

PayloadRef

Bases: TypedDict

Encoded argument/result payload sent over the control pipe.

A payload is:

  • codec: the codec identifier used by the registry
  • meta: JSON-serializable metadata needed to reconstruct the value
  • buffers: shared-memory buffer references backing the payload
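
A hedged example of one payload on the wire (the codec id and meta keys are illustrative, not a fixed schema):

payload: PayloadRef = {
    "codec": "dataframe",  # hypothetical codec identifier
    "meta": {"columns": ["y", "x"]},
    "buffers": [{"name": "psm_abc123", "size": 8192, "content_size": 8000, "temporary": False}],
}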
Source code in brmspy/types/session.py
class PayloadRef(TypedDict):
    """
    Encoded argument/result payload sent over the control pipe.

    A payload is:

    - `codec`: the codec identifier used by the registry
    - `meta`: JSON-serializable metadata needed to reconstruct the value
    - `buffers`: shared-memory buffer references backing the payload
    """

    codec: str
    meta: dict[str, Any]
    buffers: list[ShmRef]

Attributes

codec instance-attribute
meta instance-attribute
buffers instance-attribute

Request

Bases: TypedDict

IPC request message sent from main process to worker.

Attributes:

  • id (str) – Correlation id for the request/response pair.
  • cmd ({'CALL', 'SHUTDOWN', 'PING', '_RUN_TEST_BY_NAME'}) – Command type.
  • target (str) – Worker target spec (see _resolve_module_target()).
  • args, kwargs (list[PayloadRef], dict[str, PayloadRef]) – Encoded arguments.
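
A sample request, mirroring how _call_remote() builds one (the target function name is hypothetical):

import uuid

req: Request = {
    "id": str(uuid.uuid4()),
    "cmd": "CALL",
    "target": "mod:brmspy.brms.some_function",  # hypothetical target
    "args": [],
    "kwargs": {},
}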

Source code in brmspy/types/session.py
class Request(TypedDict):
    """
    IPC request message sent from main process to worker.

    Attributes
    ----------
    id : str
        Correlation id for the request/response pair.
    cmd : {"CALL", "SHUTDOWN", "PING", "_RUN_TEST_BY_NAME"}
        Command type.
    target : str
        Worker target spec (see [`_resolve_module_target()`][brmspy._session.worker.worker._resolve_module_target]).
    args, kwargs
        Encoded arguments.
    """

    id: str
    cmd: CommandType
    target: str
    args: list[PayloadRef]
    kwargs: dict[str, PayloadRef]

Attributes

id instance-attribute
cmd instance-attribute
target instance-attribute
args instance-attribute
kwargs instance-attribute

Response

Bases: TypedDict

IPC response message sent from worker back to the main process.
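
A failure-shaped sketch (field values are illustrative), matching how _decode_result() consumes this message:

resp: Response = {
    "id": "same-id-as-the-request",
    "ok": False,
    "result": None,
    "error": "object 'x' not found",
    "traceback": "<best-effort R traceback text>",
}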

Source code in brmspy/types/session.py
class Response(TypedDict):
    """
    IPC response message sent from worker back to the main process.
    """

    id: str
    ok: bool
    result: None | PayloadRef
    error: None | str
    traceback: None | str

Attributes

id instance-attribute
ok instance-attribute
result instance-attribute
error instance-attribute
traceback instance-attribute

ShmPool

Bases: ShmPool

Concrete shared-memory pool implementation that temporarily tracks attached blocks.

The _blocks dict keeps references to shm buffers TEMPORARILY and is cleaned up before each 'respond to main' or 'send new message to worker' step. This allows the in-between processing of shm buffers to rely on the buffers not being garbage collected.

After reconstructing an object from a shm buffer, it's the CodecRegistry's role to take over the reference by establishing a weakref between the reconstructed object and the buffer (or skipping if the object is temporary).

This helps ensure that a minimal number of shm buffers are actively mapped and garbage collection can remove file descriptors that are no longer needed.
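
A minimal lifecycle sketch, assuming a running SharedMemoryManager and a ShmRef dict with the keys used by attach():

from multiprocessing.managers import SharedMemoryManager

mgr = SharedMemoryManager()
mgr.start()
pool = ShmPool(mgr)

block = pool.alloc(1024, temporary=True)   # allocated blocks are tracked by name
ref: ShmRef = {"name": block.name, "size": block.size,
               "content_size": block.content_size, "temporary": block.temporary}
assert pool.attach(ref) is block           # attaching a tracked name returns the cached block
pool.gc()                                  # closes temporary blocks, clears tracking
mgr.shutdown()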

Source code in brmspy/_session/transport.py
class ShmPool(_ShmPool):
    """
    Concrete shared-memory pool implementation that temporarily tracks attached blocks.

    _blocks dict keeps references to shm buffers TEMPORARILY and is cleaned up
    before each 'respond to main' or 'send new message to worker' step. This
    allows the in-between processing of shm buffers to rely on the buffers not
    being garbage collected.

    After reconstructing an object from a shm buffer, it's the CodecRegistry's
    role to take over the reference by establishing a weakref between the
    reconstructed object and the buffer (or skipping if the object is temporary).

    This helps ensure that a minimal number of shm buffers are actively mapped
    and garbage collection can remove file descriptors no longer needed.
    """

    def __init__(self, manager: SharedMemoryManager) -> None:
        self._manager = manager
        self._blocks: dict[str, ShmBlock] = {}

    def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
        # print(f"alloc {'temp' if temporary else ''}")
        shm = self._manager.SharedMemory(size=size)
        block = ShmBlock(
            name=shm.name,
            size=shm.size,
            shm=shm,
            content_size=size,
            temporary=temporary,
        )
        self._blocks[block.name] = block
        return block

    def attach(self, ref: ShmRef) -> ShmBlock:
        if ref["name"] in self._blocks:
            return self._blocks[ref["name"]]
        shm = SharedMemory(name=ref["name"])
        block = ShmBlock(
            name=ref["name"],
            size=ref["size"],
            shm=shm,
            content_size=ref["content_size"],
            temporary=ref["temporary"],
        )
        self._blocks[ref["name"]] = block
        return block

    def close_all(self) -> None:
        for block in self._blocks.values():
            block.shm.close()
        self._blocks.clear()

    def gc(self, name: str | None = None):
        # Named: close and drop exactly one tracked block.
        if name is not None:
            b = self._blocks.pop(name, None)
            if b is not None:
                b.shm.close()
            return

        # Unnamed: forget every tracked block, but only close the temporary
        # ones; non-temporary buffers stay mapped because the CodecRegistry
        # ties their lifetime to the reconstructed objects (see class docstring).
        for key in list(self._blocks.keys()):
            b = self._blocks[key]
            if b.temporary:
                b.shm.close()
            del self._blocks[key]

Attributes

_manager = manager instance-attribute
_blocks = {} instance-attribute

Functions

__init__(manager)
Source code in brmspy/_session/transport.py
def __init__(self, manager: SharedMemoryManager) -> None:
    self._manager = manager
    self._blocks: dict[str, ShmBlock] = {}
alloc(size, temporary=False)
Source code in brmspy/_session/transport.py
def alloc(self, size: int, temporary: bool = False) -> ShmBlock:
    # print(f"alloc {'temp' if temporary else ''}")
    shm = self._manager.SharedMemory(size=size)
    block = ShmBlock(
        name=shm.name,
        size=shm.size,
        shm=shm,
        content_size=size,
        temporary=temporary,
    )
    self._blocks[block.name] = block
    return block
attach(ref)
Source code in brmspy/_session/transport.py
def attach(self, ref: ShmRef) -> ShmBlock:
    if ref["name"] in self._blocks:
        return self._blocks[ref["name"]]
    shm = SharedMemory(name=ref["name"])
    block = ShmBlock(
        name=ref["name"],
        size=ref["size"],
        shm=shm,
        content_size=ref["content_size"],
        temporary=ref["temporary"],
    )
    self._blocks[ref["name"]] = block
    return block
close_all()
Source code in brmspy/_session/transport.py
def close_all(self) -> None:
    for block in self._blocks.values():
        block.shm.close()
    self._blocks.clear()
gc(name=None)
Source code in brmspy/_session/transport.py
def gc(self, name: str | None = None):
    # Named: close and drop exactly one tracked block.
    if name is not None:
        b = self._blocks.pop(name, None)
        if b is not None:
            b.shm.close()
        return

    # Unnamed: forget every tracked block, but only close the temporary
    # ones; non-temporary buffers stay mapped because the CodecRegistry
    # ties their lifetime to the reconstructed objects (see class docstring).
    for key in list(self._blocks.keys()):
        b = self._blocks[key]
        if b.temporary:
            b.shm.close()
        del self._blocks[key]

ClassProxy

Class-like proxy that exposes only @staticmethod members of a surface class and executes them in the worker.

Worker target format: mod:{module_path}::{class_name}.{method_name}

Source code in brmspy/_session/session.py
class ClassProxy:
    """
    Class-like proxy that exposes only @staticmethod members of a surface class
    and executes them in the worker.

    Worker target format:
        mod:{module_path}::{class_name}.{method_name}
    """

    _INTERNAL = {
        "_session",
        "_surface_class",
        "_module_path",
        "_class_name",
        "_allowed",
        "_func_cache",
    }

    def __init__(
        self,
        *,
        session: "RModuleSession",
        surface_class: type,
        module_path: str,
        class_name: str,
    ) -> None:
        self._session = session
        self._surface_class = surface_class
        self._module_path = module_path
        self._class_name = class_name

        # Only expose names backed by `@staticmethod` descriptors.
        allowed: list[str] = []
        for k, v in getattr(surface_class, "__dict__", {}).items():
            if isinstance(v, staticmethod):
                allowed.append(k)
        self._allowed = tuple(sorted(set(allowed)))
        self._func_cache: dict[str, Callable[..., Any]] = {}

    def __getattribute__(self, name: str) -> Any:
        if name in ClassProxy._INTERNAL or (
            name.startswith("__") and name.endswith("__")
        ):
            return object.__getattribute__(self, name)

        allowed = object.__getattribute__(self, "_allowed")
        if name not in allowed:
            raise AttributeError(
                f"{self.__class__.__name__!r} has no attribute {name!r}"
            )

        func_cache = object.__getattribute__(self, "_func_cache")
        if name in func_cache:
            return func_cache[name]

        surface_class = object.__getattribute__(self, "_surface_class")
        raw = surface_class.__dict__.get(name)

        # We only allow staticmethod entries; enforce again defensively.
        if not isinstance(raw, staticmethod):
            raise AttributeError(f"{surface_class!r} has no staticmethod {name!r}")

        session = object.__getattribute__(self, "_session")
        module_path = object.__getattribute__(self, "_module_path")
        class_name = object.__getattribute__(self, "_class_name")

        # Grab underlying function only for metadata (__doc__/__name__)
        orig = raw.__func__

        def wrapper(*args, **kwargs):
            return session._call_remote(
                f"mod:{module_path}::{class_name}.{name}", *args, **kwargs
            )

        wrapper.__name__ = getattr(orig, "__name__", name)
        wrapper.__doc__ = getattr(orig, "__doc__", None)
        wrapper.__wrapped__ = orig  # type: ignore[attr-defined]

        func_cache[name] = wrapper
        return wrapper

    def __dir__(self):
        allowed = object.__getattribute__(self, "_allowed")
        return sorted(set(allowed))

    def __repr__(self) -> str:
        module_path = object.__getattribute__(self, "_module_path")
        class_name = object.__getattribute__(self, "_class_name")
        return f"<ClassProxy {module_path}::{class_name}>"

    @property
    def __all__(self) -> list[str]:
        return list(object.__getattribute__(self, "_allowed"))

Attributes

_INTERNAL = {'_session', '_surface_class', '_module_path', '_class_name', '_allowed', '_func_cache'} class-attribute instance-attribute
_session = session instance-attribute
_surface_class = surface_class instance-attribute
_module_path = module_path instance-attribute
_class_name = class_name instance-attribute
_allowed = tuple(sorted(set(allowed))) instance-attribute
_func_cache = {} instance-attribute
__all__ property

Functions

__init__(*, session, surface_class, module_path, class_name)
Source code in brmspy/_session/session.py
def __init__(
    self,
    *,
    session: "RModuleSession",
    surface_class: type,
    module_path: str,
    class_name: str,
) -> None:
    self._session = session
    self._surface_class = surface_class
    self._module_path = module_path
    self._class_name = class_name

    # Only expose names backed by `@staticmethod` descriptors.
    allowed: list[str] = []
    for k, v in getattr(surface_class, "__dict__", {}).items():
        if isinstance(v, staticmethod):
            allowed.append(k)
    self._allowed = tuple(sorted(set(allowed)))
    self._func_cache: dict[str, Callable[..., Any]] = {}
__getattribute__(name)
Source code in brmspy/_session/session.py
def __getattribute__(self, name: str) -> Any:
    if name in ClassProxy._INTERNAL or (
        name.startswith("__") and name.endswith("__")
    ):
        return object.__getattribute__(self, name)

    allowed = object.__getattribute__(self, "_allowed")
    if name not in allowed:
        raise AttributeError(
            f"{self.__class__.__name__!r} has no attribute {name!r}"
        )

    func_cache = object.__getattribute__(self, "_func_cache")
    if name in func_cache:
        return func_cache[name]

    surface_class = object.__getattribute__(self, "_surface_class")
    raw = surface_class.__dict__.get(name)

    # We only allow staticmethod entries; enforce again defensively.
    if not isinstance(raw, staticmethod):
        raise AttributeError(f"{surface_class!r} has no staticmethod {name!r}")

    session = object.__getattribute__(self, "_session")
    module_path = object.__getattribute__(self, "_module_path")
    class_name = object.__getattribute__(self, "_class_name")

    # Grab underlying function only for metadata (__doc__/__name__)
    orig = raw.__func__

    def wrapper(*args, **kwargs):
        return session._call_remote(
            f"mod:{module_path}::{class_name}.{name}", *args, **kwargs
        )

    wrapper.__name__ = getattr(orig, "__name__", name)
    wrapper.__doc__ = getattr(orig, "__doc__", None)
    wrapper.__wrapped__ = orig  # type: ignore[attr-defined]

    func_cache[name] = wrapper
    return wrapper
__dir__()
Source code in brmspy/_session/session.py
def __dir__(self):
    allowed = object.__getattribute__(self, "_allowed")
    return sorted(set(allowed))
__repr__()
Source code in brmspy/_session/session.py
def __repr__(self) -> str:
    module_path = object.__getattribute__(self, "_module_path")
    class_name = object.__getattribute__(self, "_class_name")
    return f"<ClassProxy {module_path}::{class_name}>"

RModuleSession

Bases: ModuleType

Module-like proxy that forwards attribute access and function calls to a worker process.

In the main process, brmspy.brms is an instance of this class wrapping the real worker-side module. Access patterns are:

  • Callables (functions) are wrapped so calling them performs an IPC roundtrip: encode args/kwargs → send request → run in worker → encode result → decode result.
  • Non-callables (constants, types) are mirrored directly from the wrapped module to keep dir() and IDE autocomplete useful.

This class also owns the worker lifecycle:

  • creates a SharedMemoryManager for large payload buffers
  • spawns the worker process using spawn semantics
  • bridges worker logging back into the parent's handlers via a log queue
  • performs a PING handshake to ensure the worker is ready before requests

Parameters:

  • module (ModuleType, required) – The module object whose surface is mirrored in the main process. In practice this is the import of the worker-facing module (e.g. brmspy.brms._brms_module) but executed in the main process only for metadata / attribute discovery.
  • module_path (str, required) – Import path used inside the worker when resolving targets (e.g. "brmspy.brms").
  • environment_conf (EnvironmentConfig | dict[str, Any] | None, default None) – Initial environment configuration for the worker. If omitted, brmspy will try to load default from the environment store.
Notes
  • The main process must not import rpy2.robjects; the worker owns all embedded-R state.
  • Any R objects returned by the worker are replaced with lightweight wrappers and can only be reattached inside the same worker process lifetime.
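
A hedged construction sketch (the module path comes from the parameter description above; the called function is hypothetical):

import brmspy.brms._brms_module as surface

session = RModuleSession(surface, "brmspy.brms")
result = session.some_function()  # hypothetical; runs in the worker via IPC
session.shutdown()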
Source code in brmspy/_session/session.py
class RModuleSession(ModuleType):
    """
    Module-like proxy that forwards attribute access and function calls to a worker process.

    In the main process, `brmspy.brms` is an instance of this class wrapping the
    real worker-side module. Access patterns are:

    - **Callables** (functions) are wrapped so calling them performs an IPC roundtrip:
      encode args/kwargs → send request → run in worker → encode result → decode result.
    - **Non-callables** (constants, types) are mirrored directly from the wrapped module
      to keep `dir()` and IDE autocomplete useful.

    This class also owns the worker lifecycle:

    - creates a `SharedMemoryManager` for large payload buffers
    - spawns the worker process using spawn semantics
    - bridges worker logging back into the parent's handlers via a log queue
    - performs a `PING` handshake to ensure the worker is ready before requests

    Parameters
    ----------
    module : types.ModuleType
        The module object whose surface is mirrored in the main process. In practice
        this is the import of the worker-facing module (e.g. `brmspy.brms._brms_module`)
        but executed in the main process only for metadata / attribute discovery.
    module_path : str
        Import path used inside the worker when resolving targets (e.g. ``"brmspy.brms"``).
    environment_conf : brmspy.types.session.EnvironmentConfig | dict[str, Any] | None, optional
        Initial environment configuration for the worker. If omitted, brmspy will try
        to load `default` from the environment store.

    Notes
    -----
    - The main process must not import `rpy2.robjects`; the worker owns all embedded-R state.
    - Any R objects returned by the worker are replaced with lightweight wrappers and can
      only be reattached inside the same worker process lifetime.
    """

    _instances: weakref.WeakSet[RModuleSession] = weakref.WeakSet()
    _atexit_registered: bool = False
    _is_rsession: bool = True

    def __init__(
        self,
        module: ModuleType,
        module_path: str,
        environment_conf: EnvironmentConfig | dict[str, Any] | None = None,
    ) -> None:
        """
        Create a new session proxy and immediately start its worker.

        Parameters
        ----------
        module : types.ModuleType
            Wrapped module used for surface mirroring in the main process.
        module_path : str
            Worker import root used for resolving targets.
        environment_conf : brmspy.types.session.EnvironmentConfig | dict[str, Any] | None
            Initial worker environment configuration.

        Raises
        ------
        RuntimeError
            If the worker fails to start or does not respond to the startup handshake.
        """
        # Pretend to be the same module (for IDEs/docs)
        super().__init__(module.__name__, module.__doc__)

        if environment_conf is None:
            try:
                environment_conf = get_environment_config("default")
            except Exception:
                pass

        # Store wrapped module and how to import it in worker
        self._module: ModuleType = module
        self._module_path: str = module_path
        self._environment_conf: EnvironmentConfig = EnvironmentConfig.from_obj(
            environment_conf
        )

        if "BRMSPY_AUTOLOAD" in self._environment_conf.env:
            del self._environment_conf.env["BRMSPY_AUTOLOAD"]

        # cache of Python wrappers for functions
        self._func_cache: dict[str, Callable[..., Any]] = {}

        # Disallow nested tooling contexts (manage/_build/etc)
        self._active_ctx: str | None = None

        self._closed = True

        # start SHM manager + worker
        self._setup_worker()

        # copy attributes so IDEs / dir() see the module surface
        self.__dict__.update(module.__dict__)

        # register for global cleanup at exit
        RModuleSession._instances.add(self)
        if not RModuleSession._atexit_registered:
            atexit.register(RModuleSession._cleanup_all)
            RModuleSession._atexit_registered = True

    def _setup_worker(self, autoload: bool = True) -> None:
        """
        Start the SHM manager + worker process and perform the startup handshake.

        Parameters
        ----------
        autoload : bool, default=True
            If True, sets `BRMSPY_AUTOLOAD=1` for the worker, allowing it to auto-activate
            the last configured runtime on startup. Context-managed tooling flows
            (e.g. `manage()`) typically start the worker with `autoload=False`.

        Raises
        ------
        RuntimeError
            If the worker fails to start within the handshake timeout or reports an init error.
        """

        mgr = SharedMemoryManager(ctx=ctx)
        mgr.start()

        mgr_address = mgr.address
        mgr_authkey = mgr._authkey  # type: ignore[attr-defined]

        parent_conn, child_conn = mp.Pipe()
        self._conn = parent_conn

        env_overrides: dict[str, str] = {
            "BRMSPY_WORKER": "1",
            **self._environment_conf.env,
        }
        # --- logging bridge: child -> parent ---
        self._log_queue: mp.Queue = ctx.Queue()

        # Use whatever handlers are currently on the root logger.
        root = logging.getLogger()
        self._log_listener = QueueListener(
            self._log_queue,
            *root.handlers,
            respect_handler_level=True,
        )
        self._log_listener.start()

        if autoload:
            env_overrides["BRMSPY_AUTOLOAD"] = "1"
        else:
            env_overrides["BRMSPY_AUTOLOAD"] = "0"

        # Spawn worker. Important: close our local copy of the child's end of the Pipe
        # after the process starts, otherwise each restart leaks file descriptors.
        proc = None
        try:
            proc = spawn_worker(
                target=worker_main,
                args=(child_conn, mgr_address, mgr_authkey, self._environment_conf),
                env_overrides=env_overrides,
                log_queue=self._log_queue,
            )
        finally:
            try:
                child_conn.close()
            except Exception:
                pass

        self._mgr = mgr
        self._proc = proc
        self._shm_pool = ShmPool(mgr)
        from .._singleton._shm_singleton import _set_shm

        _set_shm(self._shm_pool)

        self._reg = get_default_registry()
        self._closed = False

        # --- handshake: wait until worker is ready ---
        # This is MANDATORY. Unless we want zombies or race conditions.
        req_id = str(uuid.uuid4())
        self._conn.send(
            {
                "id": req_id,
                "cmd": "PING",
                "target": "",
                "args": [],
                "kwargs": {},
            }
        )
        if not self._conn.poll(30.0):
            # worker never replied -> treat as startup failure,
            # clean up and raise
            self._teardown_worker()
            raise RuntimeError("Worker failed to start within timeout")

        resp = self._conn.recv()
        if not resp.get("ok", False):
            self._teardown_worker()
            raise RuntimeError(f"Worker failed to initialize: {resp.get('error')}")

    def _teardown_worker(self) -> None:
        """
        Tear down worker process, SHM manager, and logging bridge.

        This is best-effort cleanup used by `shutdown()` and `restart()`:

        - sends `SHUTDOWN` (non-fatal if it fails)
        - stops the `QueueListener` for worker logging
        - shuts down the `SharedMemoryManager`
        - joins/terminates the worker if needed

        Notes
        -----
        The "join then terminate" sequence is intentional to avoid leaving zombie
        processes behind in interactive environments.
        """
        if self._closed:
            return

        # best-effort graceful shutdown
        try:
            if self._conn:
                req_id = str(uuid.uuid4())
                self._conn.send(
                    {
                        "id": req_id,
                        "cmd": "SHUTDOWN",
                        "target": "",
                        "args": [],
                        "kwargs": {},
                    }
                )
                # wait for ack, but don't block forever
                if self._conn.poll(5.0):
                    _ = self._conn.recv()
        except Exception:
            pass

        # close the pipe connection in this process (best-effort)
        try:
            conn = getattr(self, "_conn", None)
            if conn is not None:
                conn.close()
        except Exception:
            pass

        # stop logging listener
        try:
            listener = getattr(self, "_log_listener", None)
            if listener is not None:
                listener.stop()
        except Exception:
            pass

        # close the log queue to release its pipe FDs (important on macOS low ulimit)
        try:
            q = getattr(self, "_log_queue", None)
            if q is not None:
                try:
                    q.close()
                except Exception:
                    pass
                # Avoid potential hangs waiting for queue feeder threads in teardown paths.
                try:
                    q.cancel_join_thread()
                except Exception:
                    pass
        except Exception:
            pass

        # close any SHM blocks we have attached/allocated in this process
        try:
            pool = getattr(self, "_shm_pool", None)
            if pool is not None:
                pool.close_all()
            from .._singleton._shm_singleton import _set_shm

            _set_shm(None)
        except Exception:
            pass

        # shut down SHM manager
        try:
            self._mgr.shutdown()
        except Exception:
            pass

        # give worker a chance to exit, then kill it if needed
        # This is MANDATORY. Unless we want zombies or race conditions running amok
        try:
            if self._proc is not None:
                self._proc.join(timeout=5.0)
                if self._proc.is_alive():
                    self._proc.terminate()
                    self._proc.join(timeout=5.0)
        except Exception:
            pass

        self._closed = True

    # ----------------- global cleanup -----------------

    @classmethod
    def _cleanup_all(cls) -> None:
        """
        Atexit hook to shut down all live sessions.

        This is registered once for the class and iterates over a WeakSet of
        `RModuleSession` instances.
        """
        for inst in list(cls._instances):
            try:
                inst.shutdown()
            except Exception:
                pass

    # ----------------- IPC helpers --------------------

    def _encode_arg(self, obj: Any) -> PayloadRef:
        """
        Encode a single Python argument into an IPC payload dict.

        Parameters
        ----------
        obj : Any
            Value to encode.

        Returns
        -------
        dict[str, Any]
            A JSON-serializable structure containing:

            - `codec`: registry codec id
            - `meta`: codec metadata
            - `buffers`: list of SHM block references (`name`, `size`)
        """
        enc = self._reg.encode(obj, self._shm_pool)
        return {
            "codec": enc.codec,
            "meta": enc.meta,
            "buffers": enc.buffers,
        }

    def _decode_result(self, resp: Response) -> Any:
        """
        Decode a worker response into a Python value or raise.

        Parameters
        ----------
        resp : dict[str, Any]
            Response message from the worker.

        Returns
        -------
        Any
            Decoded Python object.

        Raises
        ------
        brmspy.types.errors.RSessionError
            If `resp["ok"]` is false. Includes best-effort remote traceback.
        """
        if not resp["ok"]:
            raise RSessionError(
                resp.get("error") or "Worker error",
                remote_traceback=resp.get("traceback"),
            )
        pres = resp["result"]
        if not pres:
            return None
        decoded = self._reg.decode(
            pres,
            shm_pool=self._shm_pool,
        )
        return decoded

    def _call_remote(self, func_name: str, *args: Any, **kwargs: Any) -> Any:
        """
        Perform a remote call in the worker.

        Parameters
        ----------
        func_name : str
            Function name or fully qualified target.

            - If it starts with ``"mod:"``, it is treated as a full worker target.
            - Otherwise it is resolved as a function on `self._module_path`.
        *args, **kwargs
            Call arguments, encoded via the session codec registry.

        Returns
        -------
        Any
            Decoded return value.

        Raises
        ------
        RuntimeError
            If the session has been shut down.
        brmspy.types.errors.RSessionError
            If the worker reports an error while executing the call.
        """
        if self._closed:
            raise RuntimeError("RModuleSession is closed")

        try:
            if func_name.startswith("mod:"):
                target = func_name
            else:
                target = f"mod:{self._module_path}.{func_name}"

            req_id = str(uuid.uuid4())
            req: Request = {
                "id": req_id,
                "cmd": "CALL",
                "target": target,
                "args": [self._encode_arg(a) for a in args],
                "kwargs": {k: self._encode_arg(v) for k, v in kwargs.items()},
            }
            self._conn.send(req)
            resp = self._conn.recv()
            decoded = self._decode_result(resp)

            try:
                # MUST be run after and never before decoding!
                if self._shm_pool:
                    self._shm_pool.gc()
            except Exception:
                pass

            return decoded

        except (BrokenPipeError, ConnectionResetError, EOFError) as e:
            self._recover(e)

    def _recover(self, orig_exc: BaseException) -> None:
        logger = get_logger()

        logger.warning(
            "R worker crashed; attempting to start a new session...",
            exc_info=orig_exc,
        )

        try:
            # Best-effort shutdown; don't let this kill the recovery path
            try:
                self.shutdown()
            except Exception:
                logger.debug(
                    "Failed to cleanly shut down crashed worker",
                    exc_info=True,
                )

            # Restart must succeed or we bail
            self.restart(autoload=False)

        except Exception as restart_exc:
            # Recovery itself failed
            logger.error(
                "R worker crashed and automatic restart failed.",
                exc_info=restart_exc,
            )
            raise RWorkerCrashedError(
                "R worker crashed; failed to start new session.",
                recovered=False,
                cause=restart_exc,
            ) from restart_exc

        # Recovery succeeded, but the *call* that hit this still failed
        raise RWorkerCrashedError(
            "R worker crashed; started a fresh session. See __cause__ for details.",
            recovered=True,
            cause=orig_exc,
        ) from orig_exc

    # ----------------- attribute proxying --------------

    def __getattribute__(self, name: str) -> Any:
        """
        Proxy attribute access for a module-like experience.

        Rules
        -----
        - Internal attributes are handled locally.
        - Callables found on the wrapped module are returned as wrappers that call into the worker.
        - Non-callables are mirrored directly.
        """
        # 1. Always allow access to internal attributes via base implementation
        if name in _INTERNAL_ATTRS or name.startswith("__") and name.endswith("__"):
            return ModuleType.__getattribute__(self, name)

        # 2. If we already have a cached wrapper for this name, return it
        func_cache = ModuleType.__getattribute__(self, "_func_cache")
        if name in func_cache:
            return func_cache[name]

        module = ModuleType.__getattribute__(self, "_module")

        # 3. If wrapped module has this attribute, decide what to do
        if hasattr(module, name):
            attr = getattr(module, name)

            if callable(attr) and not inspect.isclass(attr):
                # wrap callables so they run in worker
                return self._get_or_create_wrapper(name, attr)

            # non-callables (constants, types, etc.) are just mirrored
            return attr

        # 4. Fallback: use normal module attribute resolution
        return ModuleType.__getattribute__(self, name)

    def _get_or_create_wrapper(
        self, name: str, orig: Callable[..., Any]
    ) -> Callable[..., Any]:
        """
        Return a cached worker-calling wrapper for a callable attribute.

        Parameters
        ----------
        name : str
            Attribute name on the wrapped module.
        orig : collections.abc.Callable[..., Any]
            Original callable (used for metadata only).

        Returns
        -------
        collections.abc.Callable[..., Any]
            Wrapper that performs an IPC roundtrip to execute the callable in the worker.
        """
        func_cache = ModuleType.__getattribute__(self, "_func_cache")
        if name in func_cache:
            return func_cache[name]

        def wrapper(*args: Any, **kwargs: Any):
            return self._call_remote(name, *args, **kwargs)

        wrapper.__name__ = getattr(orig, "__name__", name)
        wrapper.__doc__ = getattr(orig, "__doc__", None)
        wrapper.__wrapped__ = orig  # type: ignore[attr-defined]

        func_cache[name] = wrapper
        return wrapper

    def __dir__(self) -> list[str]:
        """Expose the merged surface of the proxy and the wrapped module."""
        module = ModuleType.__getattribute__(self, "_module")
        return sorted(set(self.__dict__) | set(dir(module)))

    # ----------------- lifetime ------------------------

    def shutdown(self) -> None:
        """Shut down the worker and related resources."""
        self._teardown_worker()

    def __del__(self) -> None:
        """Best-effort cleanup on GC; errors are suppressed."""
        try:
            self.shutdown()
        except Exception:
            pass

    def add_contextmanager(
        self,
        *,
        surface_class: type,
        surface_class_path: str,
    ):
        """
        Attach a class-based "surface" API as a context manager factory.

        This is used to implement `brmspy.brms.manage()` and `brmspy.brms._build()`
        without exposing worker internals to the main process.

        Parameters
        ----------
        surface_class : type
            Class whose `@staticmethod` members define the surface API available inside
            the context. Only staticmethod members are exposed.
        surface_class_path : str
            Fully qualified path like ``"pkg.module.ClassName"`` used for worker target
            resolution.

        Returns
        -------
        collections.abc.Callable[..., contextlib.AbstractContextManager]
            Factory that produces a context manager. On enter it restarts the worker
            (autoload disabled) and yields a `ClassProxy` for the surface class.

        Notes
        -----
        - Nesting contexts is forbidden; this is enforced via `self._active_ctx`.
        - On exit, the selected environment config is persisted via the environment store.
        """

        session = self

        if surface_class is None or surface_class_path is None:
            raise ValueError("surface_class and surface_class_path must both be set.")
        if "." not in surface_class_path:
            raise ValueError(
                "surface_class_path must look like 'pkg.module.ClassName'."
            )

        module_path, class_name = surface_class_path.rsplit(".", 1)
        ctx_label = f"{module_path}::{class_name}"

        class _Ctx:
            def __init__(
                self,
                *,
                environment_config: EnvironmentConfig | dict[str, str] | None = None,
                environment_name: str | None = None,
            ) -> None:
                self._environment_config = environment_config
                self._environment_name = environment_name
                self._new_conf: EnvironmentConfig | None = None

            def __enter__(self):
                if session._active_ctx is not None:
                    raise RuntimeError(
                        f"Nested brmspy contexts are not supported "
                        f"(active={session._active_ctx!r}, new={ctx_label!r})."
                    )
                try:
                    session._active_ctx = ctx_label

                    if self._environment_name and self._environment_config:
                        session._active_ctx = None
                        raise Exception(
                            "Only provide one: environment name or environment config"
                        )

                    if not self._environment_name and self._environment_config:
                        overrides = EnvironmentConfig.from_obj(self._environment_config)
                    elif self._environment_name:
                        overrides = get_environment_config(self._environment_name)
                    else:
                        overrides = None

                    old_conf = session._environment_conf
                    new_conf = overrides if overrides else old_conf
                    self._new_conf = new_conf

                    # fresh worker with new_conf
                    session.restart(environment_conf=new_conf, autoload=False)

                    return ClassProxy(
                        session=session,
                        surface_class=surface_class,
                        module_path=module_path,
                        class_name=class_name,
                    )
                except Exception as e:
                    session._active_ctx = None
                    raise e

            def __exit__(self, exc_type, exc, tb) -> None:
                try:
                    if self._new_conf is not None:
                        save(self._new_conf)
                        save_as_state(self._new_conf)
                finally:
                    session._active_ctx = None
                return None

        def factory(
            *,
            environment_config: EnvironmentConfig | dict[str, str] | None = None,
            environment_name: str | None = None,
        ):
            return _Ctx(
                environment_config=environment_config, environment_name=environment_name
            )

        return factory

    def restart(
        self,
        environment_conf: dict[str, Any] | EnvironmentConfig | None = None,
        autoload: bool = True,
        # empty_shm: bool = False,
    ) -> None:
        """
        Restart the worker process and SHM manager.

        Parameters
        ----------
        environment_conf : dict[str, Any] | brmspy.types.session.EnvironmentConfig | None
            If provided, replaces the existing environment configuration for the new worker.
        autoload : bool, default=True
            Whether to enable autoload for the new worker process.

        Notes
        -----
        This tears down the existing worker and starts a new one. Any previously
        returned R object wrappers are no longer reattachable after restart.
        """
        if environment_conf is not None:
            self._environment_conf = EnvironmentConfig.from_obj(environment_conf)

        # Tear down existing worker (if any)
        self._teardown_worker()

        self._func_cache.clear()

        # Start a fresh worker with current env conf
        self._setup_worker(autoload=autoload)

    def environment_exists(self, name: str) -> bool:
        """
        Check whether an environment with the given name exists on disk.

        Parameters
        ----------
        name : str
            Environment name.

        Returns
        -------
        bool
        """
        return get_environment_exists(name)

    def environment_activate(self, name: str) -> None:
        """
        Activate an environment by restarting the worker through `manage()`.

        Parameters
        ----------
        name : str
            Environment name to activate.

        Notes
        -----
        This is a convenience helper used by tests and developer flows.
        """
        manage = self.manage
        if manage:
            with manage(environment_name=name) as ctx:
                pass
        else:
            raise Exception("Invalid state. manage is not defined!")

    def _run_test_by_name(
        self, module_path: str, class_name: str | None, func_name: str
    ) -> Any:
        """
        Run a test identified by module/class/function inside the worker.

        Parameters
        ----------
        module_path : str
            Importable module path (e.g. ``"tests.test_file"``).
        class_name : str | None
            Optional class name if the test is a method.
        func_name : str
            Test function/method name.

        Returns
        -------
        Any
            Decoded return value from the test function.

        Raises
        ------
        RuntimeError
            If `BRMSPY_TEST=1` is not set.
        brmspy.types.errors.RSessionError
            If the worker reports a failure.
        """
        if os.getenv("BRMSPY_TEST") != "1":
            raise RuntimeError("BRMSPY_TEST=1 required for worker test execution")
        if self._closed:
            raise RuntimeError("Connection not open! Cant run test.")

        req_id = str(uuid.uuid4())
        self._conn.send(
            {
                "id": req_id,
                "cmd": "_RUN_TEST_BY_NAME",
                "target": "",
                "args": [],
                "kwargs": {
                    "module": module_path,
                    "class": class_name,
                    "func": func_name,
                },
            }
        )

        resp = self._conn.recv()

        if not resp.get("ok", False):
            raise RSessionError(
                resp.get("error", "Worker test failed"),
                remote_traceback=resp.get("traceback"),
            )

        pres = resp["result"]

        return self._reg.decode(
            pres,
            shm_pool=self._shm_pool,
        )

Attributes

_instances = weakref.WeakSet() class-attribute instance-attribute
_atexit_registered = False class-attribute instance-attribute
_is_rsession = True class-attribute instance-attribute
_module = module instance-attribute
_module_path = module_path instance-attribute
_environment_conf = EnvironmentConfig.from_obj(environment_conf) instance-attribute
_func_cache = {} instance-attribute
_active_ctx = None instance-attribute
_closed = True instance-attribute

Functions

__init__(module, module_path, environment_conf=None)

Create a new session proxy and immediately start its worker.

Parameters:

Name Type Description Default
module ModuleType

Wrapped module used for surface mirroring in the main process.

required
module_path str

Worker import root used for resolving targets.

required
environment_conf EnvironmentConfig | dict[str, Any] | None

Initial worker environment configuration.

None

Raises:

Type Description
RuntimeError

If the worker fails to start or does not respond to the startup handshake.

Source code in brmspy/_session/session.py
def __init__(
    self,
    module: ModuleType,
    module_path: str,
    environment_conf: EnvironmentConfig | dict[str, Any] | None = None,
) -> None:
    """
    Create a new session proxy and immediately start its worker.

    Parameters
    ----------
    module : types.ModuleType
        Wrapped module used for surface mirroring in the main process.
    module_path : str
        Worker import root used for resolving targets.
    environment_conf : brmspy.types.session.EnvironmentConfig | dict[str, Any] | None
        Initial worker environment configuration.

    Raises
    ------
    RuntimeError
        If the worker fails to start or does not respond to the startup handshake.
    """
    # Pretend to be the same module (for IDEs/docs)
    super().__init__(module.__name__, module.__doc__)

    if environment_conf is None:
        try:
            environment_conf = get_environment_config("default")
        except Exception:
            pass

    # Store wrapped module and how to import it in worker
    self._module: ModuleType = module
    self._module_path: str = module_path
    self._environment_conf: EnvironmentConfig = EnvironmentConfig.from_obj(
        environment_conf
    )

    if "BRMSPY_AUTOLOAD" in self._environment_conf.env:
        del self._environment_conf.env["BRMSPY_AUTOLOAD"]

    # cache of Python wrappers for functions
    self._func_cache: dict[str, Callable[..., Any]] = {}

    # Disallow nested tooling contexts (manage/_build/etc)
    self._active_ctx: str | None = None

    self._closed = True

    # start SHM manager + worker
    self._setup_worker()

    # copy attributes so IDEs / dir() see the module surface
    self.__dict__.update(module.__dict__)

    # register for global cleanup at exit
    RModuleSession._instances.add(self)
    if not RModuleSession._atexit_registered:
        atexit.register(RModuleSession._cleanup_all)
        RModuleSession._atexit_registered = True
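
For orientation, a minimal construction sketch. It assumes `RModuleSession` is importable from `brmspy._session.session` (as the source paths on this page suggest) and uses a hypothetical module `mypkg.rfuncs` as the wrapped module:

import mypkg.rfuncs  # hypothetical module whose functions should run in the worker
from brmspy._session.session import RModuleSession

session = RModuleSession(
    module=mypkg.rfuncs,
    module_path="mypkg.rfuncs",  # import root the worker uses to resolve targets
)
# The proxy mirrors the module surface; callables execute in the worker process.
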
_setup_worker(autoload=True)

Start the SHM manager + worker process and perform the startup handshake.

Parameters:

Name Type Description Default
autoload bool

If True, sets BRMSPY_AUTOLOAD=1 for the worker, allowing it to auto-activate the last configured runtime on startup. Context-managed tooling flows (e.g. manage()) typically start the worker with autoload=False.

True

Raises:

Type Description
RuntimeError

If the worker fails to start within the handshake timeout or reports an init error.

Source code in brmspy/_session/session.py
def _setup_worker(self, autoload: bool = True) -> None:
    """
    Start the SHM manager + worker process and perform the startup handshake.

    Parameters
    ----------
    autoload : bool, default=True
        If True, sets `BRMSPY_AUTOLOAD=1` for the worker, allowing it to auto-activate
        the last configured runtime on startup. Context-managed tooling flows
        (e.g. `manage()`) typically start the worker with `autoload=False`.

    Raises
    ------
    RuntimeError
        If the worker fails to start within the handshake timeout or reports an init error.
    """

    mgr = SharedMemoryManager(ctx=ctx)
    mgr.start()

    mgr_address = mgr.address
    mgr_authkey = mgr._authkey  # type: ignore[attr-defined]

    parent_conn, child_conn = mp.Pipe()
    self._conn = parent_conn

    env_overrides: dict[str, str] = {
        "BRMSPY_WORKER": "1",
        **self._environment_conf.env,
    }
    # --- logging bridge: child -> parent ---
    self._log_queue: mp.Queue = ctx.Queue()

    # Use whatever handlers are currently on the root logger.
    root = logging.getLogger()
    self._log_listener = QueueListener(
        self._log_queue,
        *root.handlers,
        respect_handler_level=True,
    )
    self._log_listener.start()

    if autoload:
        env_overrides["BRMSPY_AUTOLOAD"] = "1"
    else:
        env_overrides["BRMSPY_AUTOLOAD"] = "0"

    # Spawn worker. Important: close our local copy of the child's end of the Pipe
    # after the process starts, otherwise each restart leaks file descriptors.
    proc = None
    try:
        proc = spawn_worker(
            target=worker_main,
            args=(child_conn, mgr_address, mgr_authkey, self._environment_conf),
            env_overrides=env_overrides,
            log_queue=self._log_queue,
        )
    finally:
        try:
            child_conn.close()
        except Exception:
            pass

    self._mgr = mgr
    self._proc = proc
    self._shm_pool = ShmPool(mgr)
    from .._singleton._shm_singleton import _set_shm

    _set_shm(self._shm_pool)

    self._reg = get_default_registry()
    self._closed = False

    # --- handshake: wait until worker is ready ---
    # This is MANDATORY. Unless we want zombies or race conditions.
    req_id = str(uuid.uuid4())
    self._conn.send(
        {
            "id": req_id,
            "cmd": "PING",
            "target": "",
            "args": [],
            "kwargs": {},
        }
    )
    if not self._conn.poll(30.0):
        # worker never replied -> treat as startup failure,
        # clean up and raise
        self._teardown_worker()
        raise RuntimeError("Worker failed to start within timeout")

    resp = self._conn.recv()
    if not resp.get("ok", False):
        self._teardown_worker()
        raise RuntimeError(f"Worker failed to initialize: {resp.get('error')}")
_teardown_worker()

Tear down worker process, SHM manager, and logging bridge.

This is best-effort cleanup used by shutdown() and restart():

  • sends SHUTDOWN (non-fatal if it fails)
  • stops the QueueListener for worker logging
  • shuts down the SharedMemoryManager
  • joins/terminates the worker if needed
Notes

The "join then terminate" sequence is intentional to avoid leaving zombie processes behind in interactive environments.

Source code in brmspy/_session/session.py
def _teardown_worker(self) -> None:
    """
    Tear down worker process, SHM manager, and logging bridge.

    This is best-effort cleanup used by `shutdown()` and `restart()`:

    - sends `SHUTDOWN` (non-fatal if it fails)
    - stops the `QueueListener` for worker logging
    - shuts down the `SharedMemoryManager`
    - joins/terminates the worker if needed

    Notes
    -----
    The "join then terminate" sequence is intentional to avoid leaving zombie
    processes behind in interactive environments.
    """
    if self._closed:
        return

    # best-effort graceful shutdown
    try:
        if self._conn:
            req_id = str(uuid.uuid4())
            self._conn.send(
                {
                    "id": req_id,
                    "cmd": "SHUTDOWN",
                    "target": "",
                    "args": [],
                    "kwargs": {},
                }
            )
            # wait for ack, but don't block forever
            if self._conn.poll(5.0):
                _ = self._conn.recv()
    except Exception:
        pass

    # close the pipe connection in this process (best-effort)
    try:
        conn = getattr(self, "_conn", None)
        if conn is not None:
            conn.close()
    except Exception:
        pass

    # stop logging listener
    try:
        listener = getattr(self, "_log_listener", None)
        if listener is not None:
            listener.stop()
    except Exception:
        pass

    # close the log queue to release its pipe FDs (important on macOS low ulimit)
    try:
        q = getattr(self, "_log_queue", None)
        if q is not None:
            try:
                q.close()
            except Exception:
                pass
            # Avoid potential hangs waiting for queue feeder threads in teardown paths.
            try:
                q.cancel_join_thread()
            except Exception:
                pass
    except Exception:
        pass

    # close any SHM blocks we have attached/allocated in this process
    try:
        pool = getattr(self, "_shm_pool", None)
        if pool is not None:
            pool.close_all()
        from .._singleton._shm_singleton import _set_shm

        _set_shm(None)
    except Exception:
        pass

    # shut down SHM manager
    try:
        self._mgr.shutdown()
    except Exception:
        pass

    # give worker a chance to exit, then kill it if needed
    # This is MANDATORY. Unless we want zombies or race conditions running amok
    try:
        if self._proc is not None:
            self._proc.join(timeout=5.0)
            if self._proc.is_alive():
                self._proc.terminate()
                self._proc.join(timeout=5.0)
    except Exception:
        pass

    self._closed = True
_cleanup_all() classmethod

Atexit hook to shut down all live sessions.

This is registered once for the class and iterates over a WeakSet of RModuleSession instances.

Source code in brmspy/_session/session.py
@classmethod
def _cleanup_all(cls) -> None:
    """
    Atexit hook to shut down all live sessions.

    This is registered once for the class and iterates over a WeakSet of
    `RModuleSession` instances.
    """
    for inst in list(cls._instances):
        try:
            inst.shutdown()
        except Exception:
            pass
_encode_arg(obj)

Encode a single Python argument into an IPC payload dict.

Parameters:

Name Type Description Default
obj Any

Value to encode.

required

Returns:

Type Description
dict[str, Any]

A JSON-serializable structure containing:

  • codec: registry codec id
  • meta: codec metadata
  • buffers: list of SHM block references (name, size)
Source code in brmspy/_session/session.py
def _encode_arg(self, obj: Any) -> PayloadRef:
    """
    Encode a single Python argument into an IPC payload dict.

    Parameters
    ----------
    obj : Any
        Value to encode.

    Returns
    -------
    dict[str, Any]
        A JSON-serializable structure containing:

        - `codec`: registry codec id
        - `meta`: codec metadata
        - `buffers`: list of SHM block references (`name`, `size`)
    """
    enc = self._reg.encode(obj, self._shm_pool)
    return {
        "codec": enc.codec,
        "meta": enc.meta,
        "buffers": enc.buffers,
    }
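
A rough sketch of what a payload looks like, assuming an active `session` and a NumPy array argument; the exact codec id, metadata, and buffer names depend on the registry and SHM pool:

import numpy as np

payload = session._encode_arg(np.arange(8))
# payload is JSON-serializable, e.g. (illustrative values):
# {
#     "codec": "numpy",                        # registry codec id
#     "meta": {"dtype": "<i8", "shape": [8]},  # codec metadata
#     "buffers": [{"name": "psm_...", "size": 64}],
# }
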
_decode_result(resp)

Decode a worker response into a Python value or raise.

Parameters:

Name Type Description Default
resp dict[str, Any]

Response message from the worker.

required

Returns:

Type Description
Any

Decoded Python object.

Raises:

Type Description
RSessionError

If resp["ok"] is false. Includes best-effort remote traceback.

Source code in brmspy/_session/session.py
def _decode_result(self, resp: Response) -> Any:
    """
    Decode a worker response into a Python value or raise.

    Parameters
    ----------
    resp : dict[str, Any]
        Response message from the worker.

    Returns
    -------
    Any
        Decoded Python object.

    Raises
    ------
    brmspy.types.errors.RSessionError
        If `resp["ok"]` is false. Includes best-effort remote traceback.
    """
    if not resp["ok"]:
        raise RSessionError(
            resp.get("error") or "Worker error",
            remote_traceback=resp.get("traceback"),
        )
    pres = resp["result"]
    if not pres:
        return None
    decoded = self._reg.decode(
        pres,
        shm_pool=self._shm_pool,
    )
    return decoded
_call_remote(func_name, *args, **kwargs)

Perform a remote call in the worker.

Parameters:

Name Type Description Default
func_name str

Function name or fully qualified target.

  • If it starts with "mod:", it is treated as a full worker target.
  • Otherwise it is resolved as a function on self._module_path.
required
*args Any

Call arguments, encoded via the session codec registry.

()
**kwargs Any

Call arguments, encoded via the session codec registry.

()

Returns:

Type Description
Any

Decoded return value.

Raises:

Type Description
RuntimeError

If the session has been shut down.

RSessionError

If the worker reports an error while executing the call.

Source code in brmspy/_session/session.py
def _call_remote(self, func_name: str, *args: Any, **kwargs: Any) -> Any:
    """
    Perform a remote call in the worker.

    Parameters
    ----------
    func_name : str
        Function name or fully qualified target.

        - If it starts with ``"mod:"``, it is treated as a full worker target.
        - Otherwise it is resolved as a function on `self._module_path`.
    *args, **kwargs
        Call arguments, encoded via the session codec registry.

    Returns
    -------
    Any
        Decoded return value.

    Raises
    ------
    RuntimeError
        If the session has been shut down.
    brmspy.types.errors.RSessionError
        If the worker reports an error while executing the call.
    """
    if self._closed:
        raise RuntimeError("RModuleSession is closed")

    try:
        if func_name.startswith("mod:"):
            target = func_name
        else:
            target = f"mod:{self._module_path}.{func_name}"

        req_id = str(uuid.uuid4())
        req: Request = {
            "id": req_id,
            "cmd": "CALL",
            "target": target,
            "args": [self._encode_arg(a) for a in args],
            "kwargs": {k: self._encode_arg(v) for k, v in kwargs.items()},
        }
        self._conn.send(req)
        resp = self._conn.recv()
        decoded = self._decode_result(resp)

        try:
            # MUST be run after and never before decoding!
            if self._shm_pool:
                self._shm_pool.gc()
        except Exception:
            pass

        return decoded

    except (BrokenPipeError, ConnectionResetError, EOFError) as e:
        self._recover(e)
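
Both target forms in a quick sketch, assuming `session` wraps a module at import root `mypkg.rfuncs` (the function names are hypothetical):

# Bare name: resolved against the session's module path.
result = session._call_remote("fit_model", {"y": [1, 2, 3]}, chains=4)
# -> worker target "mod:mypkg.rfuncs.fit_model"

# Fully qualified: passed through unchanged.
result = session._call_remote("mod:mypkg.other.helper", 42)
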
_recover(orig_exc)
Source code in brmspy/_session/session.py
def _recover(self, orig_exc: BaseException) -> None:
    logger = get_logger()

    logger.warning(
        "R worker crashed; attempting to start a new session...",
        exc_info=orig_exc,
    )

    try:
        # Best-effort shutdown; don't let this kill the recovery path
        try:
            self.shutdown()
        except Exception:
            logger.debug(
                "Failed to cleanly shut down crashed worker",
                exc_info=True,
            )

        # Restart must succeed or we bail
        self.restart(autoload=False)

    except Exception as restart_exc:
        # Recovery itself failed
        logger.error(
            "R worker crashed and automatic restart failed.",
            exc_info=restart_exc,
        )
        raise RWorkerCrashedError(
            "R worker crashed; failed to start new session.",
            recovered=False,
            cause=restart_exc,
        ) from restart_exc

    # Recovery succeeded, but the *call* that hit this still failed
    raise RWorkerCrashedError(
        "R worker crashed; started a fresh session. See __cause__ for details.",
        recovered=True,
        cause=orig_exc,
    ) from orig_exc
__getattribute__(name)

Proxy attribute access for a module-like experience.

Rules
  • Internal attributes are handled locally.
  • Callables found on the wrapped module are returned as wrappers that call into the worker.
  • Non-callables are mirrored directly.
Source code in brmspy/_session/session.py
def __getattribute__(self, name: str) -> Any:
    """
    Proxy attribute access for a module-like experience.

    Rules
    -----
    - Internal attributes are handled locally.
    - Callables found on the wrapped module are returned as wrappers that call into the worker.
    - Non-callables are mirrored directly.
    """
    # 1. Always allow access to internal attributes via base implementation
    if name in _INTERNAL_ATTRS or name.startswith("__") and name.endswith("__"):
        return ModuleType.__getattribute__(self, name)

    # 2. If we already have a cached wrapper for this name, return it
    func_cache = ModuleType.__getattribute__(self, "_func_cache")
    if name in func_cache:
        return func_cache[name]

    module = ModuleType.__getattribute__(self, "_module")

    # 3. If wrapped module has this attribute, decide what to do
    if hasattr(module, name):
        attr = getattr(module, name)

        if callable(attr) and not inspect.isclass(attr):
            # wrap callables so they run in worker
            return self._get_or_create_wrapper(name, attr)

        # non-callables (constants, types, etc.) are just mirrored
        return attr

    # 4. Fallback: use normal module attribute resolution
    return ModuleType.__getattribute__(self, name)
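
In practice this makes the proxy feel like the wrapped module. A sketch, assuming the wrapped module defines a hypothetical function fit_model and constant DEFAULT_CHAINS:

chains = session.DEFAULT_CHAINS  # non-callable: mirrored directly
fit = session.fit_model          # callable: cached worker-calling wrapper
fit(chains=chains)               # executes inside the worker via IPC
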
_get_or_create_wrapper(name, orig)

Return a cached worker-calling wrapper for a callable attribute.

Parameters:

Name Type Description Default
name str

Attribute name on the wrapped module.

required
orig Callable[..., Any]

Original callable (used for metadata only).

required

Returns:

Type Description
Callable[..., Any]

Wrapper that performs an IPC roundtrip to execute the callable in the worker.

Source code in brmspy/_session/session.py
def _get_or_create_wrapper(
    self, name: str, orig: Callable[..., Any]
) -> Callable[..., Any]:
    """
    Return a cached worker-calling wrapper for a callable attribute.

    Parameters
    ----------
    name : str
        Attribute name on the wrapped module.
    orig : collections.abc.Callable[..., Any]
        Original callable (used for metadata only).

    Returns
    -------
    collections.abc.Callable[..., Any]
        Wrapper that performs an IPC roundtrip to execute the callable in the worker.
    """
    func_cache = ModuleType.__getattribute__(self, "_func_cache")
    if name in func_cache:
        return func_cache[name]

    def wrapper(*args: Any, **kwargs: Any):
        return self._call_remote(name, *args, **kwargs)

    wrapper.__name__ = getattr(orig, "__name__", name)
    wrapper.__doc__ = getattr(orig, "__doc__", None)
    wrapper.__wrapped__ = orig  # type: ignore[attr-defined]

    func_cache[name] = wrapper
    return wrapper
__dir__()

Expose the merged surface of the proxy and the wrapped module.

Source code in brmspy/_session/session.py
def __dir__(self) -> list[str]:
    """Expose the merged surface of the proxy and the wrapped module."""
    module = ModuleType.__getattribute__(self, "_module")
    return sorted(set(self.__dict__) | set(dir(module)))
shutdown()

Shut down the worker and related resources.

Source code in brmspy/_session/session.py
def shutdown(self) -> None:
    """Shut down the worker and related resources."""
    self._teardown_worker()
__del__()

Best-effort cleanup on GC; errors are suppressed.

Source code in brmspy/_session/session.py
def __del__(self) -> None:
    """Best-effort cleanup on GC; errors are suppressed."""
    try:
        self.shutdown()
    except Exception:
        pass
add_contextmanager(*, surface_class, surface_class_path)

Attach a class-based "surface" API as a context manager factory.

This is used to implement brmspy.brms.manage() and brmspy.brms._build() without exposing worker internals to the main process.

Parameters:

Name Type Description Default
surface_class type

Class whose @staticmethod members define the surface API available inside the context. Only staticmethod members are exposed.

required
surface_class_path str

Fully qualified path like "pkg.module.ClassName" used for worker target resolution.

required

Returns:

Type Description
Callable[..., AbstractContextManager]

Factory that produces a context manager. On enter it restarts the worker (autoload disabled) and yields a ClassProxy for the surface class.

Notes
  • Nesting contexts is forbidden; this is enforced via self._active_ctx.
  • On exit, the selected environment config is persisted via the environment store.
Source code in brmspy/_session/session.py
def add_contextmanager(
    self,
    *,
    surface_class: type,
    surface_class_path: str,
):
    """
    Attach a class-based "surface" API as a context manager factory.

    This is used to implement `brmspy.brms.manage()` and `brmspy.brms._build()`
    without exposing worker internals to the main process.

    Parameters
    ----------
    surface_class : type
        Class whose `@staticmethod` members define the surface API available inside
        the context. Only staticmethod members are exposed.
    surface_class_path : str
        Fully qualified path like ``"pkg.module.ClassName"`` used for worker target
        resolution.

    Returns
    -------
    collections.abc.Callable[..., contextlib.AbstractContextManager]
        Factory that produces a context manager. On enter it restarts the worker
        (autoload disabled) and yields a `ClassProxy` for the surface class.

    Notes
    -----
    - Nesting contexts is forbidden; this is enforced via `self._active_ctx`.
    - On exit, the selected environment config is persisted via the environment store.
    """

    session = self

    if surface_class is None or surface_class_path is None:
        raise ValueError("surface_class and surface_class_path must both be set.")
    if "." not in surface_class_path:
        raise ValueError(
            "surface_class_path must look like 'pkg.module.ClassName'."
        )

    module_path, class_name = surface_class_path.rsplit(".", 1)
    ctx_label = f"{module_path}::{class_name}"

    class _Ctx:
        def __init__(
            self,
            *,
            environment_config: EnvironmentConfig | dict[str, str] | None = None,
            environment_name: str | None = None,
        ) -> None:
            self._environment_config = environment_config
            self._environment_name = environment_name
            self._new_conf: EnvironmentConfig | None = None

        def __enter__(self):
            if session._active_ctx is not None:
                raise RuntimeError(
                    f"Nested brmspy contexts are not supported "
                    f"(active={session._active_ctx!r}, new={ctx_label!r})."
                )
            try:
                session._active_ctx = ctx_label

                if self._environment_name and self._environment_config:
                    session._active_ctx = None
                    raise Exception(
                        "Only provide one: environment name or environment config"
                    )

                if not self._environment_name and self._environment_config:
                    overrides = EnvironmentConfig.from_obj(self._environment_config)
                elif self._environment_name:
                    overrides = get_environment_config(self._environment_name)
                else:
                    overrides = None

                old_conf = session._environment_conf
                new_conf = overrides if overrides else old_conf
                self._new_conf = new_conf

                # fresh worker with new_conf
                session.restart(environment_conf=new_conf, autoload=False)

                return ClassProxy(
                    session=session,
                    surface_class=surface_class,
                    module_path=module_path,
                    class_name=class_name,
                )
            except Exception as e:
                session._active_ctx = None
                raise

        def __exit__(self, exc_type, exc, tb) -> None:
            try:
                if self._new_conf is not None:
                    save(self._new_conf)
                    save_as_state(self._new_conf)
            finally:
                session._active_ctx = None
            return None

    def factory(
        *,
        environment_config: EnvironmentConfig | dict[str, str] | None = None,
        environment_name: str | None = None,
    ):
        return _Ctx(
            environment_config=environment_config, environment_name=environment_name
        )

    return factory
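
A usage sketch: attach a surface class, then call the returned factory as a context manager. `ToolingSurface`, its module path, and `install_runtime` are hypothetical; per the notes above, `brmspy.brms.manage()` is built this way.

from mypkg.tooling import ToolingSurface  # hypothetical surface class

manage = session.add_contextmanager(
    surface_class=ToolingSurface,
    surface_class_path="mypkg.tooling.ToolingSurface",
)

with manage(environment_name="default") as tools:
    # `tools` is a ClassProxy; staticmethods run in the fresh worker
    tools.install_runtime()  # hypothetical surface method
# On exit, the selected environment config is persisted.
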
restart(environment_conf=None, autoload=True)

Restart the worker process and SHM manager.

Parameters:

Name Type Description Default
environment_conf dict[str, Any] | EnvironmentConfig | None

If provided, replaces the existing environment configuration for the new worker.

None
autoload bool

Whether to enable autoload for the new worker process.

True
Notes

This tears down the existing worker and starts a new one. Any previously returned R object wrappers are no longer reattachable after restart.

Source code in brmspy/_session/session.py
def restart(
    self,
    environment_conf: dict[str, Any] | EnvironmentConfig | None = None,
    autoload: bool = True,
    # empty_shm: bool = False,
) -> None:
    """
    Restart the worker process and SHM manager.

    Parameters
    ----------
    environment_conf : dict[str, Any] | brmspy.types.session.EnvironmentConfig | None
        If provided, replaces the existing environment configuration for the new worker.
    autoload : bool, default=True
        Whether to enable autoload for the new worker process.

    Notes
    -----
    This tears down the existing worker and starts a new one. Any previously
    returned R object wrappers are no longer reattachable after restart.
    """
    if environment_conf is not None:
        self._environment_conf = EnvironmentConfig.from_obj(environment_conf)

    # Tear down existing worker (if any)
    self._teardown_worker()

    self._func_cache.clear()

    # Start a fresh worker with current env conf
    self._setup_worker(autoload=autoload)
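
A restart sketch (assuming the `session` proxy from above, and that `get_environment_config` is importable from `brmspy._session.environment` as its source path suggests); any R object handles returned before the restart are dead afterwards:

from brmspy._session.environment import get_environment_config

session.restart()  # same environment, fresh worker

# Or swap configurations in the same step:
conf = get_environment_config("staging")  # "staging" is an illustrative name
session.restart(environment_conf=conf, autoload=False)
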
environment_exists(name)

Check whether an environment with the given name exists on disk.

Parameters:

Name Type Description Default
name str

Environment name.

required

Returns:

Type Description
bool
Source code in brmspy/_session/session.py
def environment_exists(self, name: str) -> bool:
    """
    Check whether an environment with the given name exists on disk.

    Parameters
    ----------
    name : str
        Environment name.

    Returns
    -------
    bool
    """
    return get_environment_exists(name)
environment_activate(name)

Activate an environment by restarting the worker through manage().

Parameters:

Name Type Description Default
name str

Environment name to activate.

required
Notes

This is a convenience helper used by tests and developer flows.

Source code in brmspy/_session/session.py
def environment_activate(self, name: str) -> None:
    """
    Activate an environment by restarting the worker through `manage()`.

    Parameters
    ----------
    name : str
        Environment name to activate.

    Notes
    -----
    This is a convenience helper used by tests and developer flows.
    """
    manage = self.manage
    if manage:
        with manage(environment_name=name):
            pass
    else:
        raise Exception("Invalid state. manage is not defined!")
_run_test_by_name(module_path, class_name, func_name)

Run a test identified by module/class/function inside the worker.

Parameters:

Name Type Description Default
module_path str

Importable module path (e.g. "tests.test_file").

required
class_name str | None

Optional class name if the test is a method.

required
func_name str

Test function/method name.

required

Returns:

Type Description
Any

Decoded return value from the test function.

Raises:

Type Description
RuntimeError

If BRMSPY_TEST=1 is not set.

RSessionError

If the worker reports a failure.

Source code in brmspy/_session/session.py
def _run_test_by_name(
    self, module_path: str, class_name: str | None, func_name: str
) -> Any:
    """
    Run a test identified by module/class/function inside the worker.

    Parameters
    ----------
    module_path : str
        Importable module path (e.g. ``"tests.test_file"``).
    class_name : str | None
        Optional class name if the test is a method.
    func_name : str
        Test function/method name.

    Returns
    -------
    Any
        Decoded return value from the test function.

    Raises
    ------
    RuntimeError
        If `BRMSPY_TEST=1` is not set.
    brmspy.types.errors.RSessionError
        If the worker reports a failure.
    """
    if os.getenv("BRMSPY_TEST") != "1":
        raise RuntimeError("BRMSPY_TEST=1 required for worker test execution")
    if self._closed:
        raise RuntimeError("Connection not open! Cant run test.")

    req_id = str(uuid.uuid4())
    self._conn.send(
        {
            "id": req_id,
            "cmd": "_RUN_TEST_BY_NAME",
            "target": "",
            "args": [],
            "kwargs": {
                "module": module_path,
                "class": class_name,
                "func": func_name,
            },
        }
    )

    resp = self._conn.recv()

    if not resp.get("ok", False):
        raise RSessionError(
            resp.get("error", "Worker test failed"),
            remote_traceback=resp.get("traceback"),
        )

    pres = resp["result"]

    return self._reg.decode(
        pres,
        shm_pool=self._shm_pool,
    )

Functions

get_environment_config(name)

Load an environment configuration from disk.

Parameters:

Name Type Description Default
name str

Environment name.

required

Returns:

Type Description
EnvironmentConfig

Loaded configuration. If no config file exists, returns a default config with environment_name=name.

Source code in brmspy/_session/environment.py
def get_environment_config(name: str) -> EnvironmentConfig:
    """
    Load an environment configuration from disk.

    Parameters
    ----------
    name : str
        Environment name.

    Returns
    -------
    brmspy.types.session.EnvironmentConfig
        Loaded configuration. If no config file exists, returns a default config
        with `environment_name=name`.
    """
    base_dir = get_environment_base_dir()
    env_dir = base_dir / name
    config_dir = env_dir / "config.json"

    if not config_dir.exists():
        return EnvironmentConfig(environment_name=name)

    with open(config_dir) as f:
        data = json.load(f)
        return EnvironmentConfig.from_dict(data)
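
Loading is forgiving: a missing `config.json` yields a fresh default rather than an error.

conf = get_environment_config("default")
print(conf.environment_name)  # "default", even if no config.json exists yet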

get_environment_exists(name)

Return True if an environment exists (determined by presence of config.json).

Source code in brmspy/_session/environment.py
def get_environment_exists(name: str) -> bool:
    """
    Return True if an environment exists (determined by presence of `config.json`).
    """
    base_dir = get_environment_base_dir()
    env_dir = base_dir / name
    config_dir = env_dir / "config.json"

    return config_dir.exists()

save(env_conf)

Persist an environment configuration and ensure the directory structure exists.

Parameters:

Name Type Description Default
env_conf EnvironmentConfig

Environment configuration to write.

required
Source code in brmspy/_session/environment_parent.py
def save(env_conf: EnvironmentConfig) -> None:
    """
    Persist an environment configuration and ensure the directory structure exists.

    Parameters
    ----------
    env_conf : brmspy.types.session.EnvironmentConfig
        Environment configuration to write.
    """
    base_dir = get_environment_base_dir()
    env_dir = base_dir / env_conf.environment_name
    env_rlib_dir = get_environment_userlibs_dir(name=env_conf.environment_name)
    config_dir = env_dir / "config.json"
    os.makedirs(env_dir, exist_ok=True)
    os.makedirs(env_rlib_dir, exist_ok=True)

    if "BRMSPY_AUTOLOAD" in env_conf.env:
        del env_conf.env["BRMSPY_AUTOLOAD"]

    with open(config_dir, "w", encoding="utf-8") as f:
        json.dump(env_conf.to_dict(), f, indent=2, ensure_ascii=False)

save_as_state(env_conf)

Record the active environment name in environment_state.json.

Parameters:

Name Type Description Default
env_conf EnvironmentConfig

Environment configuration whose name should be recorded.

required
Source code in brmspy/_session/environment_parent.py
def save_as_state(env_conf: EnvironmentConfig) -> None:
    """
    Record the active environment name in `environment_state.json`.

    Parameters
    ----------
    env_conf : brmspy.types.session.EnvironmentConfig
        Environment configuration whose name should be recorded.
    """
    state_path = get_environments_state_path()
    with open(state_path, "w", encoding="utf-8") as f:
        json.dump(
            {"active": env_conf.environment_name}, f, indent=2, ensure_ascii=False
        )
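
Persisting a configuration and marking it active go together, mirroring what the `manage()` context does on exit. A sketch, assuming `EnvironmentConfig` is importable from `brmspy.types.session` (the type path used throughout these docs):

from brmspy.types.session import EnvironmentConfig

conf = EnvironmentConfig(environment_name="staging")  # illustrative name
save(conf)           # writes <base>/staging/config.json, creates lib dirs
save_as_state(conf)  # records {"active": "staging"} in environment_state.json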

get_logger()

Get or create the brmspy logger instance.

Returns a configured logger with a custom formatter that outputs messages in the format: [brmspy][method_name] msg here

Returns:

Type Description
Logger

Configured brmspy logger instance

Examples:

>>> from brmspy.helpers.log import get_logger
>>> logger = get_logger()
>>> logger.info("Starting process")  # Prints: [brmspy][<module>] Starting process
Source code in brmspy/helpers/log.py
def get_logger() -> logging.Logger:
    """
    Get or create the brmspy logger instance.

    Returns a configured logger with a custom formatter that outputs
    messages in the format: `[brmspy][method_name] msg here`

    Returns
    -------
    logging.Logger
        Configured brmspy logger instance

    Examples
    --------
    >>> from brmspy.helpers.log import get_logger
    >>> logger = get_logger()
    >>> logger.info("Starting process")  # Prints: [brmspy][<module>] Starting process
    """
    global _logger

    if _logger is None:
        _logger = logging.getLogger("brmspy")
        _logger.setLevel(logging.INFO)

        if not _logger.handlers:
            # Handler for "normal" logs
            normal_handler = logging.StreamHandler()
            normal_handler.setFormatter(BrmspyFormatter())
            normal_handler.addFilter(NonPrintFilter())
            _logger.addHandler(normal_handler)

            # print logs: preserve control chars and explicit \n/\r
            print_handler = logging.StreamHandler()
            print_handler.setFormatter(logging.Formatter("%(message)s"))
            print_handler.addFilter(PrintOnlyFilter())
            print_handler.terminator = ""
            _logger.addHandler(print_handler)

        if _running_under_pytest():
            _logger.propagate = True
        else:
            _logger.propagate = False

    return _logger

log_warning(msg, method_name=None)

Log a warning message.

Parameters:

Name Type Description Default
msg str

The warning message to log

required
method_name str

The name of the method/function. If None, will auto-detect from call stack.

None
Source code in brmspy/helpers/log.py
def log_warning(msg: str, method_name: str | None = None):
    """
    Log a warning message.

    Parameters
    ----------
    msg : str
        The warning message to log
    method_name : str, optional
        The name of the method/function. If None, will auto-detect from call stack.

    """
    log(msg, method_name=method_name, level=logging.WARNING)

get_default_registry()

Return the process-global default codec registry.

Returns:

Type Description
CodecRegistry

Registry with SHM-first codecs registered, plus a pickle fallback.

Source code in brmspy/_session/codec/registry.py
def get_default_registry() -> CodecRegistry:
    """
    Return the process-global default codec registry.

    Returns
    -------
    brmspy._session.codec.base.CodecRegistry
        Registry with SHM-first codecs registered, plus a pickle fallback.
    """
    global _default_registry
    if _default_registry is None:
        reg = CodecRegistry()
        reg.register(NumpyArrayCodec())
        reg.register(InferenceDataCodec())
        reg.register(PandasDFCodec())

        register_dataclasses(reg)

        # MUST BE LAST
        reg.register(PickleCodec())

        _default_registry = reg
    return _default_registry
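
Codec lookup is first-match, so registration order matters: the specialized codecs (NumPy, InferenceData, pandas) are tried before the pickle fallback. A roundtrip sketch, assuming `pool` is an active `ShmPool`:

import numpy as np

reg = get_default_registry()
enc = reg.encode(np.zeros(4), pool)       # handled by NumpyArrayCodec
payload = {"codec": enc.codec, "meta": enc.meta, "buffers": enc.buffers}
arr = reg.decode(payload, shm_pool=pool)  # back to a NumPy array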

worker_main(conn, mgr_address, mgr_authkey, runtime_conf, log_queue)

Worker entrypoint.

  • Connects to the already-running SharedMemoryManager (started in parent)
  • Optionally configures R env via runtime_conf
  • Receives CALL/SHUTDOWN commands over conn
  • Executes Python functions in modules inside this worker. Those modules are free to use rpy2 / brms / cmdstanr however they like.
Source code in brmspy/_session/worker/worker.py
def worker_main(
    conn: Connection,
    mgr_address: str | None,
    mgr_authkey: bytes | None,
    runtime_conf: EnvironmentConfig,
    log_queue: mp.Queue,
) -> None:
    """
    Worker entrypoint.

    - Connects to the already-running SharedMemoryManager (started in parent)
    - Optionally configures R env via `runtime_conf`
    - Receives CALL/SHUTDOWN commands over `conn`
    - Executes *Python* functions in modules inside this worker.
      Those modules are free to use rpy2 / brms / cmdstanr however they like.
    """

    setup_worker_logging(log_queue)

    import os

    os.environ["BRMSPY_WORKER"] = "1"

    _initialise_r_safe()

    # 1. Connect to SHM manager
    smm = SharedMemoryManager(address=mgr_address, authkey=mgr_authkey, ctx=ctx)
    smm.connect()

    # 2. Optional environment init (R_HOME, R_LIBS_USER, etc.)
    activate(runtime_conf)
    run_startup_scripts(runtime_conf)

    shm_pool = ShmPool(smm)
    reg = get_default_registry()

    module_cache: dict[str, Any] = {}

    import rpy2.rinterface_lib.callbacks
    from rpy2.rinterface_lib.sexp import Sexp

    rpy2.rinterface_lib.callbacks._WRITECONSOLE_EXCEPTION_LOG = (
        "[R]: {exception} {exc_value} {traceback}"
    )

    from ..._singleton._shm_singleton import _set_shm

    _set_shm(shm_pool)

    try:
        while True:
            req = conn.recv()
            cmd = req["cmd"]
            req_id = req["id"]

            shm_pool.gc()

            try:
                if cmd == "SHUTDOWN":
                    conn.send(
                        {
                            "id": req_id,
                            "ok": True,
                            "result": None,
                            "error": None,
                            "traceback": None,
                        }
                    )
                    break

                elif cmd == "PING":
                    conn.send(
                        {
                            "id": req_id,
                            "ok": True,
                            "result": None,
                            "error": None,
                            "traceback": None,
                        }
                    )
                    continue

                elif cmd == "CALL":
                    # decode Python args
                    args = [reg.decode(p, shm_pool=shm_pool) for p in req["args"]]
                    kwargs = {
                        k: reg.decode(p, shm_pool=shm_pool)
                        for k, p in req["kwargs"].items()
                    }
                    args: list[Any] = reattach_sexp(args)
                    kwargs: dict[str, Any] = reattach_sexp(kwargs)

                    # resolve "mod:pkg.module.func"
                    target = _resolve_module_target(req["target"], module_cache)
                    out = target(*args, **kwargs)
                    out = cache_sexp(out)

                    # encode result
                    enc = reg.encode(out, shm_pool)
                    result_payload: PayloadRef = {
                        "codec": enc.codec,
                        "meta": enc.meta,
                        "buffers": enc.buffers,
                    }

                    conn.send(
                        {
                            "id": req_id,
                            "ok": True,
                            "result": result_payload,
                            "error": None,
                            "traceback": None,
                        }
                    )

                elif cmd == "_RUN_TEST_BY_NAME":
                    module = req["kwargs"]["module"]
                    classname = req["kwargs"]["class"]
                    funcname = req["kwargs"]["func"]

                    try:
                        mod = importlib.import_module(module)

                        if classname:
                            cls = getattr(mod, classname)
                            inst = cls()
                            fn = getattr(inst, funcname)
                        else:
                            fn = getattr(mod, funcname)

                        result = fn()

                        enc = reg.encode(result, shm_pool)
                        conn.send(
                            {
                                "id": req_id,
                                "ok": True,
                                "result": {
                                    "codec": enc.codec,
                                    "meta": enc.meta,
                                    "buffers": enc.buffers,
                                },
                                "error": None,
                                "traceback": None,
                            }
                        )

                    except Exception as e:
                        import traceback

                        conn.send(
                            {
                                "id": req_id,
                                "ok": False,
                                "result": None,
                                "error": str(e),
                                "traceback": traceback.format_exc(),
                            }
                        )

                else:
                    raise ValueError(f"Unknown command: {cmd!r}")

            except RRuntimeError as e:
                import traceback
                import rpy2.robjects as ro

                tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
                full_msg = str(e)

                ignore_msgs = ["Can't show last error because no error was recorded"]

                try:
                    # traceback() prints and returns a pairlist -> coerce to something nice
                    r_tb = "\n".join(
                        list(
                            str(v)
                            for v in cast(ro.ListVector, ro.r("unlist(traceback())"))
                        )
                    )
                    tb = r_tb
                except Exception:
                    pass

                # Full base R error message
                try:
                    # full rlang error message (can be multi-line, with bullets etc.)
                    _msg = str(
                        cast(
                            ro.ListVector,
                            ro.r("rlang::format_error_bullets(rlang::last_error())"),
                        )[0]
                    )
                    if _msg and not any(part in _msg for part in ignore_msgs):
                        full_msg = _msg
                    else:
                        raise
                except Exception:
                    # fallback to base R
                    try:
                        _msg = str(cast(ro.ListVector, ro.r("geterrmessage()"))[0])
                        if _msg and not any(part in _msg for part in ignore_msgs):
                            full_msg = _msg
                        else:
                            raise
                    except Exception:
                        pass

                conn.send(
                    {
                        "id": req_id,
                        "ok": False,
                        "result": None,
                        "error": str(full_msg),
                        "traceback": tb,
                    }
                )

            except Exception as e:
                import traceback

                tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
                conn.send(
                    {
                        "id": req_id,
                        "ok": False,
                        "result": None,
                        "error": str(e),
                        "traceback": tb,
                    }
                )
    finally:
        pass

r_home_from_subprocess()

Return the R home directory by calling R RHOME in a subprocess.

Source code in brmspy/_session/session.py
def r_home_from_subprocess() -> str | None:
    """Return the R home directory by calling `R RHOME` in a subprocess."""
    cmd = ("R", "RHOME")
    tmp = subprocess.check_output(cmd, universal_newlines=True)
    # may raise FileNotFoundError, WindowsError, etc
    r_home = tmp.split(os.linesep)
    if r_home[0].startswith("WARNING"):
        res = r_home[1]
    else:
        res = r_home[0].strip()
    return res

add_env_defaults(overrides)

Ensure required R environment variables exist inside overrides. Mutates overrides in-place and returns it.

  • Ensures R_HOME exists (or auto-detects)
  • Ensures LD_LIBRARY_PATH includes R_HOME/lib (Unix only)
Source code in brmspy/_session/session.py
def add_env_defaults(overrides: dict[str, str]) -> dict[str, str]:
    """
    Ensure required R environment variables exist inside overrides.
    Mutates overrides in-place and returns it.

    - Ensures R_HOME exists (or auto-detects)
    - Ensures LD_LIBRARY_PATH includes R_HOME/lib (Unix only)
    """
    # ---------------------------------------------------------
    # 1) R_HOME detection / override handling
    # ---------------------------------------------------------
    if "R_HOME" not in overrides:
        r_home = r_home_from_subprocess()
        if not r_home:
            raise RuntimeError(
                "`R_HOME` not provided and cannot auto-detect via subprocess. "
                "R must be installed or explicitly configured."
            )
        r_home_path = Path(r_home)
        overrides["R_HOME"] = r_home_path.as_posix()
    else:
        r_home_path = Path(overrides["R_HOME"])

    # ---------------------------------------------------------
    # 2) LD_LIBRARY_PATH for Unix-like systems
    # ---------------------------------------------------------
    if platform.system() != "Windows":
        r_lib_dir = (r_home_path / "lib").as_posix()

        if "LD_LIBRARY_PATH" not in overrides:
            # Take current LD_LIBRARY_PATH from environment
            current = os.environ.get("LD_LIBRARY_PATH", "")
        else:
            current = overrides["LD_LIBRARY_PATH"]

        # Split into entries (ignore empty ones)
        existing_parts = [p for p in current.split(":") if p]

        # Prepend R_HOME/lib if not already present
        if r_lib_dir not in existing_parts:
            new_ld = ":".join([r_lib_dir] + existing_parts)
        else:
            new_ld = current  # already included

        overrides["LD_LIBRARY_PATH"] = new_ld

    if "RPY2_CFFI_MODE" not in overrides:
        overrides["RPY2_CFFI_MODE"] = "ABI"

    # ---------------------------------------------------------
    return overrides
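
Effect sketch on a Unix-like system (the detected paths are illustrative):

overrides = {"R_LIBS_USER": "/tmp/rlibs"}
add_env_defaults(overrides)
# overrides now also contains, e.g.:
# {
#     "R_HOME": "/usr/lib/R",                   # auto-detected via `R RHOME`
#     "LD_LIBRARY_PATH": "/usr/lib/R/lib:...",  # R_HOME/lib prepended
#     "RPY2_CFFI_MODE": "ABI",
# }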

with_env(overrides)

Temporarily apply environment overrides, then restore.

Source code in brmspy/_session/session.py
@contextmanager
def with_env(overrides: dict[str, str]) -> Iterator[None]:
    """Temporarily apply environment overrides, then restore."""
    overrides = add_env_defaults(overrides)

    old = {}
    sentinel = object()

    for k, v in overrides.items():
        old[k] = os.environ.get(k, sentinel)
        os.environ[k] = v

    try:
        yield
    finally:
        for k, v_old in old.items():
            if v_old is sentinel:
                os.environ.pop(k, None)
            else:
                os.environ[k] = v_old
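
Usage sketch (assuming R is discoverable, since `with_env` injects the R defaults shown above): overrides apply only inside the block and are restored afterwards; keys that did not exist before are removed again, thanks to the sentinel.

import os

with with_env({"BRMSPY_WORKER": "1"}):
    assert os.environ["BRMSPY_WORKER"] == "1"
# outside the block the previous value (or absence) is restored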

spawn_worker(target, args, env_overrides, log_queue=None)

Spawn the worker process with temporary environment overrides.

Notes
  • brmspy uses spawn semantics (see ctx = mp.get_context("spawn")).
  • The worker receives the log queue as an extra trailing argument when present.
Source code in brmspy/_session/session.py
def spawn_worker(
    target, args, env_overrides: dict[str, str], log_queue: mp.Queue | None = None
):
    """
    Spawn the worker process with temporary environment overrides.

    Notes
    -----
    - brmspy uses spawn semantics (see `ctx = mp.get_context("spawn")`).
    - The worker receives the log queue as an extra trailing argument when present.
    """
    with with_env(env_overrides):
        # Always daemonic: the worker must not outlive the parent process.
        daemon = True
        if log_queue is not None:
            args = (*args, log_queue)
        proc = ctx.Process(target=target, args=args, daemon=daemon)
        proc.start()
    return proc