"""Upload pipeline: wrappers, local build orchestration, and archive upload."""
# pyright: reportPrivateUsage=false
import logging
import shutil
import tarfile
from pathlib import Path
from yt_framework.operations._internal.upload_helpers import (
_BUILD_CODE_DIR,
_copy_module_to_build_dir,
_copy_path_to_build_dir,
_copy_stage_to_build_dir,
_copy_ytjobs_to_build_dir,
_resolve_upload_target,
_validate_upload_config,
)
from yt_framework.operations._internal.upload_wrappers import (
_bash_wrapper_script_body,
_create_unified_wrapper_script,
_resolve_map_reduce_command_scripts,
_resolve_reduce_command_script,
_write_wrapper_file,
)
from yt_framework.utils import log_header, log_success
from yt_framework.yt.clients.client_base import BaseYTClient
def _create_map_reduce_command_wrappers(
    stage_name: str,
    stage_dir: Path,
    build_dir: Path,
    logger: logging.Logger,
) -> None:
    """Write shell wrappers for command-mode (``tar_command_bootstrap``) map-reduce.

    Both legs of the map-reduce exchange JSON over stdin/stdout. Behaviour:

    * no mapper script resolved -> nothing is written;
    * mapper and reducer resolved -> one wrapper per leg is written;
    * mapper only -> the mapper wrapper is written and a warning is logged,
      because the operation cannot run until a reducer script is configured.

    Args:
        stage_name: Name of the stage.
        stage_dir: Path to the stage directory; scripts live under ``src/``.
        build_dir: Local build directory that receives the wrapper files.
        logger: Logger instance.
    """
    mapper_script, reducer_script = _resolve_map_reduce_command_scripts(stage_dir, logger)
    if not mapper_script:
        return

    # (leg suffix used in the wrapper filename, script name, human description)
    legs = [("mapper", mapper_script, "map-reduce mapper (command mode)")]
    if reducer_script:
        legs.append(("reducer", reducer_script, "map-reduce reducer (command mode)"))

    for leg, script_name, description in legs:
        wrapper_body = _bash_wrapper_script_body(
            stage_name,
            f"stages/{stage_name}/src/{script_name}",
            description,
        )
        _write_wrapper_file(
            build_dir,
            f"operation_wrapper_{stage_name}_map_reduce_{leg}.sh",
            wrapper_body,
            logger,
        )

    if not reducer_script:
        logger.warning(
            "Stage %s: map_reduce_mapper wrapper created but no reducer script — map-reduce command mode will fail until job.map_reduce_command.reducer_script is set",
            stage_name,
        )
def _create_reduce_command_wrapper(
    stage_name: str,
    stage_dir: Path,
    build_dir: Path,
    logger: logging.Logger,
) -> None:
    """Write the reduce-only shell wrapper used with ``tar_command_bootstrap``.

    Nothing is written when no reduce script can be resolved for the stage.

    Args:
        stage_name: Name of the stage.
        stage_dir: Path to the stage directory; scripts live under ``src/``.
        build_dir: Local build directory that receives the wrapper file.
        logger: Logger instance.
    """
    reduce_script = _resolve_reduce_command_script(stage_dir, logger)
    if reduce_script:
        _write_wrapper_file(
            build_dir,
            f"operation_wrapper_{stage_name}_reduce.sh",
            _bash_wrapper_script_body(
                stage_name,
                f"stages/{stage_name}/src/{reduce_script}",
                "reduce (command mode)",
            ),
            logger,
        )
def _stage_has_mapper_entrypoints(src_dir: Path) -> bool:
if (src_dir / "mapper.py").is_file():
return True
return bool(list(src_dir.glob("partition_*.py")))
def _create_wrappers_for_stage(
    stage_name: str,
    stage_dir: Path,
    build_dir: Path,
    logger: logging.Logger,
) -> None:
    """Write every wrapper script a stage needs into the build directory.

    A stage without a ``src`` directory is skipped entirely. Otherwise:

    * a ``map`` wrapper is written when the stage has mapper entry points
      (``mapper.py`` or ``partition_*.py``);
    * a ``vanilla`` wrapper is written when ``src/vanilla.py`` exists;
    * command-mode map-reduce wrappers are attempted when mapper entry
      points exist;
    * the command-mode reduce wrapper is always attempted.

    Args:
        stage_name: Name of the stage.
        stage_dir: Path to the stage directory.
        build_dir: Local build directory path.
        logger: Logger instance.
    """
    src_dir = stage_dir / "src"
    if not src_dir.exists():
        # The stage ships no source code, so there is nothing to wrap.
        return

    mapper_present = _stage_has_mapper_entrypoints(src_dir)
    vanilla_present = (src_dir / "vanilla.py").exists()

    # Unified wrappers, in a fixed order: map first, then vanilla.
    for operation_type, present in (("map", mapper_present), ("vanilla", vanilla_present)):
        if not present:
            continue
        _create_unified_wrapper_script(
            stage_name=stage_name,
            operation_type=operation_type,
            build_dir=build_dir,
            logger=logger,
        )

    if mapper_present:
        _create_map_reduce_command_wrappers(
            stage_name=stage_name,
            stage_dir=stage_dir,
            build_dir=build_dir,
            logger=logger,
        )
    _create_reduce_command_wrapper(
        stage_name=stage_name,
        stage_dir=stage_dir,
        build_dir=build_dir,
        logger=logger,
    )
def _copy_all_upload_modules(
    *,
    build_dir: Path,
    upload_modules: list[str] | None,
    logger: logging.Logger,
) -> int:
    """Copy each requested Python module into the build directory.

    Each module is placed under a directory named after its top-level package,
    e.g. ``pkg.sub`` is copied into ``build_dir / "pkg"``.

    Args:
        build_dir: Local build directory path.
        upload_modules: Module names to copy; ``None`` or empty copies nothing.
        logger: Logger instance.

    Returns:
        Total number of files copied across all modules.
    """
    if not upload_modules:
        return 0
    return sum(
        _copy_module_to_build_dir(
            module_name=module_name,
            target_dir=build_dir / module_name.partition(".")[0],
            logger=logger,
        )
        for module_name in upload_modules
    )
def _copy_all_upload_paths(
    *,
    build_dir: Path,
    upload_paths: list[dict[str, str]] | None,
    pipeline_dir: Path,
    logger: logging.Logger,
) -> int:
    """Copy each configured local path into the build directory.

    Every entry needs a ``"source"`` key and may carry an optional
    ``"target"``; the final destination name is resolved by
    ``_resolve_upload_target``.

    Args:
        build_dir: Local build directory path.
        upload_paths: Path specs to copy; ``None`` or empty copies nothing.
        pipeline_dir: Path to pipeline directory.
        logger: Logger instance.

    Returns:
        Total number of files copied across all entries.
    """
    if not upload_paths:
        return 0

    copied = 0
    for spec in upload_paths:
        src = spec["source"]
        destination = _resolve_upload_target(
            source=src,
            target=spec.get("target"),
            _pipeline_dir=pipeline_dir,
        )
        copied += _copy_path_to_build_dir(
            source_path=src,
            target_name=destination,
            build_dir=build_dir,
            pipeline_dir=pipeline_dir,
            logger=logger,
        )
    return copied
def _copy_all_stages_to_build(
    *,
    build_dir: Path,
    stages_dir: Path,
    logger: logging.Logger,
) -> tuple[int, list[tuple[str, Path]]]:
    """Copy every stage that has a ``src`` directory into the build directory.

    Stages are visited in sorted path order. ``Path.iterdir`` yields entries
    in arbitrary, filesystem-dependent order, so sorting keeps copy order,
    log output and the returned stage list (which drives later wrapper
    creation) deterministic across machines.

    Args:
        build_dir: Local build directory path.
        stages_dir: Directory containing one sub-directory per stage.
        logger: Logger instance.

    Returns:
        ``(stage_files, stage_dirs)``. ``stage_files`` is the total number of
        files copied, and ``stage_dirs`` lists ``(stage_name, stage_dir)`` for
        every stage that was copied, in visit order.

    Raises:
        OSError: (e.g. ``FileNotFoundError``) if *stages_dir* cannot be listed.
    """
    stage_files = 0
    stage_dirs_list: list[tuple[str, Path]] = []
    for stage_dir in sorted(stages_dir.iterdir()):
        # Only directories that contain a src/ sub-folder are treated as stages.
        if not stage_dir.is_dir() or not (stage_dir / "src").exists():
            continue
        stage_dirs_list.append((stage_dir.name, stage_dir))
        stage_files += _copy_stage_to_build_dir(
            build_dir=build_dir,
            stage_dir=stage_dir,
            logger=logger,
        )
    return stage_files, stage_dirs_list
def build_code_locally(
    build_dir: Path,
    pipeline_dir: Path,
    logger: logging.Logger,
    create_wrappers: bool = False,  # noqa: FBT001,FBT002
    upload_modules: list[str] | None = None,
    upload_paths: list[dict[str, str]] | None = None,
) -> int:
    """Build all code in a local build directory.

    Copies the ytjobs package, optional custom modules/paths, and all stages'
    code to the build directory, preserving the same structure as would be
    uploaded to YT.

    The upload configuration is validated before *build_dir* is created, so
    an invalid configuration does not leave an empty build directory behind.

    Args:
        build_dir: Local build directory path. It must not exist yet.
        pipeline_dir: Path to pipeline directory; stages are read from
            ``pipeline_dir / "stages"``.
        logger: Logger instance.
        create_wrappers: If True, create wrapper scripts for all stages
            (used for tar archive mode).
        upload_modules: Optional list of module names to upload.
        upload_paths: Optional list of ``{source, target?}`` dicts for local
            paths.

    Returns:
        Total number of files copied. Wrapper scripts are not included.

    Raises:
        FileExistsError: If *build_dir* already exists.
    """
    log_header(logger, "Code Build", f"Build directory: {build_dir}")

    # Validate first: failing here must not leave a half-created build dir.
    _validate_upload_config(
        upload_modules=upload_modules,
        upload_paths=upload_paths,
        pipeline_dir=pipeline_dir,
    )

    # No exist_ok: refuse to build on top of a stale directory.
    build_dir.mkdir(parents=True)

    ytjobs_files = _copy_ytjobs_to_build_dir(
        build_dir=build_dir,
        logger=logger,
    )
    module_files = _copy_all_upload_modules(
        build_dir=build_dir,
        upload_modules=upload_modules,
        logger=logger,
    )
    path_files = _copy_all_upload_paths(
        build_dir=build_dir,
        upload_paths=upload_paths,
        pipeline_dir=pipeline_dir,
        logger=logger,
    )

    # Copy all stages
    stages_dir = pipeline_dir / "stages"
    stage_files, stage_dirs_list = _copy_all_stages_to_build(
        build_dir=build_dir,
        stages_dir=stages_dir,
        logger=logger,
    )

    # Create wrapper scripts if requested (for tar archive mode)
    if create_wrappers:
        for stage_name, stage_dir in stage_dirs_list:
            _create_wrappers_for_stage(
                stage_name=stage_name,
                stage_dir=stage_dir,
                build_dir=build_dir,
                logger=logger,
            )
        logger.debug("Created wrapper scripts for %s stages", len(stage_dirs_list))

    total_files = ytjobs_files + module_files + path_files + stage_files
    log_success(logger, f"Code build completed: {total_files} total files")
    return total_files
def create_code_archive(
    build_dir: Path,
    archive_path: Path,
    logger: logging.Logger,
) -> None:
    """Create a tar.gz archive from the build directory.

    Every regular file under *build_dir* is stored under its path relative to
    *build_dir*. Directories are not added as separate members, so empty
    directories are not preserved. Members are added in sorted path order, so
    the archive layout does not depend on filesystem iteration order.

    Args:
        build_dir: Local build directory path.
        archive_path: Path where the archive should be created; missing parent
            directories are created.
        logger: Logger instance.

    Returns:
        None
    """
    log_header(logger, "Code Archive", f"Creating archive: {archive_path}")
    # Ensure parent directory exists
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    # rglob() order is filesystem-dependent; sort for a reproducible layout.
    files = sorted(path for path in build_dir.rglob("*") if path.is_file())
    with tarfile.open(archive_path, "w:gz") as tar:
        for file_path in files:
            tar.add(
                file_path,
                arcname=file_path.relative_to(build_dir),
                recursive=False,
            )
    archive_size_mb = archive_path.stat().st_size / (1024 * 1024)
    log_success(logger, f"Archive created: {archive_size_mb:.2f} MB")
def upload_code_archive(
    yt_client: BaseYTClient,
    archive_path: Path,
    build_folder: str,
    logger: logging.Logger,
) -> None:
    """Upload a local code archive into a YT build folder.

    The archive keeps its local file name and is written to
    ``{build_folder}/{archive_path.name}``. The client is asked to create the
    build folder when it is missing (``create_parent_dir=True``).

    Args:
        yt_client: YT client instance.
        archive_path: Local path to the tar.gz archive.
        build_folder: YT build folder path.
        logger: Logger instance.

    Returns:
        None
    """
    log_header(logger, "Uploading Code Archive to YT")
    logger.info("Build folder: %s", build_folder)

    destination = f"{build_folder}/{archive_path.name}"
    yt_client.upload_file(
        local_path=archive_path,
        yt_path=destination,
        create_parent_dir=True,
    )

    log_success(logger, f"Archive uploaded: {destination}")
def _resolve_build_code_dir(
    pipeline_dir: Path,
    logger: logging.Logger,
) -> Path:
    """Return a freshly created, empty build code directory.

    The directory is ``pipeline_dir / _BUILD_CODE_DIR``. WARNING: if something
    already exists at that path, it is deleted recursively with
    ``shutil.rmtree`` before the directory is recreated. Output from an
    earlier build is therefore never reused.

    Args:
        pipeline_dir: Path to pipeline directory.
        logger: Logger instance.

    Returns:
        Path of the (now empty) build code directory.
    """
    target = pipeline_dir / _BUILD_CODE_DIR
    logger.debug("Using build directory: %s", target)

    # Destructive reset: drop leftovers from any previous build, then recreate.
    if target.exists():
        shutil.rmtree(target)
    target.mkdir(parents=True)

    return target
def upload_all_code(
    yt_client: BaseYTClient,
    build_folder: str,
    pipeline_dir: Path,
    logger: logging.Logger,
    upload_modules: list[str] | None = None,
    upload_paths: list[dict[str, str]] | None = None,
) -> None:
    """Upload all code to YT: ytjobs package, optional custom modules/paths, and stages.

    Runs the full tar-archive pipeline:

    1. reset the local build code directory under *pipeline_dir*;
    2. build the code tree (wrapper scripts included) into ``<build>/source``;
    3. pack it into ``<build>/source.tar.gz``;
    4. upload the archive into *build_folder* on YT.

    Args:
        yt_client: YT client instance.
        build_folder: YT build folder path.
        pipeline_dir: Path to pipeline directory.
        logger: Logger instance.
        upload_modules: Optional list of module names to upload.
        upload_paths: Optional list of ``{source, target?}`` dicts for local paths.

    Returns:
        None
    """
    log_header(
        logger,
        "Code Upload",
        f"Tar archive mode | Build folder: {build_folder}",
    )

    workspace = _resolve_build_code_dir(pipeline_dir=pipeline_dir, logger=logger)
    source_dir = workspace / "source"
    archive = workspace / "source.tar.gz"

    # Wrappers are always generated here because this path is tar archive mode.
    build_code_locally(
        build_dir=source_dir,
        pipeline_dir=pipeline_dir,
        logger=logger,
        create_wrappers=True,
        upload_modules=upload_modules,
        upload_paths=upload_paths,
    )
    create_code_archive(
        build_dir=source_dir,
        archive_path=archive,
        logger=logger,
    )
    upload_code_archive(
        yt_client=yt_client,
        archive_path=archive,
        build_folder=build_folder,
        logger=logger,
    )

    log_success(logger, "Code upload completed")