Source code for yt_framework.operations.upload

"""Build `source.tar.gz`, merge extra modules/paths, and push artifacts to Cypress."""

import importlib
import logging
import shutil
import tarfile
from pathlib import Path
from typing import Dict, List, Optional, Literal, Tuple

from yt_framework.utils import log_header, log_success
from yt_framework.yt.client_base import BaseYTClient
from omegaconf import OmegaConf
from yt_framework.utils.ignore import YTIgnoreMatcher

# Marker for the implicit ytjobs framework package in target conflict checks
_IMPLICIT_YTJOBS_SOURCE = "implicit (framework)"

# Default build code directory name
_BUILD_CODE_DIR = ".build"


def _get_ytjobs_dir() -> Path:
    """Get ytjobs package directory dynamically."""
    import ytjobs

    return Path(ytjobs.__file__).parent


def _copy_ytjobs_to_build_dir(
    build_dir: Path,
    logger: logging.Logger,
) -> int:
    """
    Copy ytjobs package to local build directory.

    Respects .ytignore patterns if present in the ytjobs directory.

    Args:
        build_dir: Local build directory path
        logger: Logger instance

    Returns:
        Number of files copied
    """
    ytjobs_dir = _get_ytjobs_dir()
    target_dir = build_dir / "ytjobs"
    target_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Copying ytjobs package to {target_dir}...")

    # Initialize .ytignore matcher for ytjobs directory
    ignore_matcher = YTIgnoreMatcher(ytjobs_dir)

    file_count = 0
    ignored_count = 0

    for source_file in ytjobs_dir.rglob("*"):
        if source_file.is_file():
            # Check if file should be ignored
            if ignore_matcher.should_ignore(source_file):
                logger.debug(
                    f"Ignoring file (matched .ytignore): {source_file.relative_to(ytjobs_dir)}"
                )
                ignored_count += 1
                continue

            rel_path = source_file.relative_to(ytjobs_dir)
            target_file = target_dir / rel_path
            target_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source_file, target_file)
            file_count += 1

    log_success(logger, f"Copied {file_count} ytjobs files")
    if ignored_count > 0:
        logger.debug(f"  Ignored {ignored_count} files (matched .ytignore patterns)")
    return file_count


def _resolve_upload_target(
    source: str, target: Optional[str], pipeline_dir: Path
) -> str:
    """Resolve target name for upload_paths entry.

    Args:
        source: Source path from config
        target: Optional target from config
        pipeline_dir: Pipeline directory (unused, for API consistency)

    Returns:
        Target name (from config or derived from source basename)
    """
    if target is not None and str(target).strip():
        return str(target).strip()
    return Path(source).name


def _validate_upload_config(
    upload_modules: Optional[List[str]],
    upload_paths: Optional[List[Dict[str, str]]],
    pipeline_dir: Path,
) -> None:
    """Validate upload config for reserved targets and conflicts.

    Args:
        upload_modules: List of module names
        upload_paths: List of dicts with source and optional target
        pipeline_dir: Pipeline directory for path resolution

    Raises:
        ValueError: If reserved target used or target conflict detected
    """
    reserved = {"stages", "ytjobs"}

    targets: List[Tuple[str, str]] = []  # (target, source_description)

    # ytjobs is implicit
    targets.append(("ytjobs", _IMPLICIT_YTJOBS_SOURCE))

    # upload_modules (use top-level package as target; my_package.sub -> my_package)
    for mod in upload_modules or []:
        top_level = mod.split(".")[0]
        targets.append((top_level, f"upload_modules[{mod}]"))

    # upload_paths (validate source is within pipeline_dir)
    pipeline_dir_resolved = pipeline_dir.resolve()
    for i, entry in enumerate(upload_paths or []):
        if "source" not in entry:
            raise ValueError("upload_paths entry missing required 'source' key.")
        source = entry.get("source", "")
        resolved_path = (pipeline_dir / source).resolve()
        if not resolved_path.is_relative_to(pipeline_dir_resolved):
            raise ValueError(
                f"upload_paths[{i}] source must be within pipeline directory: "
                f"{resolved_path} (pipeline: {pipeline_dir_resolved})."
            )
        target = entry.get("target")
        resolved_target = _resolve_upload_target(source, target, pipeline_dir)
        targets.append((resolved_target, f"upload_paths[{source}]"))

    # Check reserved
    for target, source_desc in targets:
        if target in reserved and source_desc != _IMPLICIT_YTJOBS_SOURCE:
            raise ValueError(
                f"Reserved target name '{target}' cannot be used. "
                f"Reserved names: stages, ytjobs."
            )

    # Check conflicts
    seen: Dict[str, str] = {}
    for target, source_desc in targets:
        if target in seen and seen[target] != source_desc:
            sources = f"{seen[target]}, {source_desc}"
            raise ValueError(
                f"Upload target conflict: '{target}' is used by multiple sources: {sources}."
            )
        seen[target] = source_desc


def _copy_module_to_build_dir(
    module_name: str,
    target_dir: Path,
    logger: logging.Logger,
) -> int:
    """Copy an importable Python package to build directory.

    Only packages (directories with __init__.py) are supported; single-file
    modules are rejected. For dotted paths (e.g. my_package.submodule),
    copies the full top-level package. Respects .ytignore in the source dir.

    Args:
        module_name: Python module name to import (e.g. my_package or my_package.sub)
        target_dir: Target directory in build (top-level package name)
        logger: Logger instance

    Returns:
        Number of files copied

    Raises:
        ValueError: If module cannot be imported or has unsupported layout
    """
    top_level = module_name.split(".")[0]
    try:
        module = importlib.import_module(top_level)
    except ImportError as e:
        raise ValueError(
            f"Failed to import module '{module_name}' (top-level: {top_level}): {e}."
        ) from e

    if getattr(module, "__file__", None) is None:
        raise ValueError(
            f"Module '{module_name}' has no __file__ (namespace or non-file package). "
            "Only file-based packages are supported."
        )

    # Use __path__ for packages; reject single-file modules (which would copy
    # the entire containing directory, e.g. site-packages)
    if hasattr(module, "__path__"):
        source_dir = Path(module.__path__[0]).resolve()
        if not source_dir.is_dir():
            raise ValueError(
                f"Module '{module_name}' has invalid __path__: {source_dir} is not a directory."
            )
    else:
        raise ValueError(
            f"Module '{module_name}' is a single-file module, not a package. "
            "upload_modules supports only packages (directories with __init__.py). "
            "Single-file modules would copy the entire containing directory."
        )

    target_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Copying module {module_name} to {target_dir}...")

    ignore_matcher = YTIgnoreMatcher(source_dir)
    file_count = 0
    ignored_count = 0

    for source_file in source_dir.rglob("*"):
        if source_file.is_file():
            if ignore_matcher.should_ignore(source_file):
                logger.debug(
                    f"Ignoring file (matched .ytignore): {source_file.relative_to(source_dir)}"
                )
                ignored_count += 1
                continue

            rel_path = source_file.relative_to(source_dir)
            target_file = target_dir / rel_path
            target_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source_file, target_file)
            file_count += 1

    log_success(logger, f"Copied {file_count} {module_name} files")
    if ignored_count > 0:
        logger.debug(f"  Ignored {ignored_count} files (matched .ytignore patterns)")
    return file_count


def _copy_path_to_build_dir(
    source_path: str,
    target_name: str,
    build_dir: Path,
    pipeline_dir: Path,
    logger: logging.Logger,
) -> int:
    """Copy a local path to build directory.

    Respects .ytignore patterns if present in the source directory.

    Args:
        source_path: Source path relative to pipeline_dir
        target_name: Target directory name in build
        build_dir: Build directory path
        pipeline_dir: Pipeline directory for resolving source
        logger: Logger instance

    Returns:
        Number of files copied

    Raises:
        ValueError: If source path escapes pipeline directory or is not a directory
        FileNotFoundError: If source does not exist
    """
    pipeline_dir_resolved = pipeline_dir.resolve()
    resolved = (pipeline_dir / source_path).resolve()

    if not resolved.is_relative_to(pipeline_dir_resolved):
        raise ValueError(
            f"Upload path source must be within pipeline directory: {resolved} "
            f"(pipeline: {pipeline_dir_resolved}). Paths like '../foo' are not allowed."
        )

    if not resolved.exists():
        raise FileNotFoundError(f"Upload path source does not exist: {resolved}.")

    if not resolved.is_dir():
        raise ValueError(f"Upload path source must be a directory: {resolved}.")

    target_dir = build_dir / target_name
    target_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Copying {resolved} to {target_dir}...")

    ignore_matcher = YTIgnoreMatcher(resolved)
    file_count = 0
    ignored_count = 0

    for source_file in resolved.rglob("*"):
        if source_file.is_file():
            if ignore_matcher.should_ignore(source_file):
                logger.debug(
                    f"Ignoring file (matched .ytignore): {source_file.relative_to(resolved)}"
                )
                ignored_count += 1
                continue

            rel_path = source_file.relative_to(resolved)
            target_file = target_dir / rel_path
            target_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source_file, target_file)
            file_count += 1

    log_success(logger, f"Copied {file_count} files from {source_path}")
    if ignored_count > 0:
        logger.debug(f"  Ignored {ignored_count} files (matched .ytignore patterns)")
    return file_count


def _copy_stage_to_build_dir(
    build_dir: Path,
    stage_dir: Path,
    logger: logging.Logger,
) -> int:
    """
    Copy a single stage's code and config to local build directory.

    Respects .ytignore patterns if present in the stage directory.

    Args:
        build_dir: Local build directory path
        stage_dir: Path to stage directory (e.g., stages/run_map/)
        logger: Logger instance

    Returns:
        Number of files copied
    """
    stage_name = stage_dir.name
    file_count = 0

    # Initialize .ytignore matcher for stage directory
    ignore_matcher = YTIgnoreMatcher(stage_dir)
    ignored_count = 0

    # Copy config.yaml if it exists and has job section
    config_path = stage_dir / "config.yaml"
    if config_path.exists():
        # Check if config should be ignored
        if not ignore_matcher.should_ignore(config_path):
            try:
                config = OmegaConf.load(config_path)
                if (
                    "job" in config
                    and config.job
                    and (not isinstance(config.job, dict) or len(config.job) > 0)
                ):
                    target_config = build_dir / "stages" / stage_name / "config.yaml"
                    target_config.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(config_path, target_config)
                    file_count += 1
                    logger.debug(f"  Copied config: {stage_name}/config.yaml")
            except Exception:
                # If config parsing fails, skip it
                pass
        else:
            logger.debug(
                f"  Ignoring config: {stage_name}/config.yaml (matched .ytignore)"
            )
            ignored_count += 1

    # Copy src directory
    src_dir = stage_dir / "src"
    if src_dir.exists():
        target_src = build_dir / "stages" / stage_name / "src"
        target_src.mkdir(parents=True, exist_ok=True)

        for source_file in src_dir.rglob("*"):
            if source_file.is_file():
                # Check if file should be ignored
                if ignore_matcher.should_ignore(source_file):
                    logger.debug(
                        f"  Ignoring file: {source_file.relative_to(stage_dir)} (matched .ytignore)"
                    )
                    ignored_count += 1
                    continue

                rel_path = source_file.relative_to(src_dir)
                target_file = target_src / rel_path
                target_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(source_file, target_file)
                file_count += 1

        logger.debug(f"  Copied {file_count} files from {stage_name}/src/")
        if ignored_count > 0:
            logger.debug(
                f"  Ignored {ignored_count} files from {stage_name}/ (matched .ytignore patterns)"
            )

    return file_count


def _bash_wrapper_script_body(
    stage_name: str, python_script_relative: str, label: str
) -> str:
    """Shared body for operation wrappers (map, vanilla, map-reduce legs, reduce)."""
    requirements_path = f"stages/{stage_name}/requirements.txt"
    return f"""
#!/bin/bash
set -e

# Get current directory (sandbox root)
SANDBOX_ROOT="$(pwd)"

# Set PYTHONPATH to current directory so ytjobs and stages packages can be imported
export PYTHONPATH="${{PYTHONPATH}}:${{SANDBOX_ROOT}}"

# Set config path for ytjobs config loader
# Config file is extracted to stages/{stage_name}/config.yaml
export JOB_CONFIG_PATH="${{SANDBOX_ROOT}}/stages/{stage_name}/config.yaml"

# Install requirements.txt if it exists
if [ -f "{requirements_path}" ]; then
    echo "Installing dependencies from requirements.txt..." >&2
    pip install --quiet --no-cache-dir -r {requirements_path} || echo "Warning: Failed to install some dependencies" >&2
fi

# Execute: {label}
python3 {python_script_relative}
"""


def _write_wrapper_file(
    build_dir: Path,
    filename: str,
    body: str,
    logger: logging.Logger,
) -> None:
    wrapper_path = build_dir / filename
    wrapper_path.write_text(body)
    wrapper_path.chmod(0o755)
    logger.debug(f"Created wrapper script: {wrapper_path}")


def _load_stage_job_section(stage_dir: Path, logger: logging.Logger) -> Dict:
    """Return ``job`` dict from stage ``config.yaml`` if present."""
    cfg_path = stage_dir / "config.yaml"
    if not cfg_path.is_file():
        return {}
    try:
        cfg = OmegaConf.to_container(OmegaConf.load(cfg_path), resolve=True)
        if not isinstance(cfg, dict):
            return {}
        job = cfg.get("job")
        return job if isinstance(job, dict) else {}
    except Exception as e:
        logger.warning("Could not read %s for wrapper script paths: %s", cfg_path, e)
        return {}


def _resolve_map_reduce_command_scripts(
    stage_dir: Path,
    logger: logging.Logger,
) -> Tuple[Optional[str], Optional[str]]:
    """
    Resolve mapper/reducer Python entrypoints under stages/<name>/src/ for map-reduce
    command mode wrappers. Uses ``job.map_reduce_command`` when set.
    """
    job = _load_stage_job_section(stage_dir, logger)
    mrc = job.get("map_reduce_command") or {}
    if not isinstance(mrc, dict):
        mrc = {}
    mapper = str(mrc.get("mapper_script") or "mapper.py")
    reducer = mrc.get("reducer_script")
    if reducer is not None:
        reducer = str(reducer)
    src = stage_dir / "src"
    if reducer is None:
        for candidate in (
            "reducer.py",
            "reducer_mds.py",
            "reducer_main.py",
            "reducer_index.py",
        ):
            if (src / candidate).is_file():
                reducer = candidate
                break
    if not (src / mapper).is_file():
        logger.debug("No %s at %s — skipping map_reduce_mapper wrapper", mapper, src)
        return None, None
    if not reducer or not (src / reducer).is_file():
        logger.debug("No reducer script resolved for map-reduce in %s", src)
        return mapper, None
    return mapper, reducer


def _resolve_reduce_command_script(
    stage_dir: Path, logger: logging.Logger
) -> Optional[str]:
    """Reducer entrypoint for reduce-only command mode (``job.reduce_command``)."""
    job = _load_stage_job_section(stage_dir, logger)
    rc = job.get("reduce_command") or {}
    if not isinstance(rc, dict):
        rc = {}
    explicit = rc.get("reducer_script")
    if explicit:
        name = str(explicit)
        if (stage_dir / "src" / name).is_file():
            return name
        logger.warning(
            "reduce_command.reducer_script %s not found under %s",
            name,
            stage_dir / "src",
        )
    src = stage_dir / "src"
    for candidate in (
        "reducer.py",
        "reducer_index.py",
        "reducer_mds.py",
        "reducer_main.py",
    ):
        if (src / candidate).is_file():
            return candidate
    return None


def _create_unified_wrapper_script(
    stage_name: str,
    operation_type: Literal["map", "vanilla"],
    build_dir: Path,
    logger: logging.Logger,
) -> None:
    """
    Create unified wrapper script for map or vanilla operations.

    The wrapper script:
    1. Extracts tar.gz archive
    2. Sets up PYTHONPATH to include current directory
    3. Sets JOB_CONFIG_PATH to stage config
    4. Installs requirements.txt if present
    5. Executes the appropriate script (mapper.py or vanilla.py)

    Args:
        stage_name: Name of the stage
        operation_type: Type of operation ('map' or 'vanilla')
        build_dir: Local build directory path
        logger: Logger instance

    Returns:
        None
    """
    if operation_type == "map":
        script_path = f"stages/{stage_name}/src/mapper.py"
    else:
        script_path = f"stages/{stage_name}/src/vanilla.py"

    body = _bash_wrapper_script_body(
        stage_name, script_path, f"{operation_type} operation"
    )
    _write_wrapper_file(
        build_dir,
        f"operation_wrapper_{stage_name}_{operation_type}.sh",
        body,
        logger,
    )


def _create_map_reduce_command_wrappers(
    stage_name: str,
    stage_dir: Path,
    build_dir: Path,
    logger: logging.Logger,
) -> None:
    """Wrappers for ``tar_command_bootstrap`` map-reduce (JSON stdin/stdout legs)."""
    mapper_f, reducer_f = _resolve_map_reduce_command_scripts(stage_dir, logger)
    if not mapper_f:
        return
    m_rel = f"stages/{stage_name}/src/{mapper_f}"
    body_m = _bash_wrapper_script_body(
        stage_name, m_rel, "map-reduce mapper (command mode)"
    )
    _write_wrapper_file(
        build_dir,
        f"operation_wrapper_{stage_name}_map_reduce_mapper.sh",
        body_m,
        logger,
    )
    if reducer_f:
        r_rel = f"stages/{stage_name}/src/{reducer_f}"
        body_r = _bash_wrapper_script_body(
            stage_name, r_rel, "map-reduce reducer (command mode)"
        )
        _write_wrapper_file(
            build_dir,
            f"operation_wrapper_{stage_name}_map_reduce_reducer.sh",
            body_r,
            logger,
        )
    else:
        logger.warning(
            "Stage %s: map_reduce_mapper wrapper created but no reducer script — "
            "map-reduce command mode will fail until job.map_reduce_command.reducer_script is set",
            stage_name,
        )


def _create_reduce_command_wrapper(
    stage_name: str,
    stage_dir: Path,
    build_dir: Path,
    logger: logging.Logger,
) -> None:
    """Wrapper for reduce-only operations with ``tar_command_bootstrap``."""
    red = _resolve_reduce_command_script(stage_dir, logger)
    if not red:
        return
    r_rel = f"stages/{stage_name}/src/{red}"
    body = _bash_wrapper_script_body(stage_name, r_rel, "reduce (command mode)")
    _write_wrapper_file(
        build_dir,
        f"operation_wrapper_{stage_name}_reduce.sh",
        body,
        logger,
    )


def _create_wrappers_for_stage(
    stage_name: str,
    stage_dir: Path,
    build_dir: Path,
    logger: logging.Logger,
) -> None:
    """
    Create wrapper scripts for a stage based on what operation types it has.

    Args:
        stage_name: Name of the stage
        stage_dir: Path to stage directory
        build_dir: Local build directory path
        logger: Logger instance

    Returns:
        None
    """
    src_dir = stage_dir / "src"

    if not src_dir.exists():
        return

    # Check which operation types this stage supports
    has_mapper = (src_dir / "mapper.py").is_file() or bool(
        list(src_dir.glob("partition_*.py"))
    )
    has_vanilla = (src_dir / "vanilla.py").exists()

    # Create wrapper for each operation type found
    if has_mapper:
        _create_unified_wrapper_script(
            stage_name=stage_name,
            operation_type="map",
            build_dir=build_dir,
            logger=logger,
        )

    if has_vanilla:
        _create_unified_wrapper_script(
            stage_name=stage_name,
            operation_type="vanilla",
            build_dir=build_dir,
            logger=logger,
        )

    if has_mapper:
        _create_map_reduce_command_wrappers(
            stage_name=stage_name,
            stage_dir=stage_dir,
            build_dir=build_dir,
            logger=logger,
        )
        _create_reduce_command_wrapper(
            stage_name=stage_name,
            stage_dir=stage_dir,
            build_dir=build_dir,
            logger=logger,
        )


[docs] def build_code_locally( build_dir: Path, pipeline_dir: Path, logger: logging.Logger, create_wrappers: bool = False, upload_modules: Optional[List[str]] = None, upload_paths: Optional[List[Dict[str, str]]] = None, ) -> int: """ Build all code in a local build directory. Copies ytjobs package, optional custom modules/paths, and all stages' code to the build directory, preserving the same structure as would be uploaded to YT. Args: build_dir: Local build directory path pipeline_dir: Path to pipeline directory logger: Logger instance create_wrappers: If True, create wrapper scripts for all stages upload_modules: Optional list of module names to upload upload_paths: Optional list of {source, target?} for local paths Returns: Total number of files copied """ log_header(logger, "Code Build", f"Build directory: {build_dir}") # Create build directory build_dir.mkdir(parents=True) # Validate and resolve upload config _validate_upload_config( upload_modules=upload_modules, upload_paths=upload_paths, pipeline_dir=pipeline_dir, ) # Copy ytjobs package (implicit, always) ytjobs_files = _copy_ytjobs_to_build_dir( build_dir=build_dir, logger=logger, ) # Copy upload_modules (use top-level package name for target; dotted paths # e.g. my_package.submodule become build_dir/my_package/ with full tree) module_files = 0 for mod in upload_modules or []: top_level = mod.split(".")[0] module_files += _copy_module_to_build_dir( module_name=mod, target_dir=build_dir / top_level, logger=logger, ) # Copy upload_paths path_files = 0 for entry in upload_paths or []: source = entry["source"] target = _resolve_upload_target( source=source, target=entry.get("target"), pipeline_dir=pipeline_dir, ) path_files += _copy_path_to_build_dir( source_path=source, target_name=target, build_dir=build_dir, pipeline_dir=pipeline_dir, logger=logger, ) # Copy all stages stages_dir = pipeline_dir / "stages" stage_files = 0 stage_dirs_list = [] for stage_dir in stages_dir.iterdir(): if stage_dir.is_dir() and (stage_dir / "src").exists(): stage_name = stage_dir.name stage_dirs_list.append((stage_name, stage_dir)) files_copied = _copy_stage_to_build_dir( build_dir=build_dir, stage_dir=stage_dir, logger=logger, ) stage_files += files_copied # Create wrapper scripts if requested (for tar archive mode) if create_wrappers: for stage_name, stage_dir in stage_dirs_list: _create_wrappers_for_stage( stage_name=stage_name, stage_dir=stage_dir, build_dir=build_dir, logger=logger, ) logger.debug(f"Created wrapper scripts for {len(stage_dirs_list)} stages") total_files = ytjobs_files + module_files + path_files + stage_files log_success(logger, f"Code build completed: {total_files} total files") return total_files
[docs] def create_code_archive( build_dir: Path, archive_path: Path, logger: logging.Logger, ) -> None: """ Create a tar.gz archive from the build directory. Args: build_dir: Local build directory path archive_path: Path where the archive should be created logger: Logger instance Returns: None """ log_header(logger, "Code Archive", f"Creating archive: {archive_path}") # Ensure parent directory exists archive_path.parent.mkdir(parents=True, exist_ok=True) with tarfile.open(archive_path, "w:gz") as tar: # Add all files from build directory for file_path in build_dir.rglob("*"): if file_path.is_file(): arcname = file_path.relative_to(build_dir) tar.add(file_path, arcname=arcname, recursive=False) archive_size_mb = archive_path.stat().st_size / (1024 * 1024) log_success(logger, f"Archive created: {archive_size_mb:.2f} MB")
[docs] def upload_code_archive( yt_client: BaseYTClient, archive_path: Path, build_folder: str, logger: logging.Logger, ) -> None: """ Upload code archive to YT. Args: yt_client: YT client instance archive_path: Local path to the tar.gz archive build_folder: YT build folder path logger: Logger instance Returns: None """ log_header(logger, "Uploading Code Archive to YT") logger.info(f"Build folder: {build_folder}") archive_name = archive_path.name archive_yt_path = f"{build_folder}/{archive_name}" yt_client.upload_file( local_path=archive_path, yt_path=archive_yt_path, create_parent_dir=True, # Create build folder if it doesn't exist ) log_success(logger, f"Archive uploaded: {archive_yt_path}")
def _resolve_build_code_dir( pipeline_dir: Path, logger: logging.Logger, ) -> Path: """ Resolve build code directory path. Args: pipeline_dir: Path to pipeline directory logger: Logger instance Returns: Resolved build code directory path """ build_code_dir = pipeline_dir / _BUILD_CODE_DIR logger.debug(f"Using build directory: {build_code_dir}") # Ensure build directory exists if build_code_dir.exists(): shutil.rmtree(build_code_dir) build_code_dir.mkdir(parents=True) return build_code_dir
[docs] def upload_all_code( yt_client: BaseYTClient, build_folder: str, pipeline_dir: Path, logger: logging.Logger, upload_modules: Optional[List[str]] = None, upload_paths: Optional[List[Dict[str, str]]] = None, ) -> None: """ Upload all code to YT: ytjobs package, optional custom modules/paths, and stages. Builds code locally, creates tar archive, and uploads it to YT. Args: yt_client: YT client instance build_folder: YT build folder path pipeline_dir: Path to pipeline directory logger: Logger instance upload_modules: Optional list of module names to upload upload_paths: Optional list of {source, target?} for local paths Returns: None """ log_header( logger, "Code Upload", f"Tar archive mode | Build folder: {build_folder}" ) # Resolve build directory path build_code_dir = _resolve_build_code_dir(pipeline_dir=pipeline_dir, logger=logger) # Build code locally (including wrapper scripts for tar archive mode) build_dir = build_code_dir / "source" build_code_locally( build_dir=build_dir, pipeline_dir=pipeline_dir, logger=logger, create_wrappers=True, upload_modules=upload_modules, upload_paths=upload_paths, ) # Create archive archive_path = build_code_dir / "source.tar.gz" create_code_archive( build_dir=build_dir, archive_path=archive_path, logger=logger, ) # Upload archive upload_code_archive( yt_client=yt_client, archive_path=archive_path, build_folder=build_folder, logger=logger, ) log_success(logger, "Code upload completed")