"""Shared helpers for map/vanilla/map-reduce (resources, secrets, tokenizer wiring)."""
import logging
from pathlib import Path
from typing import Dict, Optional, Any, Set, TYPE_CHECKING
from omegaconf import DictConfig
from yt_framework.yt.client_base import OperationResources
from yt_framework.utils.env import load_secrets
from .tokenizer_artifact import (
init_tokenizer_artifact_directory,
resolve_tokenizer_artifact_name,
resolve_tokenizer_archive_name,
)
if TYPE_CHECKING:
from yt_framework.core.stage import StageContext
def _get_config_value_with_default(
    config: DictConfig,
    key: str,
    default: Any,
    logger: logging.Logger,
) -> Any:
    """
    Get config value with default, logging when default is used.

    The key is first looked up as-is. If it is not present that way and it
    contains dots (e.g. "client.pool"), it is resolved segment by segment
    through nested mappings.

    Args:
        config: OmegaConf DictConfig object (any mapping works)
        key: Config key to access (supports dot notation like "client.pool")
        default: Default value to use if key is missing or None
        logger: Logger instance for logging defaults

    Returns:
        Config value if present and not None, otherwise default
    """
    from collections.abc import Mapping

    # Private sentinel: distinguishes "key absent" from "key present but None".
    missing = object()

    def _lookup() -> Any:
        # A flat key always wins, so existing callers see unchanged results.
        if key in config:
            return config.get(key)
        if not (isinstance(key, str) and "." in key):
            return missing
        node: Any = config
        for part in key.split("."):
            # Stop as soon as the path leaves mapping territory; this also
            # avoids a substring test when an intermediate value is a string.
            if not isinstance(node, Mapping) or part not in node:
                return missing
            node = node.get(part)
        return node

    try:
        value = _lookup()
    except Exception:
        # Deliberate best-effort: any failure while accessing the config is
        # treated the same as a missing key.
        logger.info(f" Using default {key}={default} (not specified in config)")
        return default

    if value is missing:
        logger.info(f" Using default {key}={default} (not specified in config)")
        return default
    if value is None:
        logger.info(f" Using default {key}={default} (value is None in config)")
        return default
    return value
[docs]
def build_environment(
    configs_dir: Path,
    logger: logging.Logger,
) -> Dict[str, str]:
    """
    Load the secrets that map and vanilla operations receive as environment variables.

    Only secrets travel through the environment; jobs read the rest of their
    configuration from config.yaml.

    Args:
        configs_dir: Directory containing secrets.env file (passed to load_secrets)
        logger: Logger instance used for debug output

    Returns:
        Dictionary of secret environment variables, as returned by load_secrets
    """
    logger.debug("Building environment with secrets...")
    secrets = load_secrets(configs_dir)

    # Never log a secret itself: print at most ten asterisks in its place.
    for name in secrets:
        mask = "*" * min(len(secrets[name]), 10)
        logger.debug(f" {name}: {mask}")

    logger.debug(f"Environment ready with {len(secrets)} secrets")
    return secrets
[docs]
def prepare_docker_auth(
    docker_image: Optional[str],
    docker_username: Optional[str],
    docker_password: Optional[str],
) -> Optional[Dict[str, str]]:
    """
    Build the Docker authentication dictionary, if one can be built.

    Args:
        docker_image: Optional Docker image name
        docker_username: Optional Docker registry username
        docker_password: Optional Docker registry password

    Returns:
        {"username": ..., "password": ...} when image, username and password
        are all truthy; None as soon as any of them is missing or empty
    """
    required = (docker_image, docker_username, docker_password)
    if not all(required):
        return None

    auth: Dict[str, str] = {}
    auth["username"] = docker_username
    auth["password"] = docker_password
    return auth
[docs]
def collect_passthrough_kwargs(
    operation_config: DictConfig,
    reserved_keys: Set[str],
) -> Dict[str, Any]:
    """
    Collect top-level config values to forward to YT client.

    OmegaConf container nodes (both dict and list nodes) are resolved to plain
    Python containers; other values are forwarded unchanged.

    Args:
        operation_config: Operation configuration whose top-level keys are scanned
        reserved_keys: Keys that must not be forwarded

    Returns:
        Dictionary keyed by str(key) holding every non-reserved, non-None
        top-level value
    """
    from omegaconf import OmegaConf

    out: Dict[str, Any] = {}
    for k in operation_config.keys():
        if k in reserved_keys:
            continue
        v = operation_config.get(k)
        if v is None:
            continue
        # is_config() is true for DictConfig and ListConfig alike, so list
        # nodes no longer leak through as OmegaConf objects.
        if OmegaConf.is_config(v):
            v = OmegaConf.to_container(v, resolve=True)
        out[str(k)] = v
    return out
[docs]
def build_operation_environment(
    context: "StageContext",
    operation_config: DictConfig,
    logger: logging.Logger,
    include_stage_name: bool = True,
    include_tokenizer_artifact: bool = True,
) -> Dict[str, str]:
    """
    Assemble the environment for an operation.

    Layers, in order:
      1. secrets from build_environment(context.deps.configs_dir);
      2. entries of operation_config["env"], stringified, overriding secrets
         (entries whose value is None are skipped);
      3. TOKENIZER_ARTIFACT_* variables, added only when not already set;
      4. YT_STAGE_NAME, added only when not already set.

    Args:
        context: Stage context providing deps.configs_dir, config and name
        operation_config: Operation configuration ("env", "tokenizer_artifact")
        logger: Logger instance forwarded to build_environment
        include_stage_name: Whether to add YT_STAGE_NAME
        include_tokenizer_artifact: Whether to process "tokenizer_artifact"

    Returns:
        Dictionary of environment variables for the operation
    """
    env = build_environment(configs_dir=context.deps.configs_dir, logger=logger)

    # Explicit env entries take precedence over secrets.
    explicit_env = operation_config.get("env") or {}
    env.update(
        {str(name): str(val) for name, val in explicit_env.items() if val is not None}
    )

    tokenizer_cfg = (
        operation_config.get("tokenizer_artifact")
        if include_tokenizer_artifact
        else None
    )
    if tokenizer_cfg:
        # The artifact directory is initialised before artifact_base is checked.
        init_tokenizer_artifact_directory(
            context=context,
            tokenizer_artifact_config=tokenizer_cfg,
        )
        artifact_name = None
        if tokenizer_cfg.get("artifact_base"):
            artifact_name = resolve_tokenizer_artifact_name(
                stage_config=context.config,
                tokenizer_artifact_config=tokenizer_cfg,
            )
        if artifact_name:
            archive_name = resolve_tokenizer_archive_name(artifact_name)
            tokenizer_vars = (
                ("TOKENIZER_ARTIFACT_FILE", archive_name),
                ("TOKENIZER_ARTIFACT_DIR", f"tokenizer_artifacts/{artifact_name}"),
                ("TOKENIZER_ARTIFACT_NAME", artifact_name),
            )
            # setdefault: values already present in env are left untouched.
            for var_name, var_value in tokenizer_vars:
                env.setdefault(var_name, var_value)

    if include_stage_name:
        env.setdefault("YT_STAGE_NAME", context.name)

    return env