Skip to content

Web Helper API Reference

Backend

models

Database models for CVLab-Kit web helper.

Device

Bases: Base

Database model for compute devices.

Source code in web_helper/backend/models/device.py
class Device(Base):
    """Database model for compute devices.

    Each row describes one host, identified by the unique ``host_id``. GPU
    figures are kept as host-wide aggregates plus an optional per-GPU JSON
    breakdown in ``gpus_detail``.
    """

    __tablename__ = "devices"

    id = Column(Integer, primary_key=True, index=True)
    host_id = Column(String, unique=True, index=True)  # Unique per host

    # GPU monitoring (aggregated for backward compatibility)
    gpu_util = Column(Float)  # Average across all GPUs
    vram_used = Column(Integer)  # MB (sum across all GPUs)
    vram_total = Column(Integer)  # MB (sum across all GPUs)
    gpu_temperature = Column(Float)  # Celsius
    gpu_power_usage = Column(Float)  # Watts

    # Multi-GPU support (new fields)
    gpu_count = Column(Integer, default=0)  # Number of GPUs
    gpus_detail = Column(JSON, nullable=True)  # Array of detailed GPU info

    # System monitoring
    cpu_util = Column(Float)
    # NOTE(review): byte counts are stored in Integer columns; confirm the
    # backend's integer width is sufficient for hosts with large RAM.
    memory_used = Column(Integer)  # Bytes
    memory_total = Column(Integer)  # Bytes
    disk_free = Column(Float)  # GB

    # Software versions
    torch_version = Column(String)
    cuda_version = Column(String)

    # Code version for reproducibility
    code_version = Column(JSON, nullable=True)  # {git_hash, files_hash, uv_lock_hash, ...}

    # Status and timing
    status = Column(String, default="offline")  # New rows start as "offline"
    # Defaults to the naive UTC time at insert (datetime.utcnow).
    last_heartbeat = Column(DateTime, default=datetime.utcnow)

JobPriority

Bases: str, Enum

Job priority levels

Source code in web_helper/backend/models/queue.py
class JobPriority(str, Enum):
    """Job priority levels.

    Mixing in ``str`` makes each member compare equal to its lowercase string
    value.
    """

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"

JobStatus

Bases: str, Enum

Job status enumeration

Source code in web_helper/backend/models/queue.py
class JobStatus(str, Enum):
    """Job status enumeration.

    Mixing in ``str`` makes each member compare equal to its lowercase string
    value.
    """

    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    PAUSED = "paused"
    EXPIRED = "expired"

QueueJob

Bases: BaseModel

Queue job model (Experiment)

Source code in web_helper/backend/models/queue.py
class QueueJob(BaseModel):
    """Queue job model (Experiment).

    ``experiment_uid``, ``name``, ``project`` and ``config_path`` are required;
    every other field has a default.
    """

    # Primary key: experiment_uid is unique within a day (YYYYMMDD_hash4)
    experiment_uid: str = Field(
        ..., description="Experiment UID in format {YYYYMMDD}_{hash4} (Primary Key)"
    )
    name: str = Field(..., description="Human-readable job name")
    project: str = Field(..., description="Project name")
    config_path: str = Field(..., description="Path to configuration file")
    # Note: run_uid removed - Queue operates at experiment level only
    # run_name is specified in config YAML and handled by cvlabkit

    # Status and timing
    status: JobStatus = JobStatus.PENDING
    priority: JobPriority = JobPriority.NORMAL
    # NOTE(review): datetime.now yields naive local time; confirm this is
    # intended given that the database models default to datetime.utcnow.
    created_at: datetime = Field(default_factory=datetime.now)
    queued_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    # Resource management
    requirements: Optional[ResourceRequirement] = None
    assigned_device: Optional[str] = None

    # Execution details
    command: Optional[str] = None
    working_directory: Optional[str] = None
    environment_vars: Dict[str, str] = Field(default_factory=dict)

    # Progress and results
    progress: float = Field(default=0.0, ge=0.0, le=1.0)  # Validated to lie in [0.0, 1.0]
    current_epoch: Optional[int] = None
    total_epochs: Optional[int] = None
    current_metrics: Dict[str, Any] = Field(default_factory=dict)

    # Error handling
    error_message: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3

    # User and metadata
    user: Optional[str] = None
    tags: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)

QueueStats

Bases: BaseModel

Queue statistics

Source code in web_helper/backend/models/queue.py
class QueueStats(BaseModel):
    """Queue statistics.

    All counters default to zero and all timing averages default to ``None``.
    """

    # Job counts.
    # NOTE(review): there are no counters for the "paused" and "expired"
    # statuses; confirm whether they are meant to be folded in elsewhere.
    total_jobs: int = 0
    pending_jobs: int = 0
    queued_jobs: int = 0
    running_jobs: int = 0
    completed_jobs: int = 0
    failed_jobs: int = 0
    cancelled_jobs: int = 0

    # Resource utilization
    total_cpu_cores: int = 0
    used_cpu_cores: int = 0
    total_memory_gb: float = 0.0
    used_memory_gb: float = 0.0
    total_gpu_count: int = 0
    used_gpu_count: int = 0

    # Timing statistics
    avg_queue_time_minutes: Optional[float] = None
    avg_execution_time_minutes: Optional[float] = None
    longest_running_job_minutes: Optional[float] = None

ResourceRequirement

Bases: BaseModel

Resource requirements for a job

Source code in web_helper/backend/models/queue.py
class ResourceRequirement(BaseModel):
    """Resource requirements for a job.

    Every field is optional and defaults to ``None``.
    NOTE(review): ``None`` presumably means "no constraint" - confirm against
    the scheduler that consumes this model.
    """

    cpu_cores: Optional[int] = None
    memory_gb: Optional[float] = None
    gpu_count: Optional[int] = None
    gpu_memory_gb: Optional[float] = None
    disk_space_gb: Optional[float] = None
    estimated_runtime_hours: Optional[float] = None

QueueExperiment

Bases: Base

Database model for queue experiment execution history.

Source code in web_helper/backend/models/queue_experiment.py
class QueueExperiment(Base):
    """Database model for queue experiment execution history.

    One row per experiment, keyed by the unique ``experiment_uid``. Besides
    status and timing, it records file paths, content hashes and sync state.
    """

    __tablename__ = "queue_experiments"

    id = Column(Integer, primary_key=True, index=True)
    experiment_uid = Column(String, unique=True, index=True)  # e.g., "20251007_7e7e"

    # Basic info
    name = Column(String)
    project = Column(String, index=True)
    status = Column(
        String, default="unknown"
    )  # pending, running, completed, failed, cancelled ("unknown" until set)

    # Timestamps
    created_at = Column(DateTime)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)

    # File paths
    config_path = Column(String)  # Path to config.yaml
    log_path = Column(String)  # Path to terminal_log.log
    error_log_path = Column(String)  # Path to terminal_err.log

    # Execution details
    assigned_device = Column(String, nullable=True)
    priority = Column(String, default="normal")
    pid = Column(Integer, nullable=True)  # Process ID for running jobs
    exit_code = Column(Integer, nullable=True)

    # File checksums for change detection (xxhash3)
    config_hash = Column(String, nullable=True)
    log_hash = Column(String, nullable=True)
    error_log_hash = Column(String, nullable=True)

    # Additional metadata (renamed to avoid SQLAlchemy reserved word)
    meta = Column(JSON, nullable=True)

    # Distributed execution support
    server_origin = Column(String, default="local")  # "local" | "remote-{host_id}"
    sync_status = Column(String, default="synced")  # "synced" | "syncing" | "outdated"
    last_sync_at = Column(DateTime, nullable=True)
    remote_mtime = Column(
        Integer, nullable=True
    )  # Remote file modification time (epoch)
    recovery_checkpoint = Column(JSON, nullable=True)  # {epoch: 5, step: 1000, ...}

    # Last update timestamp: set on insert and refreshed on every update
    # (naive UTC via datetime.utcnow).
    last_indexed = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

Run

Bases: Base

Database model for experiment runs.

Source code in web_helper/backend/models/run.py
class Run(Base):
    """Database model for experiment runs.

    A run is identified by the pair (``project``, ``run_name``), which is
    enforced as unique by ``__table_args__``. Summary statistics for the run's
    metrics are stored as JSON columns.
    """

    __tablename__ = "runs"

    id = Column(Integer, primary_key=True, index=True)
    project = Column(String, index=True)
    run_name = Column(
        String, index=True
    )  # User-specified run identifier (supports overwrite)

    status = Column(String, default="unknown")
    started_at = Column(DateTime)
    finished_at = Column(DateTime)
    config_path = Column(String)
    metrics_path = Column(String)
    checkpoint_path = Column(String)
    total_steps = Column(Integer, default=0)
    hyperparameters = Column(JSON)  # Hyperparameters from config
    final_metrics = Column(JSON)  # Last step metrics
    max_metrics = Column(JSON)  # Maximum value for each metric across all steps
    min_metrics = Column(JSON)  # Minimum value for each metric across all steps
    mean_metrics = Column(JSON)  # Mean value for each metric across all steps
    median_metrics = Column(JSON)  # Median value for each metric across all steps
    # Only a default is configured here (no onupdate), so later updates must
    # set this column explicitly.
    last_updated = Column(DateTime, default=datetime.utcnow)

    # File change detection (mtime + size for idempotent reindexing)
    file_fingerprint = Column(String)  # Format: "mtime_size" for metrics file

    # User annotations
    notes = Column(String(2000), default="")  # User notes for this run
    tags = Column(JSON, default=list)  # List of tag strings

    # Composite unique constraint: same run_name can exist in different projects
    __table_args__ = (
        UniqueConstraint("project", "run_name", name="uix_project_run_name"),
    )

CodeVersion

Bases: BaseModel

Code version information for reproducibility tracking.

Worker reports this on each heartbeat to ensure experiment reproducibility. Server stores this with experiment results for audit trail.

Source code in web_helper/backend/models/sync.py
class CodeVersion(BaseModel):
    """Code version information for reproducibility tracking.

    Worker reports this on each heartbeat to ensure experiment reproducibility.
    Server stores this with experiment results for audit trail.

    ``git_hash``, ``files_hash`` and ``uv_lock_hash`` are required; the
    remaining fields have defaults.
    """

    git_hash: str = Field(..., description="Git commit hash of cvlabkit")
    git_dirty: bool = Field(False, description="True if working directory has uncommitted changes")
    files_hash: str = Field(..., description="xxhash3 of core source files for integrity verification")
    uv_lock_hash: str = Field(..., description="xxhash3 of uv.lock for dependency reproducibility")

    # Optional metadata
    branch: Optional[str] = Field(None, description="Current git branch")
    python_version: Optional[str] = Field(None, description="Python version")
    pytorch_version: Optional[str] = Field(None, description="PyTorch version")
    cuda_version: Optional[str] = Field(None, description="CUDA version")

    # Component versions for experiment reproducibility
    component_versions: Dict[str, str] = Field(
        default_factory=dict,
        description="xxhash3 per component file: {path: hash} (agent + used components)",
    )

FileCheckpoint

Bases: BaseModel

Checkpoint for a single file's sync state.

Used for delta synchronization of append-only files (CSV, log).

Source code in web_helper/backend/models/sync.py
class FileCheckpoint(BaseModel):
    """Checkpoint for a single file's sync state.

    Used for delta synchronization of append-only files (CSV, log).
    ``file_path`` and ``hash`` are required; ``offset`` starts at 0.
    """

    file_path: str = Field(..., description="Relative path from experiment directory")
    offset: int = Field(0, description="Last synced byte offset")
    hash: str = Field(..., description="xxhash3 of synced content (for verification)")
    # Defaults to the naive UTC time at model creation (datetime.utcnow).
    last_synced_at: datetime = Field(default_factory=datetime.utcnow)

SyncCheckpoint

Bases: BaseModel

Complete checkpoint for experiment synchronization.

Stored on both Server and Worker for recovery after disconnection.

Source code in web_helper/backend/models/sync.py
class SyncCheckpoint(BaseModel):
    """Complete checkpoint for experiment synchronization.

    Stored on both Server and Worker for recovery after disconnection.
    Combines the per-file checkpoints with an overall sync status and,
    optionally, the code version in use at execution time.
    """

    experiment_uid: str = Field(..., description="Unique experiment identifier")
    worker_host_id: str = Field(..., description="Host ID of the Worker")

    # File sync state
    file_checkpoints: Dict[str, FileCheckpoint] = Field(
        default_factory=dict,
        description="Checkpoints per file: {filename: FileCheckpoint}",
    )

    # Overall sync state (starts as PENDING with no sync time recorded)
    sync_status: SyncStatus = Field(SyncStatus.PENDING)
    last_synced_at: Optional[datetime] = Field(None)
    error_message: Optional[str] = Field(None, description="Last error if sync_status is FAILED")

    # Code version at execution time (for reproducibility)
    code_version: Optional[CodeVersion] = Field(None)

SyncRequest

Bases: BaseModel

Request model for file synchronization API.

Worker sends this to Server to upload experiment files.

Source code in web_helper/backend/models/sync.py
class SyncRequest(BaseModel):
    """Request model for file synchronization API.

    Worker sends this to Server to upload experiment files.
    """

    experiment_uid: str
    file_name: str
    # Plain str: the two documented values are not enforced by the type.
    content_type: str = Field(..., description="'delta' for append-only, 'full' for complete replacement")
    offset: int = Field(0, description="Start offset for delta sync")
    content_hash: str = Field(..., description="xxhash3 of the content being sent")

SyncResponse

Bases: BaseModel

Response model for file synchronization API.

Server returns this to confirm sync status.

Source code in web_helper/backend/models/sync.py
class SyncResponse(BaseModel):
    """Response model for file synchronization API.

    Server returns this to confirm sync status. ``success`` and ``message``
    are required; the verification fields default to 0 / ``None``.
    """

    success: bool
    message: str
    server_offset: int = Field(0, description="Server's current offset (for verification)")
    server_hash: Optional[str] = Field(None, description="Server's hash (for verification)")

SyncStatus

Bases: str, Enum

Synchronization status for experiment files.

Source code in web_helper/backend/models/sync.py
class SyncStatus(str, Enum):
    """Synchronization status for experiment files.

    Mixing in ``str`` makes each member compare equal to its lowercase string
    value.
    """

    PENDING = "pending"  # Not yet started syncing
    SYNCING = "syncing"  # Currently syncing
    SYNCED = "synced"  # Fully synchronized
    FAILED = "failed"  # Sync failed (will retry)
    OUTDATED = "outdated"  # Local has newer data than server

SyncStatusRequest

Bases: BaseModel

Request model for querying sync status.

Worker sends this on reconnection to determine what needs to be re-synced.

Source code in web_helper/backend/models/sync.py
class SyncStatusRequest(BaseModel):
    """Request model for querying sync status.

    Worker sends this on reconnection to determine what needs to be re-synced.
    ``file_names`` defaults to an empty list.
    """

    experiment_uid: str
    worker_host_id: str
    file_names: list[str] = Field(default_factory=list, description="Files to check status for")

SyncStatusResponse

Bases: BaseModel

Response model for sync status query.

Server returns current sync state for requested files.

Source code in web_helper/backend/models/sync.py
class SyncStatusResponse(BaseModel):
    """Response model for sync status query.

    Server returns current sync state for requested files. Both collections
    default to empty.
    """

    experiment_uid: str
    sync_status: SyncStatus
    # Keyed by a string per file; values are the server-side checkpoints.
    file_states: Dict[str, FileCheckpoint] = Field(
        default_factory=dict,
        description="Current state per file on server",
    )
    needs_resync: list[str] = Field(
        default_factory=list,
        description="Files that need to be re-synced",
    )

ComponentVersion

Bases: Base

Database model for component versions (content-addressable storage).

Source code in web_helper/backend/models/component.py
class ComponentVersion(Base):
    """Database model for component versions (content-addressable storage).

    Each row holds one version of a component file's content, identified by
    the unique content ``hash``.
    """

    __tablename__ = "component_versions"

    id = Column(Integer, primary_key=True, index=True)
    hash = Column(String, unique=True, index=True)  # xxhash3 of content
    path = Column(String, index=True)  # "agent/classification.py"
    category = Column(String, index=True)  # "agent", "model", "transform", etc.
    name = Column(String, index=True)  # "classification", "resnet", etc.
    content = Column(Text)  # Actual code content
    is_active = Column(Boolean, default=False)  # Current active version for this path
    created_at = Column(DateTime, default=datetime.utcnow)  # Naive UTC insert time

ExperimentComponentManifest

Bases: Base

Database model for experiment component snapshots (reproducibility).

Source code in web_helper/backend/models/component.py
class ExperimentComponentManifest(Base):
    """Database model for experiment component snapshots (reproducibility).

    Each row links one experiment to one component path and the hash of the
    version that was used.
    """

    __tablename__ = "experiment_component_manifests"

    id = Column(Integer, primary_key=True, index=True)
    experiment_uid = Column(String, index=True)  # Experiment identifier
    component_path = Column(String)  # "agent/classification.py"
    component_hash = Column(String)  # Hash of component version used
    created_at = Column(DateTime, default=datetime.utcnow)  # Naive UTC insert time

ComponentVersionResponse

Bases: BaseModel

Response model for component version.

Source code in web_helper/backend/models/component.py
class ComponentVersionResponse(BaseModel):
    """Response model for component version.

    ``content`` is optional and defaults to ``None``.
    """

    hash: str
    path: str
    category: str
    name: str
    is_active: bool
    created_at: datetime
    content: Optional[str] = None  # Only included when explicitly requested

    class Config:
        # pydantic setting: allow populating the model from an object's
        # attributes rather than only from a dict.
        from_attributes = True

ComponentListItem

Bases: BaseModel

List item for component overview.

Source code in web_helper/backend/models/component.py
class ComponentListItem(BaseModel):
    """List item for component overview.

    ``active_hash`` and ``updated_at`` are optional and default to ``None``.
    """

    path: str
    category: str
    name: str
    active_hash: Optional[str] = None
    version_count: int
    updated_at: Optional[datetime] = None

ComponentUploadRequest

Bases: BaseModel

Request model for uploading a new component version.

Source code in web_helper/backend/models/component.py
class ComponentUploadRequest(BaseModel):
    """Request model for uploading a new component version.

    ``activate`` defaults to ``True``.
    """

    path: str  # "agent/classification.py" or "model/resnet.py"
    content: str  # Code content
    activate: bool = True  # Whether to activate this version immediately

ComponentUploadResponse

Bases: BaseModel

Response model for component upload.

Source code in web_helper/backend/models/component.py
class ComponentUploadResponse(BaseModel):
    """Response model for component upload.

    All fields are required.
    """

    hash: str
    path: str
    category: str
    name: str
    is_new: bool  # True if this is a new version, False if already exists
    is_active: bool

ComponentActivateRequest

Bases: BaseModel

Request model for activating a specific component version.

Source code in web_helper/backend/models/component.py
class ComponentActivateRequest(BaseModel):
    """Request model for activating a specific component version."""

    hash: str  # Hash of the component version to activate

ComponentDiffRequest

Bases: BaseModel

Request model for comparing two component versions.

Source code in web_helper/backend/models/component.py
class ComponentDiffRequest(BaseModel):
    """Request model for comparing two component versions."""

    from_hash: str  # Hash of the version to compare from
    to_hash: str  # Hash of the version to compare to

ComponentDiffResponse

Bases: BaseModel

Response model for component diff.

Source code in web_helper/backend/models/component.py
class ComponentDiffResponse(BaseModel):
    """Response model for component diff.

    Carries both hashes and both contents. No computed diff field is part of
    this model.
    """

    from_hash: str
    to_hash: str
    from_content: str
    to_content: str
    path: str

ComponentSyncRequest

Bases: BaseModel

Request model for component synchronization.

Source code in web_helper/backend/models/component.py
class ComponentSyncRequest(BaseModel):
    """Request model for component synchronization.

    Both fields are required.
    """

    required_components: list[str]  # List of paths needed
    local_hashes: dict[str, str]  # path -> local hash mapping

ComponentSyncResponse

Bases: BaseModel

Response model for component synchronization.

Source code in web_helper/backend/models/component.py
class ComponentSyncResponse(BaseModel):
    """Response model for component synchronization.

    Both fields are required.
    """

    to_download: list[str]  # List of hashes to download
    components: dict[str, ComponentVersionResponse]  # hash -> component info

ExperimentManifestResponse

Bases: BaseModel

Response model for experiment component manifest.

Source code in web_helper/backend/models/component.py
class ExperimentManifestResponse(BaseModel):
    """Response model for experiment component manifest.

    All fields are required.
    """

    experiment_uid: str
    components: dict[str, str]  # path -> hash mapping
    created_at: datetime

get_db

Database dependency for FastAPI endpoints.

Source code in web_helper/backend/models/database.py
def get_db():
    """Yield a database session and always close it afterwards.

    Written as a generator so it can serve as a FastAPI dependency: a session
    is created from ``SessionLocal``, handed to the caller, and closed when
    the generator is finalized, whether or not the caller raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs on normal completion, on error, and on generator close.
        session.close()

init_database

Initialize database tables.

Source code in web_helper/backend/models/database.py
def init_database():
    """Initialize database tables.

    Issues ``CREATE TABLE`` for every model registered on ``Base.metadata``,
    using the module-level ``engine``.
    """
    # All models must already be imported (and therefore registered with Base)
    # before this runs; no import happens inside this function.
    # NOTE(review): confirm the model modules are imported elsewhere.
    # Note: ProcessState moved to JSON file, no longer in DB

    Base.metadata.create_all(bind=engine)

services

Business logic services.

EventManager

Manages Server-Sent Events connections and broadcasts.

Source code in web_helper/backend/services/event_manager.py
class EventManager:
    """Manages Server-Sent Events connections and broadcasts.

    Every connected client is represented by one ``asyncio.Queue`` held in
    ``self.connections``. Events are JSON-encoded once and the resulting
    string is put on each client's queue.
    """

    def __init__(self):
        # One queue per connected SSE client.
        self.connections: Set[asyncio.Queue] = set()
        # Not read anywhere inside this class.
        self._running = False

    async def connect(self, request: Request) -> asyncio.Queue:
        """Register a new SSE client and send it a greeting event.

        Args:
            request: The incoming request. Unused here; kept so the call
                signature stays unchanged.

        Returns:
            The queue on which this client's events will be delivered.
        """
        queue = asyncio.Queue()
        self.connections.add(queue)
        logger.info(f"New SSE connection. Total: {len(self.connections)}")

        # Greet only the new client. Routing this through broadcast() would
        # push a "connection" event to every already-connected client each
        # time someone joins. The payload format matches broadcast(): a JSON
        # string.
        await queue.put(
            json.dumps(
                {
                    "type": "connection",
                    "message": "Connected to CVLab-Kit Web Helper",
                    "timestamp": datetime.utcnow().isoformat() + "Z",
                }
            )
        )

        return queue

    async def disconnect(self, queue: asyncio.Queue):
        """Remove an SSE connection; a no-op if the queue is not registered."""
        if queue in self.connections:
            self.connections.remove(queue)
            logger.info(f"SSE connection removed. Total: {len(self.connections)}")

    async def broadcast(self, data: Dict[str, Any]):
        """Broadcast data to all connected clients.

        Args:
            data: JSON-serializable event payload. It is encoded once and the
                same string is delivered to every client.

        Clients whose queue cannot accept the message within one second, or
        whose ``put`` raises, are dropped from the connection set.
        """
        if not self.connections:
            return

        message = json.dumps(data)
        disconnected = set()

        # Create a copy to avoid "Set changed size during iteration" error
        for queue in list(self.connections):
            try:
                # NOTE(review): queues created by connect() are unbounded, so
                # put() does not block and this timeout is unlikely to fire.
                await asyncio.wait_for(queue.put(message), timeout=1.0)
            except asyncio.TimeoutError:
                logger.warning("SSE queue timeout, marking for removal")
                disconnected.add(queue)
            except Exception as e:
                logger.error(f"Error broadcasting to SSE client: {e}")
                disconnected.add(queue)

        # Remove disconnected clients
        for queue in disconnected:
            await self.disconnect(queue)

    async def send_device_update(self, device_data: Dict[str, Any]):
        """Broadcast a ``device_update`` event carrying ``device_data``."""
        await self.broadcast(
            {
                "type": "device_update",
                "data": device_data,
                "timestamp": datetime.utcnow().isoformat() + "Z",
            }
        )

    async def send_queue_update(self, queue_data: Dict[str, Any]):
        """Broadcast a ``queue_update`` event carrying ``queue_data``."""
        await self.broadcast(
            {
                "type": "queue_update",
                "data": queue_data,
                "timestamp": datetime.utcnow().isoformat() + "Z",
            }
        )

    async def send_run_update(self, run_data: Dict[str, Any]):
        """Broadcast a ``run_update`` event carrying ``run_data``."""
        await self.broadcast(
            {
                "type": "run_update",
                "data": run_data,
                "timestamp": datetime.utcnow().isoformat() + "Z",
            }
        )
connect async

Add a new SSE connection.

Source code in web_helper/backend/services/event_manager.py
async def connect(self, request: Request) -> asyncio.Queue:
    """Register a new SSE client and send it a greeting event.

    Args:
        request: The incoming request. Unused here; kept so the call
            signature stays unchanged.

    Returns:
        The queue on which this client's events will be delivered.
    """
    queue = asyncio.Queue()
    self.connections.add(queue)
    logger.info(f"New SSE connection. Total: {len(self.connections)}")

    # Greet only the new client. Routing this through broadcast() would push
    # a "connection" event to every already-connected client each time
    # someone joins. The payload is a JSON string, the same format
    # broadcast() delivers.
    await queue.put(
        json.dumps(
            {
                "type": "connection",
                "message": "Connected to CVLab-Kit Web Helper",
                "timestamp": datetime.utcnow().isoformat() + "Z",
            }
        )
    )

    return queue
disconnect async

Remove an SSE connection.

Source code in web_helper/backend/services/event_manager.py
async def disconnect(self, queue: asyncio.Queue):
    """Drop a client's queue from the connection set.

    Does nothing (and logs nothing) when the queue is not registered.
    """
    if queue not in self.connections:
        return

    self.connections.remove(queue)
    logger.info(f"SSE connection removed. Total: {len(self.connections)}")
broadcast async

Broadcast data to all connected clients.

Source code in web_helper/backend/services/event_manager.py
async def broadcast(self, data: Dict[str, Any]):
    """Deliver one event to every connected client.

    The payload is JSON-encoded once and the same string is put on each
    client's queue. A client is dropped when its queue does not accept the
    message within one second or when the put raises.
    """
    if not self.connections:
        return

    payload = json.dumps(data)
    stale = set()

    # Walk a snapshot so the live set can change while we await.
    for client in tuple(self.connections):
        try:
            await asyncio.wait_for(client.put(payload), timeout=1.0)
        except asyncio.TimeoutError:
            logger.warning("SSE queue timeout, marking for removal")
            stale.add(client)
        except Exception as e:
            logger.error(f"Error broadcasting to SSE client: {e}")
            stale.add(client)

    # Unregister every client that failed above.
    for client in stale:
        await self.disconnect(client)
send_device_update async

Send device status update.

Source code in web_helper/backend/services/event_manager.py
async def send_device_update(self, device_data: Dict[str, Any]):
    """Broadcast a ``device_update`` event wrapping ``device_data``.

    The event carries a UTC timestamp in ISO format with a trailing "Z".
    """
    event = {
        "type": "device_update",
        "data": device_data,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    await self.broadcast(event)
send_queue_update async

Send queue status update.

Source code in web_helper/backend/services/event_manager.py
async def send_queue_update(self, queue_data: Dict[str, Any]):
    """Broadcast a ``queue_update`` event wrapping ``queue_data``.

    The event carries a UTC timestamp in ISO format with a trailing "Z".
    """
    event = {
        "type": "queue_update",
        "data": queue_data,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    await self.broadcast(event)
send_run_update async

Send experiment run update.

Source code in web_helper/backend/services/event_manager.py
async def send_run_update(self, run_data: Dict[str, Any]):
    """Broadcast a ``run_update`` event wrapping ``run_data``.

    The event carries a UTC timestamp in ISO format with a trailing "Z".
    """
    event = {
        "type": "run_update",
        "data": run_data,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    await self.broadcast(event)

FileMonitor

Monitors file system changes in CVLab-Kit logs directory.

Source code in web_helper/backend/services/file_monitor.py
class FileMonitor:
    """Monitors file system changes in CVLab-Kit logs directory.

    Wraps an ``Observer`` that watches ``logs_dir`` recursively and routes
    file system events to a ``CVLabKitFileHandler``.
    """

    def __init__(self, logs_dir: str = "logs"):
        """Prepare the observer and handler without starting to watch.

        Args:
            logs_dir: Directory to watch; created on ``start()`` if missing.
        """
        self.logs_dir = Path(logs_dir)
        self.observer = Observer()
        self.handler = CVLabKitFileHandler(self)
        self._running = False

    async def start(self):
        """Start file monitoring and announce it to SSE clients.

        Calling this while already running only logs a warning.
        """
        if self._running:
            logger.warning("File monitor already running")
            return

        # Create the logs directory (and any missing parents) if needed, so a
        # nested path such as "data/logs" does not raise FileNotFoundError.
        self.logs_dir.mkdir(parents=True, exist_ok=True)

        # Start watching
        self.observer.schedule(self.handler, str(self.logs_dir), recursive=True)
        self.observer.start()
        self._running = True

        logger.info(f"File monitor started for directory: {self.logs_dir}")

        # Announce that monitoring has started. The timestamp is the event
        # loop's clock reading, not wall-clock time.
        # NOTE(review): EventManager's own events use ISO-8601 UTC strings;
        # confirm consumers expect a float here.
        await event_manager.broadcast(
            {
                "type": "file_monitor_started",
                "message": f"Monitoring {self.logs_dir} for changes",
                # get_running_loop() is the documented way to reach the loop
                # from inside a coroutine.
                "timestamp": asyncio.get_running_loop().time(),
            }
        )

    async def stop(self):
        """Stop file monitoring; a no-op when not running."""
        if not self._running:
            return

        self.observer.stop()
        # join() is a synchronous wait for the observer thread to finish.
        self.observer.join()
        self._running = False

        logger.info("File monitor stopped")

    def is_running(self) -> bool:
        """Return True only if started and the observer thread is alive."""
        return self._running and self.observer.is_alive()
start async

Start file monitoring.

Source code in web_helper/backend/services/file_monitor.py
async def start(self):
    """Start file monitoring and announce it to SSE clients.

    Calling this while already running only logs a warning.
    """
    if self._running:
        logger.warning("File monitor already running")
        return

    # Create the logs directory (and any missing parents) if needed, so a
    # nested path such as "data/logs" does not raise FileNotFoundError.
    self.logs_dir.mkdir(parents=True, exist_ok=True)

    # Start watching
    self.observer.schedule(self.handler, str(self.logs_dir), recursive=True)
    self.observer.start()
    self._running = True

    logger.info(f"File monitor started for directory: {self.logs_dir}")

    # Announce that monitoring has started. The timestamp is the event loop's
    # clock reading, not wall-clock time.
    # NOTE(review): confirm consumers expect a float here rather than an
    # ISO-8601 string.
    await event_manager.broadcast(
        {
            "type": "file_monitor_started",
            "message": f"Monitoring {self.logs_dir} for changes",
            # get_running_loop() is the documented way to reach the loop from
            # inside a coroutine.
            "timestamp": asyncio.get_running_loop().time(),
        }
    )
stop async

Stop file monitoring.

Source code in web_helper/backend/services/file_monitor.py
async def stop(self):
    """Shut the observer down if monitoring is active.

    Does nothing when the monitor is not running. Otherwise it stops the
    observer, waits for it to finish, and clears the running flag.
    """
    if self._running:
        watcher = self.observer
        watcher.stop()
        # Synchronous wait for the observer to finish.
        watcher.join()
        self._running = False

        logger.info("File monitor stopped")
is_running

Check if monitor is running.

Source code in web_helper/backend/services/file_monitor.py
def is_running(self) -> bool:
    """Report whether monitoring was started and the observer is alive."""
    if not self._running:
        # Hand back the flag itself, as the short-circuit form would.
        return self._running
    return self.observer.is_alive()

QueueFileMonitor

Monitors file system changes in queue_logs directory.

Source code in web_helper/backend/services/queue_file_monitor.py
class QueueFileMonitor:
    """Monitors file system changes in queue_logs directory.

    Wraps an ``Observer`` that watches ``queue_logs_dir`` recursively and
    routes file system events to a ``QueueFileHandler``.
    """

    def __init__(self, queue_logs_dir: str = "web_helper/queue_logs"):
        """Prepare the observer and handler without starting to watch.

        Args:
            queue_logs_dir: Directory to watch; created on ``start()`` if
                missing.
        """
        self.queue_logs_dir = Path(queue_logs_dir)
        self.observer = Observer()
        self.handler = QueueFileHandler(self)
        self._running = False

    async def start(self):
        """Start file monitoring and announce it to SSE clients.

        Calling this while already running only logs a warning.
        """
        if self._running:
            logger.warning("Queue file monitor already running")
            return

        # Create queue_logs directory (and missing parents) if it doesn't exist
        self.queue_logs_dir.mkdir(parents=True, exist_ok=True)

        # Start watching
        self.observer.schedule(self.handler, str(self.queue_logs_dir), recursive=True)
        self.observer.start()
        self._running = True

        logger.info(f"Queue file monitor started for directory: {self.queue_logs_dir}")

        # Announce that monitoring has started. The timestamp is the event
        # loop's clock reading, not wall-clock time.
        # NOTE(review): confirm consumers expect a float here rather than an
        # ISO-8601 string.
        await event_manager.broadcast(
            {
                "type": "queue_monitor_started",
                "message": f"Monitoring {self.queue_logs_dir} for changes",
                # get_running_loop() is the documented way to reach the loop
                # from inside a coroutine.
                "timestamp": asyncio.get_running_loop().time(),
            }
        )

    async def stop(self):
        """Stop file monitoring; a no-op when not running."""
        if not self._running:
            return

        self.observer.stop()
        # join() is a synchronous wait for the observer thread to finish.
        self.observer.join()
        self._running = False

        logger.info("Queue file monitor stopped")

    def is_running(self) -> bool:
        """Return True only if started and the observer thread is alive."""
        return self._running and self.observer.is_alive()
start async

Start file monitoring.

Source code in web_helper/backend/services/queue_file_monitor.py
async def start(self):
    """Start file monitoring and announce it to SSE clients.

    Calling this while already running only logs a warning.
    """
    if self._running:
        logger.warning("Queue file monitor already running")
        return

    # Create queue_logs directory (and missing parents) if it doesn't exist
    self.queue_logs_dir.mkdir(parents=True, exist_ok=True)

    # Start watching
    self.observer.schedule(self.handler, str(self.queue_logs_dir), recursive=True)
    self.observer.start()
    self._running = True

    logger.info(f"Queue file monitor started for directory: {self.queue_logs_dir}")

    # Announce that monitoring has started. The timestamp is the event loop's
    # clock reading, not wall-clock time.
    # NOTE(review): confirm consumers expect a float here rather than an
    # ISO-8601 string.
    await event_manager.broadcast(
        {
            "type": "queue_monitor_started",
            "message": f"Monitoring {self.queue_logs_dir} for changes",
            # get_running_loop() is the documented way to reach the loop from
            # inside a coroutine.
            "timestamp": asyncio.get_running_loop().time(),
        }
    )
stop async

Stop file monitoring.

Source code in web_helper/backend/services/queue_file_monitor.py
async def stop(self):
    """Stop file monitoring.

    Does nothing when the monitor is not marked as running. Otherwise the
    observer is asked to stop and this coroutine waits for it to finish
    before clearing the running flag.
    """
    if not self._running:
        return

    self.observer.stop()
    # join() blocks the calling thread until the observer has exited, so it
    # is run in the default executor to keep the event loop responsive.
    await asyncio.get_running_loop().run_in_executor(None, self.observer.join)
    self._running = False

    logger.info("Queue file monitor stopped")
is_running

Check if monitor is running.

Source code in web_helper/backend/services/queue_file_monitor.py
def is_running(self) -> bool:
    """Report whether the monitor is flagged as running and its observer is alive."""
    if not self._running:
        # Hand back the flag itself, exactly as the short-circuit `and` would.
        return self._running
    return self.observer.is_alive()

ComponentStore

Service for managing component versions.

Source code in web_helper/backend/services/component_store.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
class ComponentStore:
    """Service for managing component versions.

    Wraps a database session and offers version listing, upload, activation
    (rollback), experiment manifests and worker-sync helpers. Component paths
    are stored in the form ``"<category>/<name>.py"``.
    """

    def __init__(self, db: Session):
        """Keep the database session used by every query in this service."""
        self.db = db

    def get_all_components(self) -> list[ComponentListItem]:
        """Get list of all registered components with their active versions.

        Returns:
            One ComponentListItem per component path, carrying the version
            count, the newest ``created_at`` and the active hash (``None``
            when no version of that path is active).
        """
        # category and name are derived from path when a version is uploaded,
        # so listing them in GROUP BY keeps one row per path while staying
        # valid on databases that reject non-aggregated, non-grouped columns.
        summaries = (
            self.db.query(
                ComponentVersion.path,
                ComponentVersion.category,
                ComponentVersion.name,
                func.count(ComponentVersion.id).label("version_count"),
                func.max(ComponentVersion.created_at).label("updated_at"),
            )
            .group_by(ComponentVersion.path, ComponentVersion.category, ComponentVersion.name)
            .all()
        )

        # Fetch every active hash in a single query instead of one query per path.
        active_rows = (
            self.db.query(ComponentVersion.path, ComponentVersion.hash)
            .filter(ComponentVersion.is_active == True)  # noqa: E712 - builds a SQL comparison
            .all()
        )
        active_by_path = {}
        for record in active_rows:
            # Keep the first hash seen per path, like the former `.first()` lookup.
            active_by_path.setdefault(record.path, record.hash)

        return [
            ComponentListItem(
                path=row.path,
                category=row.category,
                name=row.name,
                active_hash=active_by_path.get(row.path),
                version_count=row.version_count,
                updated_at=row.updated_at,
            )
            for row in summaries
        ]

    def get_components_by_category(self, category: str) -> list[ComponentListItem]:
        """Get components filtered by category.

        Raises:
            ValueError: If ``category`` is not in ``VALID_CATEGORIES``.
        """
        if category not in VALID_CATEGORIES:
            raise ValueError(f"Invalid category: {category}")

        return [item for item in self.get_all_components() if item.category == category]

    def get_component_versions(
        self, category: str, name: str, include_content: bool = False
    ) -> list[ComponentVersionResponse]:
        """Get all versions of a specific component, newest first.

        Args:
            category: Component category (first part of the stored path)
            name: Component name without the ``.py`` suffix
            include_content: Whether to include the stored source in each item
        """
        path = f"{category}/{name}.py"

        versions = (
            self.db.query(ComponentVersion)
            .filter(ComponentVersion.path == path)
            .order_by(ComponentVersion.created_at.desc())
            .all()
        )

        return [
            ComponentVersionResponse(
                hash=v.hash,
                path=v.path,
                category=v.category,
                name=v.name,
                is_active=v.is_active,
                created_at=v.created_at,
                content=v.content if include_content else None,
            )
            for v in versions
        ]

    def get_version_by_hash(self, hash: str, include_content: bool = True) -> Optional[ComponentVersionResponse]:
        """Get specific version by hash, or ``None`` when the hash is unknown."""
        version = self.db.query(ComponentVersion).filter(ComponentVersion.hash == hash).first()

        if not version:
            return None

        return ComponentVersionResponse(
            hash=version.hash,
            path=version.path,
            category=version.category,
            name=version.name,
            is_active=version.is_active,
            created_at=version.created_at,
            content=version.content if include_content else None,
        )

    def get_active_version(self, path: str) -> Optional[ComponentVersionResponse]:
        """Get current active version (including content) for a component path.

        Returns ``None`` when the path has no active version.
        """
        path = normalize_path(path)

        version = (
            self.db.query(ComponentVersion)
            .filter(ComponentVersion.path == path, ComponentVersion.is_active == True)  # noqa: E712
            .first()
        )

        if not version:
            return None

        return ComponentVersionResponse(
            hash=version.hash,
            path=version.path,
            category=version.category,
            name=version.name,
            is_active=version.is_active,
            created_at=version.created_at,
            content=version.content,
        )

    def upload_version(self, path: str, content: str, activate: bool = True) -> tuple[ComponentVersionResponse, bool]:
        """Upload a new component version.

        Args:
            path: Component path like "agent/classification.py"
            content: Python code content
            activate: Whether to activate this version

        Returns:
            Tuple of (ComponentVersionResponse, is_new)
        """
        path = normalize_path(path)
        category, name = parse_component_path(path)

        # Calculate content hash
        content_hash = calculate_content_hash(content.encode("utf-8"))

        # Check if this exact version already exists.
        # NOTE(review): the lookup is by hash only, so identical content
        # uploaded under a different path resolves to the existing row.
        existing = self.db.query(ComponentVersion).filter(ComponentVersion.hash == content_hash).first()

        if existing:
            if activate and not existing.is_active:
                self._activate_version(existing)
                # Persist the re-activation, as the new-version branch below
                # and activate_version() do.
                self.db.commit()
            return (
                ComponentVersionResponse(
                    hash=existing.hash,
                    path=existing.path,
                    category=existing.category,
                    name=existing.name,
                    is_active=existing.is_active,
                    created_at=existing.created_at,
                ),
                False,
            )

        # Create new version
        new_version = ComponentVersion(
            hash=content_hash,
            path=path,
            category=category,
            name=name,
            content=content,
            is_active=False,
            created_at=datetime.utcnow(),
        )
        self.db.add(new_version)

        if activate:
            self._activate_version(new_version)

        self.db.commit()

        return (
            ComponentVersionResponse(
                hash=new_version.hash,
                path=new_version.path,
                category=new_version.category,
                name=new_version.name,
                is_active=new_version.is_active,
                created_at=new_version.created_at,
            ),
            True,
        )

    def activate_version(self, hash: str, restore_local: bool = True) -> Optional[ComponentVersionResponse]:
        """Activate a specific version (rollback).

        Args:
            hash: Content hash of the version to activate
            restore_local: If True, also restore the local file with version content

        Returns:
            The activated version, or ``None`` when the hash is unknown. A
            failure to write the local file is logged and does not undo the
            activation.
        """
        version = self.db.query(ComponentVersion).filter(ComponentVersion.hash == hash).first()

        if not version:
            return None

        self._activate_version(version)
        self.db.commit()

        # Restore local file if requested
        if restore_local and version.content:
            try:
                # Mirror the layout scan_local_components() reads from: agents
                # live in cvlabkit/agent/, every other category lives in
                # cvlabkit/component/<category>/.
                local_root = Path("cvlabkit")
                if version.category != "agent":
                    local_root = local_root / "component"
                local_path = local_root / version.path
                local_path.parent.mkdir(parents=True, exist_ok=True)
                local_path.write_text(version.content, encoding="utf-8")
                logger.info(f"Restored local file: {local_path}")
            except Exception as e:
                logger.error(f"Failed to restore local file {version.path}: {e}")

        return ComponentVersionResponse(
            hash=version.hash,
            path=version.path,
            category=version.category,
            name=version.name,
            is_active=True,
            created_at=version.created_at,
        )

    def _activate_version(self, version: ComponentVersion):
        """Internal: Activate a version and deactivate others for same path.

        Does not commit; the caller is responsible for committing.
        """
        # Deactivate all versions for this path
        self.db.query(ComponentVersion).filter(
            ComponentVersion.path == version.path, ComponentVersion.is_active == True  # noqa: E712
        ).update({"is_active": False})

        # Activate this version
        version.is_active = True

    def get_active_hashes(self, paths: list[str]) -> dict[str, str]:
        """Get active hashes for multiple paths.

        Args:
            paths: List of component paths

        Returns:
            Dictionary of {path: hash} for active versions, keyed by the
            normalized path. Paths that raise ValueError or have no active
            version are left out.
        """
        result = {}
        for path in paths:
            try:
                path = normalize_path(path)
                version = self.get_active_version(path)
            except ValueError:
                continue
            if version:
                result[path] = version.hash
        return result

    # =========================================================================
    # Experiment Manifest (Reproducibility)
    # =========================================================================

    def save_experiment_manifest(self, experiment_uid: str, components: dict[str, str]):
        """Save component versions used in an experiment.

        Any manifest previously stored for the experiment is replaced.

        Args:
            experiment_uid: Experiment identifier
            components: Dictionary of {path: hash}
        """
        # Delete existing manifest for this experiment
        self.db.query(ExperimentComponentManifest).filter(
            ExperimentComponentManifest.experiment_uid == experiment_uid
        ).delete()

        # Create new manifest entries, all sharing one timestamp
        now = datetime.utcnow()
        for component_path, component_hash in components.items():
            self.db.add(
                ExperimentComponentManifest(
                    experiment_uid=experiment_uid,
                    component_path=component_path,
                    component_hash=component_hash,
                    created_at=now,
                )
            )

        self.db.commit()

    def get_experiment_manifest(self, experiment_uid: str) -> Optional[ExperimentManifestResponse]:
        """Get component manifest for an experiment, or ``None`` if none is stored."""
        entries = (
            self.db.query(ExperimentComponentManifest)
            .filter(ExperimentComponentManifest.experiment_uid == experiment_uid)
            .all()
        )

        if not entries:
            return None

        components = {e.component_path: e.component_hash for e in entries}

        return ExperimentManifestResponse(
            experiment_uid=experiment_uid,
            components=components,
            created_at=entries[0].created_at,
        )

    # =========================================================================
    # Sync Helpers
    # =========================================================================

    def get_components_to_sync(
        self, required_paths: list[str], local_hashes: dict[str, str]
    ) -> tuple[list[str], dict[str, ComponentVersionResponse]]:
        """Determine which components need to be synced.

        Args:
            required_paths: List of component paths needed
            local_hashes: Current hashes on worker {path: hash}

        Returns:
            Tuple of (hashes_to_download, component_info). ``component_info``
            maps every resolved active hash to its version, including those
            the worker already has.
        """
        to_download = []
        components = {}

        for path in required_paths:
            try:
                path = normalize_path(path)
                active = self.get_active_version(path)
            except ValueError as e:
                logger.warning(f"Invalid component path {path}: {e}")
                continue

            if not active:
                logger.warning(f"No active version for {path}")
                continue

            # Only hashes that differ from the worker's copy need downloading
            if local_hashes.get(path) != active.hash:
                to_download.append(active.hash)

            components[active.hash] = active

        return to_download, components

    def scan_local_components(self, base_path: Path = None) -> int:
        """Scan local cvlabkit components and register them.

        Args:
            base_path: Base path to cvlabkit (default: ./cvlabkit)

        Returns:
            Number of components registered
        """
        if base_path is None:
            base_path = Path("cvlabkit")

        def register_directory(directory: Path, category: str) -> int:
            """Upload every public ``*.py`` file of one directory; return how many succeeded."""
            registered = 0
            for py_file in directory.glob("*.py"):
                # Skip private modules such as __init__.py
                if py_file.name.startswith("_"):
                    continue
                try:
                    content = py_file.read_text(encoding="utf-8")
                    self.upload_version(f"{category}/{py_file.stem}.py", content, activate=True)
                    registered += 1
                except Exception as e:
                    # Best effort: one bad file must not abort the whole scan
                    logger.error(f"Failed to register {py_file}: {e}")
            return registered

        count = 0

        # Scan agents
        agent_path = base_path / "agent"
        if agent_path.exists():
            count += register_directory(agent_path, "agent")

        # Scan component categories
        component_path = base_path / "component"
        if component_path.exists():
            for category_dir in component_path.iterdir():
                if not category_dir.is_dir():
                    continue
                category = category_dir.name
                if category not in VALID_CATEGORIES or category == "base":
                    continue
                count += register_directory(category_dir, category)

        return count
get_all_components

Get list of all registered components with their active versions.

Source code in web_helper/backend/services/component_store.py
def get_all_components(self) -> list[ComponentListItem]:
    """Get list of all registered components with their active versions.

    Returns:
        One ComponentListItem per component path, carrying the version
        count, the newest ``created_at`` and the active hash (``None``
        when no version of that path is active).
    """
    # NOTE(review): category and name are assumed to be fixed per path (the
    # uploader derives them from the path); under that assumption, listing
    # them in GROUP BY keeps one row per path while staying valid on databases
    # that reject non-aggregated, non-grouped columns.
    summaries = (
        self.db.query(
            ComponentVersion.path,
            ComponentVersion.category,
            ComponentVersion.name,
            func.count(ComponentVersion.id).label("version_count"),
            func.max(ComponentVersion.created_at).label("updated_at"),
        )
        .group_by(ComponentVersion.path, ComponentVersion.category, ComponentVersion.name)
        .all()
    )

    # Fetch every active hash in a single query instead of one query per path.
    active_rows = (
        self.db.query(ComponentVersion.path, ComponentVersion.hash)
        .filter(ComponentVersion.is_active == True)  # noqa: E712 - builds a SQL comparison
        .all()
    )
    active_by_path = {}
    for record in active_rows:
        # Keep the first hash seen per path, like the former `.first()` lookup.
        active_by_path.setdefault(record.path, record.hash)

    return [
        ComponentListItem(
            path=row.path,
            category=row.category,
            name=row.name,
            active_hash=active_by_path.get(row.path),
            version_count=row.version_count,
            updated_at=row.updated_at,
        )
        for row in summaries
    ]
get_components_by_category

Get components filtered by category.

Source code in web_helper/backend/services/component_store.py
def get_components_by_category(self, category: str) -> list[ComponentListItem]:
    """Return the registered components whose category equals ``category``.

    Raises:
        ValueError: If ``category`` is not in ``VALID_CATEGORIES``.
    """
    if category in VALID_CATEGORIES:
        return [item for item in self.get_all_components() if item.category == category]

    raise ValueError(f"Invalid category: {category}")
get_component_versions

Get all versions of a specific component.

Source code in web_helper/backend/services/component_store.py
def get_component_versions(
    self, category: str, name: str, include_content: bool = False
) -> list[ComponentVersionResponse]:
    """List every stored version of ``<category>/<name>.py``, newest first.

    Args:
        category: Component category (first part of the stored path)
        name: Component name without the ``.py`` suffix
        include_content: Whether to include the stored source in each item
    """
    component_path = f"{category}/{name}.py"

    rows = (
        self.db.query(ComponentVersion)
        .filter(ComponentVersion.path == component_path)
        .order_by(ComponentVersion.created_at.desc())
        .all()
    )

    responses = []
    for row in rows:
        responses.append(
            ComponentVersionResponse(
                hash=row.hash,
                path=row.path,
                category=row.category,
                name=row.name,
                is_active=row.is_active,
                created_at=row.created_at,
                # Content is only read from the row when it was asked for
                content=row.content if include_content else None,
            )
        )
    return responses
get_version_by_hash

Get specific version by hash.

Source code in web_helper/backend/services/component_store.py
def get_version_by_hash(self, hash: str, include_content: bool = True) -> Optional[ComponentVersionResponse]:
    """Look up a single version by its content hash.

    Returns ``None`` when no version has that hash; the source is included
    unless ``include_content`` is False.
    """
    lookup = self.db.query(ComponentVersion).filter(ComponentVersion.hash == hash)
    record = lookup.first()

    if record:
        return ComponentVersionResponse(
            hash=record.hash,
            path=record.path,
            category=record.category,
            name=record.name,
            is_active=record.is_active,
            created_at=record.created_at,
            content=record.content if include_content else None,
        )

    return None
get_active_version

Get current active version for a component path.

Source code in web_helper/backend/services/component_store.py
def get_active_version(self, path: str) -> Optional[ComponentVersionResponse]:
    """Return the active version (with content) for a component path.

    The path is normalized before the lookup; ``None`` is returned when the
    path has no active version.
    """
    normalized = normalize_path(path)

    active_query = self.db.query(ComponentVersion).filter(
        ComponentVersion.path == normalized, ComponentVersion.is_active == True
    )
    record = active_query.first()

    if record:
        return ComponentVersionResponse(
            hash=record.hash,
            path=record.path,
            category=record.category,
            name=record.name,
            is_active=record.is_active,
            created_at=record.created_at,
            content=record.content,
        )

    return None
upload_version

Upload a new component version.

Parameters:

Name Type Description Default
path str

Component path like "agent/classification.py"

required
content str

Python code content

required
activate bool

Whether to activate this version

True

Returns:

Type Description
tuple[ComponentVersionResponse, bool]

Tuple of (ComponentVersionResponse, is_new)

Source code in web_helper/backend/services/component_store.py
def upload_version(self, path: str, content: str, activate: bool = True) -> tuple[ComponentVersionResponse, bool]:
    """Upload a new component version.

    Args:
        path: Component path like "agent/classification.py"
        content: Python code content
        activate: Whether to activate this version

    Returns:
        Tuple of (ComponentVersionResponse, is_new)
    """
    path = normalize_path(path)
    category, name = parse_component_path(path)

    # Calculate content hash
    content_hash = calculate_content_hash(content.encode("utf-8"))

    # Check if this exact version already exists.
    # NOTE(review): the lookup is by hash only, so identical content uploaded
    # under a different path resolves to the existing row.
    existing = self.db.query(ComponentVersion).filter(ComponentVersion.hash == content_hash).first()

    if existing:
        if activate and not existing.is_active:
            self._activate_version(existing)
            # Persist the re-activation; the new-version branch below commits too.
            self.db.commit()
        return (
            ComponentVersionResponse(
                hash=existing.hash,
                path=existing.path,
                category=existing.category,
                name=existing.name,
                is_active=existing.is_active,
                created_at=existing.created_at,
            ),
            False,
        )

    # Create new version
    new_version = ComponentVersion(
        hash=content_hash,
        path=path,
        category=category,
        name=name,
        content=content,
        is_active=False,
        created_at=datetime.utcnow(),
    )
    self.db.add(new_version)

    if activate:
        self._activate_version(new_version)

    self.db.commit()

    return (
        ComponentVersionResponse(
            hash=new_version.hash,
            path=new_version.path,
            category=new_version.category,
            name=new_version.name,
            is_active=new_version.is_active,
            created_at=new_version.created_at,
        ),
        True,
    )
activate_version

Activate a specific version (rollback).

Parameters:

Name Type Description Default
hash str

Content hash of the version to activate

required
restore_local bool

If True, also restore the local file with version content

True
Source code in web_helper/backend/services/component_store.py
def activate_version(self, hash: str, restore_local: bool = True) -> Optional[ComponentVersionResponse]:
    """Activate a specific version (rollback).

    Args:
        hash: Content hash of the version to activate
        restore_local: If True, also restore the local file with version content

    Returns:
        The activated version, or ``None`` when the hash is unknown. A failure
        to write the local file is logged and does not undo the activation.
    """
    version = self.db.query(ComponentVersion).filter(ComponentVersion.hash == hash).first()

    if not version:
        return None

    self._activate_version(version)
    self.db.commit()

    # Restore local file if requested
    if restore_local and version.content:
        try:
            # Mirror the on-disk layout used when local components are
            # scanned: agents live in cvlabkit/agent/, every other category
            # lives in cvlabkit/component/<category>/.
            local_root = Path("cvlabkit")
            if version.category != "agent":
                local_root = local_root / "component"
            local_path = local_root / version.path
            local_path.parent.mkdir(parents=True, exist_ok=True)
            local_path.write_text(version.content, encoding="utf-8")
            logger.info(f"Restored local file: {local_path}")
        except Exception as e:
            logger.error(f"Failed to restore local file {version.path}: {e}")

    return ComponentVersionResponse(
        hash=version.hash,
        path=version.path,
        category=version.category,
        name=version.name,
        is_active=True,
        created_at=version.created_at,
    )
get_active_hashes

Get active hashes for multiple paths.

Parameters:

Name Type Description Default
paths list[str]

List of component paths

required

Returns:

Type Description
dict[str, str]

Dictionary of {path: hash} for active versions

Source code in web_helper/backend/services/component_store.py
def get_active_hashes(self, paths: list[str]) -> dict[str, str]:
    """Collect the active hash for each of several component paths.

    Args:
        paths: List of component paths

    Returns:
        Dictionary of {path: hash} keyed by the normalized path. Paths that
        raise ValueError or have no active version are left out.
    """
    hashes = {}
    for raw_path in paths:
        try:
            normalized = normalize_path(raw_path)
            active = self.get_active_version(normalized)
        except ValueError:
            # Invalid paths are skipped silently
            continue
        if active:
            hashes[normalized] = active.hash
    return hashes
save_experiment_manifest

Save component versions used in an experiment.

Parameters:

Name Type Description Default
experiment_uid str

Experiment identifier

required
components dict[str, str]

Dictionary of {path: hash}

required
Source code in web_helper/backend/services/component_store.py
def save_experiment_manifest(self, experiment_uid: str, components: dict[str, str]):
    """Record which component versions an experiment used.

    Any manifest previously stored for the experiment is replaced.

    Args:
        experiment_uid: Experiment identifier
        components: Dictionary of {path: hash}
    """
    # Drop whatever manifest rows already exist for this experiment
    previous = self.db.query(ExperimentComponentManifest).filter(
        ExperimentComponentManifest.experiment_uid == experiment_uid
    )
    previous.delete()

    # All new rows share a single timestamp
    recorded_at = datetime.utcnow()
    for component_path, component_hash in components.items():
        self.db.add(
            ExperimentComponentManifest(
                experiment_uid=experiment_uid,
                component_path=component_path,
                component_hash=component_hash,
                created_at=recorded_at,
            )
        )

    self.db.commit()
get_experiment_manifest

Get component manifest for an experiment.

Source code in web_helper/backend/services/component_store.py
def get_experiment_manifest(self, experiment_uid: str) -> Optional[ExperimentManifestResponse]:
    """Load the component manifest stored for an experiment.

    Returns ``None`` when the experiment has no manifest rows; otherwise the
    {path: hash} mapping together with the first row's ``created_at``.
    """
    manifest_query = self.db.query(ExperimentComponentManifest).filter(
        ExperimentComponentManifest.experiment_uid == experiment_uid
    )
    rows = manifest_query.all()

    if rows:
        hash_by_path = {}
        for row in rows:
            hash_by_path[row.component_path] = row.component_hash

        return ExperimentManifestResponse(
            experiment_uid=experiment_uid,
            components=hash_by_path,
            created_at=rows[0].created_at,
        )

    return None
get_components_to_sync

Determine which components need to be synced.

Parameters:

Name Type Description Default
required_paths list[str]

List of component paths needed

required
local_hashes dict[str, str]

Current hashes on worker {path: hash}

required

Returns:

Type Description
tuple[list[str], dict[str, ComponentVersionResponse]]

Tuple of (hashes_to_download, component_info)

Source code in web_helper/backend/services/component_store.py
def get_components_to_sync(
    self, required_paths: list[str], local_hashes: dict[str, str]
) -> tuple[list[str], dict[str, ComponentVersionResponse]]:
    """Work out which active component versions a worker still has to fetch.

    Args:
        required_paths: List of component paths needed
        local_hashes: Current hashes on worker {path: hash}

    Returns:
        Tuple of (hashes_to_download, component_info). ``component_info``
        maps every resolved active hash to its version, including those the
        worker already has.
    """
    pending_hashes = []
    info_by_hash = {}

    for path in required_paths:
        try:
            path = normalize_path(path)
            active = self.get_active_version(path)
        except ValueError as e:
            logger.warning(f"Invalid component path {path}: {e}")
            continue

        if active:
            # A missing or different local hash means the worker must download
            if local_hashes.get(path) != active.hash:
                pending_hashes.append(active.hash)
            info_by_hash[active.hash] = active
        else:
            logger.warning(f"No active version for {path}")

    return pending_hashes, info_by_hash
scan_local_components

Scan local cvlabkit components and register them.

Parameters:

Name Type Description Default
base_path Path

Base path to cvlabkit (default: ./cvlabkit)

None

Returns:

Type Description
int

Number of components registered

Source code in web_helper/backend/services/component_store.py
def scan_local_components(self, base_path: Path = None) -> int:
    """Register every local cvlabkit component file as an active version.

    Args:
        base_path: Base path to cvlabkit (default: ./cvlabkit)

    Returns:
        Number of components registered
    """
    root = Path("cvlabkit") if base_path is None else base_path

    def register_directory(directory, category):
        """Upload each public ``*.py`` file of one directory; return how many succeeded."""
        registered = 0
        for source_file in directory.glob("*.py"):
            # Skip private modules such as __init__.py
            if source_file.name.startswith("_"):
                continue
            try:
                text = source_file.read_text(encoding="utf-8")
                self.upload_version(f"{category}/{source_file.stem}.py", text, activate=True)
                registered += 1
            except Exception as e:
                # Best effort: one bad file must not abort the whole scan
                logger.error(f"Failed to register {source_file}: {e}")
        return registered

    total = 0

    # Agents sit directly under <root>/agent
    agent_dir = root / "agent"
    if agent_dir.exists():
        total += register_directory(agent_dir, "agent")

    # Other categories sit under <root>/component/<category>
    component_root = root / "component"
    if component_root.exists():
        for category_dir in component_root.iterdir():
            if not category_dir.is_dir():
                continue
            category = category_dir.name
            if category in VALID_CATEGORIES and category != "base":
                total += register_directory(category_dir, category)

    return total

Frontend

Refer to TypeScript interface definitions in web_helper/frontend/src/types/.