Source code for ytjobs.checkpoint.utils

"""
Checkpoint Utilities for YTsaurus
==================================

Utilities for saving and loading checkpoints in YTsaurus Cypress file system.
Works without internet access - uses YT's internal distributed file system.
"""

import yt.wrapper as yt
import os
import json
import logging
from typing import Optional, Dict, Any


[docs] def get_checkpoint_path(checkpoint_name: str, base_path: Optional[str] = None) -> str: """ Get full YT path for a checkpoint. Args: checkpoint_name: Name of checkpoint file base_path: Base YT path (defaults to user's checkpoints folder) Returns: Full YT path to checkpoint """ if base_path is None: # Default to user's checkpoints folder # Extract username from YT_PROXY or use default username = os.environ.get("YT_USER", "default") base_path = f"//home/{username}/checkpoints" return f"{base_path}/{checkpoint_name}"
[docs] def save_checkpoint( data: bytes, checkpoint_name: str, metadata: Optional[Dict[str, Any]] = None, base_path: Optional[str] = None, logger: Optional[logging.Logger] = None, ) -> str: """ Save checkpoint to YTsaurus Cypress file system. Args: data: Checkpoint data as bytes checkpoint_name: Name of checkpoint file metadata: Optional metadata dictionary base_path: Base YT path for checkpoints logger: Optional logger Returns: Full YT path to saved checkpoint """ log = logger or logging.getLogger(__name__) checkpoint_path = get_checkpoint_path(checkpoint_name, base_path) # Ensure directory exists base_dir = "/".join(checkpoint_path.split("/")[:-1]) try: yt.create("map_node", base_dir, recursive=True, ignore_existing=True) except Exception as e: log.warning(f"Could not create checkpoint directory {base_dir}: {e}") # Save checkpoint try: yt.write_file(checkpoint_path, data, force_create=True, compute_md5=True) log.info(f"Saved checkpoint: {checkpoint_path} ({len(data)} bytes)") except Exception as e: log.error(f"Failed to save checkpoint {checkpoint_path}: {e}") raise # Save metadata if provided if metadata: metadata_path = f"{checkpoint_path}.meta" try: metadata_json = json.dumps(metadata, indent=2) yt.write_file( metadata_path, metadata_json.encode("utf-8"), force_create=True, compute_md5=True, ) log.debug(f"Saved checkpoint metadata: {metadata_path}") except Exception as e: log.warning(f"Failed to save checkpoint metadata: {e}") return checkpoint_path
[docs] def load_checkpoint( checkpoint_name: str, base_path: Optional[str] = None, logger: Optional[logging.Logger] = None, ) -> tuple[Optional[bytes], Optional[Dict[str, Any]]]: """ Load checkpoint and metadata from YTsaurus Cypress. Args: checkpoint_name: Name of checkpoint file base_path: Base YT path for checkpoints logger: Optional logger Returns: Tuple of (checkpoint_data, metadata_dict) or (None, None) if not found """ log = logger or logging.getLogger(__name__) checkpoint_path = get_checkpoint_path(checkpoint_name, base_path) if not yt.exists(checkpoint_path): log.debug(f"Checkpoint not found: {checkpoint_path}") return None, None try: # Load checkpoint data data = yt.read_file(checkpoint_path).read() log.info(f"Loaded checkpoint: {checkpoint_path} ({len(data)} bytes)") # Load metadata if exists metadata = None metadata_path = f"{checkpoint_path}.meta" if yt.exists(metadata_path): try: metadata_json = yt.read_file(metadata_path).read().decode("utf-8") metadata = json.loads(metadata_json) log.debug(f"Loaded checkpoint metadata: {metadata_path}") except Exception as e: log.warning(f"Failed to load checkpoint metadata: {e}") return data, metadata except Exception as e: log.error(f"Failed to load checkpoint {checkpoint_path}: {e}") return None, None
[docs] def list_checkpoints( base_path: Optional[str] = None, pattern: Optional[str] = None, logger: Optional[logging.Logger] = None, ) -> list[str]: """ List available checkpoints in YTsaurus. Args: base_path: Base YT path for checkpoints pattern: Optional pattern to filter checkpoint names logger: Optional logger Returns: List of checkpoint names (without .meta files) """ log = logger or logging.getLogger(__name__) checkpoint_dir = get_checkpoint_path("", base_path).rstrip("/") if not yt.exists(checkpoint_dir): log.debug(f"Checkpoint directory does not exist: {checkpoint_dir}") return [] try: files = yt.list(checkpoint_dir) # Filter out metadata files and apply pattern if provided checkpoints = [] for file in files: if file.endswith(".meta"): continue if pattern and pattern not in file: continue checkpoints.append(file) log.debug(f"Found {len(checkpoints)} checkpoints in {checkpoint_dir}") return sorted(checkpoints) except Exception as e: log.error(f"Failed to list checkpoints in {checkpoint_dir}: {e}") return []
[docs] def delete_checkpoint( checkpoint_name: str, base_path: Optional[str] = None, logger: Optional[logging.Logger] = None, ) -> bool: """ Delete checkpoint and its metadata from YTsaurus. Args: checkpoint_name: Name of checkpoint file base_path: Base YT path for checkpoints logger: Optional logger Returns: True if deleted successfully, False otherwise """ log = logger or logging.getLogger(__name__) checkpoint_path = get_checkpoint_path(checkpoint_name, base_path) try: if yt.exists(checkpoint_path): yt.remove(checkpoint_path, force=True) log.info(f"Deleted checkpoint: {checkpoint_path}") # Delete metadata if exists metadata_path = f"{checkpoint_path}.meta" if yt.exists(metadata_path): yt.remove(metadata_path, force=True) log.debug(f"Deleted checkpoint metadata: {metadata_path}") return True except Exception as e: log.error(f"Failed to delete checkpoint {checkpoint_path}: {e}") return False
[docs] def save_processing_state( state: Dict[str, Any], state_name: str = "processing_state", base_path: Optional[str] = None, logger: Optional[logging.Logger] = None, ) -> str: """ Save processing state (e.g., processed video list, iteration count) to checkpoint. Convenience function for saving JSON-serializable state dictionaries. Args: state: Dictionary with processing state state_name: Name for the state checkpoint base_path: Base YT path for checkpoints logger: Optional logger Returns: Full YT path to saved checkpoint """ log = logger or logging.getLogger(__name__) state_json = json.dumps(state, indent=2) checkpoint_name = f"{state_name}.json" return save_checkpoint( data=state_json.encode("utf-8"), checkpoint_name=checkpoint_name, metadata={"type": "processing_state", "state_name": state_name}, base_path=base_path, logger=log, )
[docs] def load_processing_state( state_name: str = "processing_state", base_path: Optional[str] = None, logger: Optional[logging.Logger] = None, ) -> Optional[Dict[str, Any]]: """ Load processing state from checkpoint. Convenience function for loading JSON-serializable state dictionaries. Args: state_name: Name of the state checkpoint base_path: Base YT path for checkpoints logger: Optional logger Returns: State dictionary or None if not found """ log = logger or logging.getLogger(__name__) checkpoint_name = f"{state_name}.json" data, metadata = load_checkpoint(checkpoint_name, base_path, log) if data is None: return None try: state = json.loads(data.decode("utf-8")) log.debug(f"Loaded processing state: {state_name}") return state except Exception as e: log.error(f"Failed to parse processing state {state_name}: {e}") return None