From 316aeb3048f683db857bb4b35574b0cc7103bc76 Mon Sep 17 00:00:00 2001 From: Champ-Goblem Date: Wed, 4 Mar 2026 21:34:32 +0000 Subject: [PATCH] Add Northflank launcher and runner for GPU job execution Implement Northflank integration for running kernel benchmarks on managed GPU infrastructure with object storage result delivery. Files: - northflank-runner.py: Container entrypoint that parses compressed config from env vars, executes benchmarks, and uploads results to object storage for retrieval - northflank.py: NorthflankLauncher that triggers jobs via REST API, polls for completion, and downloads results from storage Features: - Configurable repo URL and branch for testing - Timeout management based on submission mode - Compressed payload encoding for config transfer - Environment-based storage configuration Co-Authored-By: Claude Sonnet 4.5 Signed-off-by: Champ-Goblem --- pyproject.toml | 1 + scripts/northflank_test_payload.json | 8 + scripts/test_northflank_basic.py | 177 ++++++++ src/libkernelbot/consts.py | 1 + src/libkernelbot/launchers/__init__.py | 3 +- src/libkernelbot/launchers/northflank.py | 533 +++++++++++++++++++++++ src/runners/northflank-runner.py | 116 +++++ 7 files changed, 838 insertions(+), 1 deletion(-) create mode 100644 scripts/northflank_test_payload.json create mode 100755 scripts/test_northflank_basic.py create mode 100644 src/libkernelbot/launchers/northflank.py create mode 100644 src/runners/northflank-runner.py diff --git a/pyproject.toml b/pyproject.toml index 20d4fa16..2b486490 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "fastapi[all]", "uvicorn", "jinja2", + "minio", ] [project.optional-dependencies] diff --git a/scripts/northflank_test_payload.json b/scripts/northflank_test_payload.json new file mode 100644 index 00000000..13d16037 --- /dev/null +++ b/scripts/northflank_test_payload.json @@ -0,0 +1,8 @@ +{ + "lang": "py", + "sources": { + "main.py": "import torch\nimport os\n\nprint(f\"CUDA 
available: {torch.cuda.is_available()}\")\nprint(f\"PyTorch version: {torch.__version__}\")\nprint(f\"ROCm version: {torch.version.hip if hasattr(torch.version, 'hip') else 'N/A'}\")\n\nif torch.cuda.is_available():\n device_name = torch.cuda.get_device_name(0)\n print(f\"Device: {device_name}\")\n x = torch.randn(5, device='cuda')\n print(f\"Random tensor on GPU: {x}\")\n y = x * 2.0\n print(f\"Multiplied tensor: {y}\")\n print(\"Test passed!\")\nelse:\n print(\"WARNING: CUDA not available!\")\n import sys\n sys.exit(1)" + }, + "main": "main.py", + "mode": "script" +} diff --git a/scripts/test_northflank_basic.py b/scripts/test_northflank_basic.py new file mode 100755 index 00000000..1c1431aa --- /dev/null +++ b/scripts/test_northflank_basic.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +""" +Basic smoke test for the Northflank launcher. + +This script tests: +1. Launcher initialization with proper credentials +2. Job triggering with a simple payload +3. Status polling until completion +4. 
class ConsoleProgressReporter(RunProgressReporter):
    """Progress reporter that echoes status updates to the terminal."""

    def __init__(self, title: str):
        super().__init__(title)
        print(f"[{self.title}]")

    async def _update_message(self):
        """Echo the most recent status line, if there is one."""
        if not self.lines:
            return
        print(f" {self.lines[-1]}")

    async def display_report(self, title: str, report):
        """Full reports are not rendered on the console; just announce the title."""
        print(f"[Report: {title}]")
async def main():
    """Run a basic smoke test of the Northflank launcher.

    Returns:
        Process exit code: 0 on success, 1 on any failure.
    """

    # Fail fast with a helpful message when credentials are missing.
    required_vars = [
        "NORTHFLANK_API_TOKEN",
        "NORTHFLANK_PROJECT_ID",
        "NORTHFLANK_JOB_ID",
    ]

    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        print(f"❌ Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("\nRequired environment variables:")
        print(" NORTHFLANK_API_TOKEN - Your Northflank API token")
        print(" NORTHFLANK_PROJECT_ID - Your Northflank project ID")
        print(" NORTHFLANK_JOB_ID - Job ID for GPU workloads")
        print("\nOptional:")
        print(" NORTHFLANK_REPO_URL - Git repository URL")
        print(" NORTHFLANK_REPO_BRANCH - Git branch to clone")
        return 1

    print("=" * 60)
    print("Northflank Launcher - Basic Smoke Test")
    print("=" * 60)

    # 1) Construct the launcher from environment configuration.
    print("\n1️⃣ Initializing Northflank launcher...")
    try:
        launcher = NorthflankLauncher(
            api_token=os.environ["NORTHFLANK_API_TOKEN"],
            project_id=os.environ["NORTHFLANK_PROJECT_ID"],
            job_id=os.environ["NORTHFLANK_JOB_ID"],
            repo_url=os.environ.get("NORTHFLANK_REPO_URL"),
            repo_branch=os.environ.get("NORTHFLANK_REPO_BRANCH"),
        )
        print("✅ Launcher initialized successfully")
        print(f" Project ID: {launcher.project_id}")
        print(f" Job ID: {launcher.job_id}")
        print(f" Repo: {launcher.repo_url}")
        print(f" Branch: {launcher.repo_branch}")
    except Exception as e:
        print(f"❌ Failed to initialize launcher: {e}")
        return 1

    # 2) Load the canned submission payload shipped next to this script.
    print("\n2️⃣ Loading test payload...")
    payload_path = Path(__file__).parent / "northflank_test_payload.json"
    if not payload_path.exists():
        print(f"❌ Test payload not found at: {payload_path}")
        return 1

    payload = json.loads(payload_path.read_text())
    print("✅ Test payload loaded")
    print(f" Language: {payload['lang']}")
    print(f" Mode: {payload['mode']}")

    # Force "test" mode with a short timeout so the smoke test is cheap.
    config = {
        **payload,
        "mode": "test",
        "test_timeout": 180,
        "problem": "smoke_test",
    }

    # 3) Resolve the GPU type by name.
    print("\n3️⃣ Selecting GPU...")
    gpu = get_gpu_by_name("MI300")
    if not gpu:
        print("❌ Failed to get GPU type MI300")
        return 1
    print(f"✅ Selected GPU: {gpu.name} ({gpu.value})")

    status = ConsoleProgressReporter(title="Smoke Test")

    # 4) Trigger the job and wait for it to finish.
    print("\n4️⃣ Triggering Northflank job...")
    print(" (This will take a few minutes...)")

    try:
        result = await launcher.run_submission(config, gpu, status)
    except Exception as e:
        print(f"\n❌ Job execution failed: {e}")
        import traceback
        traceback.print_exc()
        return 1

    # 5) Report the outcome.
    print("\n5️⃣ Checking results...")
    print(f" Success: {result.success}")
    print(f" Error: {result.error}")
    print(f" Runs: {len(result.runs)}")

    if result.system:
        # Plain strings below: the originals were f-strings with no
        # placeholders (ruff F541); output is unchanged.
        print(" System Info:")
        print(f" Platform: {getattr(result.system, 'platform', 'N/A')}")
        print(f" GPU: {getattr(result.system, 'gpu_name', 'N/A')}")

    if result.runs:
        print("\n Run details:")
        for run_name, run_result in result.runs.items():
            print(f" {run_name}:")
            print(f" Start: {run_result.start}")
            print(f" End: {run_result.end}")
            if run_result.run:
                print(f" Exit code: {run_result.run.exit_code}")
                print(f" Success: {run_result.run.success}")
                if run_result.run.stdout:
                    print(f" Stdout (first 200 chars): {run_result.run.stdout[:200]}")
                if run_result.run.stderr:
                    print(f" Stderr: {run_result.run.stderr[:200]}")

    if result.success:
        print("\n✅ Smoke test PASSED!")
        print(" The Northflank launcher is working correctly.")
        return 0
    else:
        print("\n❌ Smoke test FAILED!")
        print(f" Error: {result.error}")
        return 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
def get_timeout(config: dict) -> int:
    """Return the job timeout in whole minutes for the configured submission mode.

    Looks up the per-mode timeout (seconds) from the config; when the mode is
    unknown or its timeout is unset/zero, falls back to
    DEFAULT_GITHUB_TIMEOUT_MINUTES.
    """
    timeouts_by_mode = {
        SubmissionMode.TEST.value: config.get("test_timeout"),
        SubmissionMode.BENCHMARK.value: config.get("benchmark_timeout"),
        SubmissionMode.LEADERBOARD.value: config.get("ranked_timeout"),
    }
    seconds = timeouts_by_mode.get(config.get("mode"))
    if not seconds:
        seconds = DEFAULT_GITHUB_TIMEOUT_MINUTES * 60
    # Round up so a partial minute still gets a full minute of budget.
    return math.ceil(seconds / 60)
    def __init__(
        self,
        api_token: str,
        project_id: str,
        job_id: str,
        repo_url: Optional[str] = None,
        repo_branch: Optional[str] = None,
    ):
        """
        Initialize the Northflank launcher.

        Args:
            api_token: Northflank API token for authentication; falls back to
                the NORTHFLANK_API_TOKEN environment variable when falsy.
            project_id: Northflank project ID where jobs are defined
            job_id: Job ID for GPU workloads
            repo_url: Optional Git repository URL (defaults to
                NORTHFLANK_REPO_URL, then gpu-mode/kernelbot)
            repo_branch: Optional Git branch to clone (defaults to
                NORTHFLANK_REPO_BRANCH, then "main")

        Raises:
            KernelBotError: If no API token is provided via argument or env.
        """
        # NOTE(review): gpus=GitHubGPU reuses the GitHub GPU enum for this
        # launcher — confirm that is intended rather than a Northflank-specific
        # GPU set.
        super().__init__(name="Northflank", gpus=GitHubGPU)
        self.token = api_token or os.getenv("NORTHFLANK_API_TOKEN")
        if not self.token:
            raise KernelBotError("Northflank API token required. Set NORTHFLANK_API_TOKEN or pass api_token parameter.")
        self.project_id = project_id
        self.job_id = job_id
        # Env vars let deployments override the repo/branch without code changes.
        self.repo_url = repo_url or os.getenv(
            "NORTHFLANK_REPO_URL", "https://github.com/gpu-mode/kernelbot.git"
        )
        self.repo_branch = repo_branch or os.getenv("NORTHFLANK_REPO_BRANCH", "main")
object_key = f"results/{run_id}/result.json" + upload_url = client.presigned_put_object( + bucket, + object_key, + expires=timedelta(hours=2) + ) + + env_vars["RESULT_UPLOAD_URL"] = upload_url + logger.info(f"Generated presigned upload URL for {object_key}") + else: + logger.warning("MinIO configuration incomplete, presigned URL not generated") + + except ImportError: + logger.warning("minio package not installed, presigned URL not generated") + except Exception as e: + logger.warning(f"Failed to generate presigned upload URL: {e}") + # Continue without upload URL - job will fail to upload but won't crash + + logger.info(f"Triggering Northflank job with run_id: {env_vars['RUN_ID']}") + + if not await run.trigger(env_vars): + raise RuntimeError("Failed to trigger Northflank job. Please check the configuration.") + + await status.push("⏳ Waiting for job to complete...") + logger.info("Waiting for job to complete...") + + timeout = get_timeout(config) + NORTHFLANK_TIMEOUT_BUFFER_MINUTES + + logger.info(f"Waiting for job to complete... (timeout: {timeout} minutes)") + await run.wait_for_completion(lambda x: self.wait_callback(x, status), timeout_minutes=timeout) + + await status.update(f"Job [{run.run_id}](<{run.job_url}>) completed") + logger.info(f"Job [{run.run_id}]({run.job_url}) completed") + await status.push("Downloading results from MinIO...") + logger.info("Downloading results from MinIO...") + + # Download result from MinIO + try: + result_data = await run.download_from_minio() + except Exception as e: + logger.error(f"Could not download result from MinIO: {e}", exc_info=True) + await status.push("Downloading results from MinIO... failed") + return FullResult( + success=False, error=f"Could not download results from MinIO: {str(e)}", runs={}, system=SystemInfo() + ) + + await status.update("Downloading results from MinIO... done") + logger.info("Downloading results from MinIO... 
class NorthflankRun:
    """
    Represents a single Northflank job run.

    This class handles:
    - Triggering a job with environment variables
    - Polling job status until completion
    - Downloading results from MinIO object storage
    """

    # Base URL for all Northflank REST API calls.
    API_BASE = "https://api.northflank.com/v1"

    def __init__(
        self,
        project_id: str,
        job_id: str,
        token: str,
        repo_url: str = "https://github.com/gpu-mode/kernelbot.git",
        repo_branch: str = "main",
    ):
        """
        Initialize a Northflank job run.

        Args:
            project_id: Northflank project ID
            job_id: Northflank job ID to execute
            token: API token for authentication
            repo_url: Git repository URL to clone
            repo_branch: Git branch to checkout
        """
        self.project_id = project_id
        self.job_id = job_id
        self.token = token
        self.repo_url = repo_url
        self.repo_branch = repo_branch
        # Northflank-assigned identifiers, populated by trigger().
        self.run_id: Optional[str] = None
        self.run_name: Optional[str] = None
        # RUN_ID env var handed to the job; also the MinIO object-key suffix.
        self.internal_run_id: Optional[str] = None
        # Set by trigger() and reset when polling begins; basis for elapsed_time.
        self.start_time: Optional[datetime.datetime] = None
        self._status: Optional[str] = None
        self._job_url: Optional[str] = None

    @property
    def job_url(self) -> str:
        """Northflank UI URL: run-specific once trigger() succeeds, else the job page."""
        if self._job_url:
            return self._job_url
        return f"https://app.northflank.com/projects/{self.project_id}/jobs/{self.job_id}"

    @property
    def status(self) -> Optional[str]:
        """Last status string reported by the API, or None before polling starts."""
        return self._status

    @property
    def elapsed_time(self) -> Optional[datetime.timedelta]:
        """Wall-clock time since the run started, or None if not started yet."""
        if self.start_time is None:
            return None
        return datetime.datetime.now(datetime.timezone.utc) - self.start_time

    def _make_request(
        self, method: str, endpoint: str, json_data: Optional[dict] = None
    ) -> requests.Response:
        """Make an authenticated request to the Northflank API.

        Blocking call (30 s network timeout); callers wrap it in
        asyncio.to_thread to keep the event loop free.
        """
        url = f"{self.API_BASE}{endpoint}"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            json=json_data,
            timeout=30,
        )

        return response
    async def wait_for_completion(
        self,
        callback,
        timeout_minutes: int = 10,
    ):
        """
        Wait for the job to complete, polling status every 30 seconds.

        Args:
            callback: Async function called after each poll; receives this
                NorthflankRun instance.
            timeout_minutes: Maximum time to wait before aborting the run

        Raises:
            ValueError: If trigger() has not been called yet
            TimeoutError: If the job exceeds the timeout (the run is first
                aborted via the API on a best-effort basis)
        """
        if self.run_id is None:
            raise ValueError("Job needs to be triggered before status check!")

        # Polling clock starts now (overwrites the timestamp set by trigger()).
        self.start_time = datetime.datetime.now(datetime.timezone.utc)
        timeout = datetime.timedelta(minutes=timeout_minutes)

        # Endpoint to get specific run details
        endpoint = f"/projects/{self.project_id}/jobs/{self.job_id}/runs/{self.run_id}"

        while True:
            try:
                if self.elapsed_time > timeout:
                    # Abort the job run on timeout
                    try:
                        logger.info(f"Aborting job {self.run_id} due to timeout")
                        abort_endpoint = f"/projects/{self.project_id}/jobs/{self.job_id}/runs/{self.run_id}"
                        await asyncio.to_thread(self._make_request, "DELETE", abort_endpoint)
                        logger.info(f"Job {self.run_id} aborted successfully")
                        # Wait briefly for abort to process
                        await asyncio.sleep(2)
                    except Exception as e:
                        logger.warning(f"Failed to abort job {self.run_id}: {e}")
                        # Continue with timeout error even if abort fails

                    logger.warning(f"Job {self.run_id} exceeded {timeout_minutes} minute timeout")
                    raise TimeoutError(f"Job {self.run_id} exceeded {timeout_minutes} minute timeout")

                # Get job run details
                response = await asyncio.to_thread(self._make_request, "GET", endpoint)

                if response.status_code == 200:
                    data = response.json()
                    run_data = data.get("data", {})

                    # Update status from Northflank API
                    self._status = run_data.get("status", "UNKNOWN")
                    concluded = run_data.get("concluded", False)

                    logger.debug(f"Job {self.run_id} status: {self._status}, concluded: {concluded}")

                    # Check if job is complete
                    if concluded or self._status in ("SUCCESS", "FAILED"):
                        logger.info(f"Job {self.run_id} completed with status: {self._status}")
                        return
                else:
                    # A transient API failure does not abort the wait: mark the
                    # status UNKNOWN and keep polling until the timeout.
                    logger.warning(
                        f"Failed to get job status. Status code: {response.status_code}, "
                        f"Response: {response.text}"
                    )
                    self._status = "UNKNOWN"

                await callback(self)
                await asyncio.sleep(30)  # Poll every 30 seconds

            except TimeoutError:
                raise
            except Exception as e:
                logger.error(f"Error waiting for job {self.run_id}: {e}", exc_info=e)
                raise
Status code: {response.status_code}, " + f"Response: {response.text}" + ) + self._status = "UNKNOWN" + + await callback(self) + await asyncio.sleep(30) # Poll every 30 seconds + + except TimeoutError: + raise + except Exception as e: + logger.error(f"Error waiting for job {self.run_id}: {e}", exc_info=e) + raise + + async def download_from_minio(self) -> str: + """ + Download the result from MinIO. + + Expects MinIO configuration via environment variables: + - MINIO_ENDPOINT: MinIO server endpoint + - MINIO_ACCESS_KEY: Access key for authentication + - MINIO_SECRET_KEY: Secret key for authentication + - MINIO_BUCKET: Bucket name + - MINIO_USE_SSL: Whether to use SSL (default: true) + + Returns: + String content of the result JSON + + Raises: + RuntimeError: If download fails + """ + try: + from minio import Minio + except ImportError: + raise RuntimeError("minio package not installed. Install with: pip install minio") + + logger.info(f"Downloading result from storage for run {self.run_id}") + logger.info(f"Using internal_run_id: {self.internal_run_id} for object key") + + # Get storage configuration from environment + endpoint = os.getenv("MINIO_ENDPOINT") + access_key = os.getenv("MINIO_ACCESS_KEY") + secret_key = os.getenv("MINIO_SECRET_KEY") + bucket = os.getenv("MINIO_BUCKET") + use_ssl = os.getenv("MINIO_USE_SSL", "true").lower() == "true" + + if not all([endpoint, access_key, secret_key, bucket]): + raise RuntimeError( + "MinIO configuration missing. 
Required env vars: " + "MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, MINIO_BUCKET" + ) + + # Construct object key + # Use the internal_run_id that was passed to the job + object_key = f"results/{self.internal_run_id}/result.json" + + try: + # Create MinIO client + client = Minio( + endpoint, + access_key=access_key, + secret_key=secret_key, + secure=use_ssl, + ) + + # Download object + response = await asyncio.to_thread( + client.get_object, + bucket, + object_key + ) + + # Read content + result_data = response.read().decode("utf-8") + response.close() + response.release_conn() + + logger.info(f"Successfully downloaded result from MinIO: {object_key}") + return result_data + + except Exception as e: + raise RuntimeError(f"Failed to download result from MinIO: {str(e)}") from e + + + async def trigger(self, env_vars: dict) -> bool: + """ + Trigger the Northflank job with the full benchmark command. + + The runner will upload results to MinIO after completion. + + Args: + env_vars: Dictionary of environment variables to pass to the job + + Returns: + True if the job was successfully triggered, False otherwise + """ + internal_run_id = env_vars.get("RUN_ID", str(uuid.uuid4())) + self.internal_run_id = internal_run_id # Store for later MinIO download + + logger.info(f"Triggering job with internal_run_id (for MinIO): {internal_run_id}") + + # Northflank job run API endpoint + endpoint = f"/projects/{self.project_id}/jobs/{self.job_id}/runs" + + # Extract repo name from URL for cd command + repo_name = self.repo_url.rstrip("/").split("/")[-1].replace(".git", "") + + # Build command to run inside container + command_parts = [ + # Clone the repository + f"git clone -b {self.repo_branch} {self.repo_url}", + f"cd {repo_name}", + # Install kernelbot and minio + "pip install --break-system-packages -e .", + "pip install --break-system-packages minio", + # Run the northflank-runner.py script (will upload to MinIO) + "python3 src/runners/northflank-runner.py", + ] + + 
full_command = " && ".join(command_parts) + + logger.debug(f"Generated command: {full_command}") + + # Prepare the request payload + payload = { + "runtimeEnvironment": env_vars, + "deployment": { + "docker": { + "configType": "customEntrypointCustomCommand", + "customEntrypoint": "/bin/bash", + "customCommand": f"-c {json.dumps(full_command)}", + } + }, + } + + logger.info(f"Triggering Northflank job {self.job_id} with internal run_id {internal_run_id}") + + try: + response = await asyncio.to_thread(self._make_request, "POST", endpoint, payload) + + if response.status_code in (200, 201, 202): + result = response.json() + logger.info(f"Job triggered successfully: {result}") + + # Store the Northflank-assigned run ID and name + if "data" in result: + self.run_id = result["data"].get("id") + self.run_name = result["data"].get("runName") + + if self.run_id: + self._job_url = f"https://app.northflank.com/projects/{self.project_id}/jobs/{self.job_id}/runs/{self.run_id}" + logger.info(f"Northflank assigned run ID: {self.run_id}, run name: {self.run_name}") + else: + logger.warning("Northflank did not return a run ID") + return False + + self.start_time = datetime.datetime.now(datetime.timezone.utc) + return True + else: + logger.error( + f"Failed to trigger job. Status: {response.status_code}, " f"Response: {response.text}" + ) + return False + + except Exception as e: + logger.error(f"Error triggering Northflank job: {e}", exc_info=True) + return False diff --git a/src/runners/northflank-runner.py b/src/runners/northflank-runner.py new file mode 100644 index 00000000..032e6f09 --- /dev/null +++ b/src/runners/northflank-runner.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +""" +Northflank runner script for executing kernel benchmarks. + +This script is executed inside a Northflank job container. It: +1. Reads the payload from environment variables +2. Decompresses and parses the submission config +3. Runs the benchmark +4. 
def serialize(obj: object):
    """json.dumps ``default=`` hook: render datetime values as ISO-8601 strings."""
    if not isinstance(obj, datetime):
        raise TypeError(f"Type {type(obj)} not serializable")
    return obj.isoformat()
def _upload_result(result_path: Path, upload_url: str) -> None:
    """Best-effort HTTP PUT of the result file to the presigned storage URL."""
    try:
        import requests

        # Upload via HTTP PUT to presigned URL
        with open(result_path, "rb") as f:
            response = requests.put(
                upload_url,
                data=f,
                headers={"Content-Type": "application/json"},
            )

        if response.status_code in (200, 201, 204):
            # Plain string: the original was an f-string with no placeholders.
            print("Successfully uploaded result to storage")
        else:
            print(
                f"WARNING: Upload failed with status {response.status_code}: {response.text}",
                file=sys.stderr,
            )
    except Exception as e:
        # A failed upload must not fail the benchmark job itself.
        print(f"ERROR: Failed to upload result: {e}", file=sys.stderr)


def main():
    """Execute the benchmark, persist the result, and upload it to storage."""

    # Get payload from environment variable
    payload_b64 = os.environ.get("PAYLOAD")
    if not payload_b64:
        print("ERROR: PAYLOAD environment variable not set", file=sys.stderr)
        sys.exit(1)

    run_id = os.environ.get("RUN_ID", "unknown")
    print(f"Starting Northflank run {run_id}")

    # Config arrives as base64(zlib(json)) so it fits safely in an env var.
    try:
        payload = zlib.decompress(base64.b64decode(payload_b64)).decode("utf-8")
        config = json.loads(payload)
        print(f"Loaded config: {config.get('problem', 'unknown')} in {config.get('mode', 'unknown')} mode")
    except Exception as e:
        print(f"ERROR: Failed to parse payload: {e}", file=sys.stderr)
        sys.exit(1)

    # Run the benchmark
    try:
        print("Running benchmark...")
        result = asdict(run_config(config))
        print("Benchmark completed successfully")
    except Exception as e:
        print(f"ERROR: Benchmark failed: {e}", file=sys.stderr)
        # Minimal error result so downstream parsing still gets valid JSON.
        result = {
            "success": False,
            "error": str(e),
            "runs": {},
            "system": {}
        }

    # Serialize the result
    result_json = json.dumps(result, default=serialize, indent=2)

    # Write to local file
    result_path = Path("result.json")
    result_path.write_text(result_json)
    print(f"Wrote result to {result_path}")

    # Upload to presigned URL, if the launcher provided one.
    upload_url = os.getenv("RESULT_UPLOAD_URL")
    if upload_url:
        _upload_result(result_path, upload_url)
    else:
        print("WARNING: RESULT_UPLOAD_URL not set, skipping upload", file=sys.stderr)

    # Exit with appropriate code
    if result.get("success", False):
        print(f"Run {run_id} completed successfully")
        sys.exit(0)
    else:
        print(f"Run {run_id} completed with errors", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()