Skip to content

_r_env

R environment operations: libPaths, cmdstan path, package loading. Each function does exactly one R operation. Stateless.

Functions

log_warning(msg, method_name=None)

Log a warning message.

Parameters:

Name Type Description Default
msg str

The warning message to log

required
method_name str

The name of the method/function. If None, will auto-detect from call stack.

None
Source code in brmspy/helpers/log.py
def log_warning(msg: str, method_name: str | None = None):
    """
    Log a warning message.

    Parameters
    ----------
    msg : str
        The warning message to log
    method_name : str, optional
        The name of the method/function. If None, will auto-detect from call stack.

    """
    log(msg, method_name=method_name, level=logging.WARNING)

get_lib_paths()

Get current .libPaths() from R.

Source code in brmspy/_runtime/_r_env.py
def get_lib_paths() -> list[str]:
    """Get current .libPaths() from R."""
    import rpy2.robjects as ro
    result = cast(ro.ListVector, ro.r('.libPaths()'))
    return [str(p) for p in result]

get_cmdstan_path()

Get current cmdstanr::cmdstan_path() or None.

Source code in brmspy/_runtime/_r_env.py
def get_cmdstan_path() -> str | None:
    """Get current cmdstanr::cmdstan_path() or None."""
    import rpy2.robjects as ro
    try:
        result = cast(ro.ListVector, ro.r("suppressWarnings(suppressMessages(cmdstanr::cmdstan_path()))"))
        return str(result[0]) if result else None
    except Exception:
        return None

is_namespace_loaded(name)

Check if package namespace is loaded.

Source code in brmspy/_runtime/_r_env.py
def is_namespace_loaded(name: str) -> bool:
    """Check if package namespace is loaded."""
    import rpy2.robjects as ro
    expr = f'"{name}" %in% loadedNamespaces()'

    res = cast(ro. ListVector, ro.r(expr))
    return str(res[0]).lower().strip() == "true"

is_package_attached(name)

Check if package is on search path.

Source code in brmspy/_runtime/_r_env.py
def is_package_attached(name: str) -> bool:
    """Check if package is on search path."""
    import rpy2.robjects as ro
    expr = f'paste0("package:", "{name}") %in% search()'
    res = cast(ro.ListVector, ro.r(expr))
    return str(res[0]).lower().strip() == "true"

unload_package(name)

Also known as footgun. Don't call without very good reason.

Attempt to unload package. Returns True if successful. Tries: detach -> unloadNamespace -> library.dynam.unload Does NOT uninstall.

Source code in brmspy/_runtime/_r_env.py
def unload_package(name: str) -> bool:
    """
    Also known as footgun. Don't call without very good reason.

    Attempt to unload package. Returns True if successful.
    Tries: detach -> unloadNamespace -> library.dynam.unload
    Does NOT uninstall.
    """
    import rpy2.robjects as ro

    detach_only = False

    r_code = f"""
      pkg <- "{name}"
      detach_only <- {str(detach_only).upper()}

      .unload_pkg <- function(pkg, detach_only) {{
        success <- TRUE

        # Always try to detach from search path first
        tryCatch({{
          search_name <- paste0("package:", pkg)
          if (search_name %in% search()) {{
            detach(search_name,
                   unload = !detach_only,
                   character.only = TRUE)
          }}
        }}, error = function(e) {{ success <<- FALSE }})

        if (detach_only) {{
          # do *not* touch namespace or DLL
          return(success)
        }}

        # 2) Unload namespace
        tryCatch({{
          if (pkg %in% loadedNamespaces()) {{
            unloadNamespace(pkg)
          }}
        }}, error = function(e) {{ success <<- FALSE }})

        # 3) pkgload (devtools-style unload)
        tryCatch({{
          if (requireNamespace("pkgload", quietly = TRUE)) {{
            pkgload::unload(pkg)
          }}
        }}, error = function(e) {{}})

        # 4) DLL unload if still registered
        tryCatch({{
          dlls <- getLoadedDLLs()
          if (pkg %in% rownames(dlls)) {{
            dll_info <- dlls[[pkg]]
            dll_name <- dll_info[["name"]]
            libpath  <- dirname(dll_info[["path"]])
            library.dynam.unload(
              chname  = dll_name,
              package = pkg,
              libpath = libpath
            )
          }}
        }}, error = function(e) {{}})

        success
      }}

      .unload_pkg(pkg, detach_only)
    """

    try:
        result = cast(list, ro.r(r_code))
        return str(result[0]).lower().strip() == "true"
    except Exception:
        return False

_find_libpath_packages(libpath, include_not_loaded=False)

Source code in brmspy/_runtime/_r_env.py
def _find_libpath_packages(libpath: Path | None, include_not_loaded: bool = False) -> list[str]:
    import rpy2.robjects as ro
    if libpath is None:
        return []
    pkgs = cast(Callable, ro.r("""
function(runtime_root, include_not_loaded = FALSE) {
  lib_root <- file.path(runtime_root, "Rlib")
  lib_root <- normalizePath(lib_root, winslash = "/", mustWork = TRUE)

  attached <- sub("^package:", "", grep("^package:", search(), value = TRUE))
  ns <- loadedNamespaces()
  pkgs <- unique(c(attached, ns))

  # optionally add packages that are installed in this lib_root
  if (isTRUE(include_not_loaded)) {
    inst <- tryCatch(
      installed.packages(lib.loc = lib_root)[, "Package"],
      error = function(e) character(0L)
    )
    pkgs <- unique(c(pkgs, inst))
  }

  res <- vapply(pkgs, function(p) {
    path <- suppressWarnings(tryCatch(
      find.package(p, quiet = TRUE)[1],
      error = function(e) NA_character_
    ))

    if (is.na(path) || !nzchar(path)) {
      return(NA_character_)
    }

    path <- normalizePath(path, winslash = "/", mustWork = FALSE)

    if (startsWith(path, lib_root)) p else NA_character_
  }, character(1L))

  res[!is.na(res)]
}
    """))(libpath.as_posix(), include_not_loaded)

    pkgs = [str(v) for v in cast(ro.StrVector, pkgs)]

    return pkgs

_compute_unload_order(pkgs)

Source code in brmspy/_runtime/_r_env.py
def _compute_unload_order(pkgs: list[str] | None) -> list[str] | None:
    if pkgs is None:
        return None
    if len(pkgs) == 0:
        return []
    import rpy2.robjects as ro
    fun = cast(Callable, ro.r("""
function(pkgs) {
  pkgs <- unique(as.character(pkgs))
  if (!length(pkgs)) return(character(0L))

  # Try to get metadata; if this fails, just treat as "no deps info"
  ip <- tryCatch(
    installed.packages(lib.loc = .libPaths(), noCache = TRUE)[, "Package"],
    error = function(e) character(0L)
  )

  pkgs_with_meta <- intersect(pkgs, ip)

  deps <- vector("list", length(pkgs))
  names(deps) <- pkgs

  if (length(pkgs_with_meta)) {
    d <- tools::package_dependencies(
      pkgs_with_meta,
      which     = c("Depends", "Imports", "LinkingTo"),
      recursive = FALSE,
      reverse   = TRUE
    )
    # Keep only edges within our original pkgs set
    d <- lapply(d, function(x) intersect(x, pkgs))
    deps[names(d)] <- d
  }

  remaining <- pkgs
  order <- character(0L)

  while (length(remaining)) {
    has_dependents <- unique(unlist(deps[remaining]))
    leaves <- setdiff(remaining, has_dependents)

    if (!length(leaves)) {
      # Cycle or no dep info: just append the rest
      order <- c(order, remaining)
      break
    }

    order <- c(order, leaves)
    remaining <- setdiff(remaining, leaves)
  }

  order
}
"""))
    lv = cast(ro.StrVector, fun(ro.StrVector(pkgs)))
    return [str(v) for v in lv]

_unload_libpath_packages(libpath)

Source code in brmspy/_runtime/_r_env.py
def _unload_libpath_packages(libpath: Path | None) -> None:
    if not libpath or not libpath.exists():
        return
    pkgs = _find_libpath_packages(libpath)
    if len(pkgs) == 0:
        return
    pkgs = _compute_unload_order(pkgs)
    if not pkgs:
        return
    for pkg in pkgs:
        try:
            unload_package(pkg)
        except Exception as e:
            log_warning(f"{e}")

_is_runtime_path(p)

Source code in brmspy/_runtime/_r_env.py
def _is_runtime_path(p: str):
    return ".brmspy/runtime/" not in p and ".brmspy\\runtime\\" not in p

_is_environment_path(p)

Source code in brmspy/_runtime/_r_env.py
def _is_environment_path(p: str):
    return ".brmspy/runtime/" not in p and ".brmspy\\runtime\\" not in p

_path_priority(p)

Source code in brmspy/_runtime/_r_env.py
def _path_priority(p: str) -> int:
    if _is_environment_path(p):
        return 0
    if _is_runtime_path(p):
        return 1
    return 2

set_lib_paths(paths)

Set .libPaths() in R.

Source code in brmspy/_runtime/_r_env.py
def set_lib_paths(paths: list[str]) -> None:
    """Set .libPaths() in R."""
    import rpy2.robjects as ro

    current = [str(p) for p in cast(ro.StrVector, ro.r(".libPaths()"))]
    if any(_is_environment_path(p) for p in paths):
        current = [p for p in current if not _is_environment_path(p)]
    elif any(_is_runtime_path(p) for p in paths):
        current = [p for p in current if not _is_runtime_path(p)]

    new_paths = list(dict.fromkeys(list(paths) + current))
    new_paths = sorted(new_paths, key=_path_priority)
    r_fun = cast(Callable, ro.r('.libPaths'))
    r_fun(ro.StrVector(new_paths))

set_cmdstan_path(path)

Set cmdstanr::set_cmdstan_path().

Source code in brmspy/_runtime/_r_env.py
def set_cmdstan_path(path: str | None) -> None:
    """Set cmdstanr::set_cmdstan_path()."""
    import rpy2.robjects as ro

    try:
      if path is None:
          path_str = "NULL"
      else:
          path_str = f'"{path}"'

      ro.r(f'''
      if (!requireNamespace("cmdstanr", quietly = TRUE)) {{
        stop("cmdstanr is not available in rlibs")
      }}
      suppressWarnings(suppressMessages(cmdstanr::set_cmdstan_path(path={path_str})))
      ''')

    except Exception as e:
        log_warning(f"Failed to set cmdstan_path to {path}: {e}")

run_gc()

Run garbage collection in both Python and R.

Source code in brmspy/_runtime/_r_env.py
def run_gc() -> None:
    """Run garbage collection in both Python and R."""
    import gc

    import rpy2.robjects as ro
    gc.collect()
    try:
        ro.r('gc()')
    except Exception:
        pass

forward_github_token()

Copy GITHUB_TOKEN/GITHUB_PAT to R's Sys.setenv.

Source code in brmspy/_runtime/_r_env.py
def forward_github_token() -> None:
    """Copy GITHUB_TOKEN/GITHUB_PAT to R's Sys.setenv."""
    import rpy2.robjects as ro
    try:
        kwargs = {}
        pat = os.environ.get("GITHUB_PAT")
        token = os.environ.get("GITHUB_TOKEN")

        if not pat and not token:
            return

        r_setenv = cast(Callable, ro.r("Sys.setenv"))

        if pat:
            kwargs["GITHUB_PAT"] = pat
        elif token:
            kwargs["GITHUB_TOKEN"] = token

        if kwargs:
            r_setenv(**kwargs)
    except Exception:
        pass