# Extending the Framework
Guide for adding new features and customizations to the orchestrator.
## Architecture Recap
graph TB
%% Core framework components: the orchestrator, its two modules, and the job factory.
subgraph Core["Core Framework"]
Orch["BenchmarkOrchestrator"]
Servers["ServersModule"]
Clients["ClientsModule"]
Factory["JobFactory"]
end
%% Base classes that service and client implementations build on.
subgraph Services["Service Classes"]
Base["BaseJob"]
Service["Service"]
Client["Client"]
end
%% Concrete services; NewSvc stands for the service added in this guide.
subgraph Implementations["Implementations"]
Ollama["OllamaService"]
Redis["RedisService"]
NewSvc["NewService"]
end
%% Connections between the components declared above.
Orch --> Servers
Orch --> Clients
Servers --> Factory
Clients --> Factory
Factory --> Base
Base --> Service
Base --> Client
Service --> Ollama
Service --> Redis
Service --> NewSvc
%% Highlight the node for the new service.
style NewSvc fill:#FFE0B2
## Adding a New Service
### Step 1: Create Service Class
# src/services/new_service.py
from typing import List
from .base import Service
from ..base import JobFactory
class NewService(Service):
    """Example service that launches ``new_service_binary`` via ``apptainer exec``.

    Config keys read by this class (both optional):
        custom_option: exported as ``CUSTOM_OPTION``; defaults to ``'default'``.
        ports: list whose first entry is used as the service port;
            defaults to ``[8080]``.
    """

    def __init__(self, config: dict):
        """Store the service-specific options found in ``config``."""
        super().__init__(config)

        # Extract service-specific config
        self.custom_option = config.get('custom_option', 'default')
        # A recipe may omit `ports`, set it to null, or give an empty list.
        # `or` covers all three cases, so indexing [0] below cannot fail.
        ports = config.get('ports') or [8080]
        self.port = ports[0]

    def get_service_setup_commands(self) -> List[str]:
        """Setup commands run before container starts.

        Returns:
            The parent class's setup commands followed by the NewService
            environment exports and ``mkdir -p`` calls for the data and
            log directories.
        """
        commands = super().get_service_setup_commands()
        commands.extend([
            "",
            "# NewService setup",
            f"export NEW_SERVICE_PORT={self.port}",
            f"export CUSTOM_OPTION={self.custom_option}",
            "mkdir -p $HOME/new_service/data",
            "mkdir -p $HOME/new_service/logs",
        ])
        return commands

    def get_container_command(self) -> str:
        """Container execution command.

        Returns:
            A single shell command whose parts are joined with a
            backslash-newline continuation and which ends in ``&``.
        """
        container_path = self._resolve_container_path()

        cmd_parts = [
            "apptainer exec",
            "--bind $HOME/new_service/data:/data",
            "--bind $HOME/new_service/logs:/logs",
        ]

        # Add GPU support if needed: the flag goes directly after
        # "apptainer exec", ahead of the bind options.
        if self.resources.get('gres'):
            cmd_parts.insert(1, "--nv")

        cmd_parts.extend([
            container_path,
            f"new_service_binary --port {self.port}",
            "&"  # Run in background
        ])

        return " \\\n ".join(cmd_parts)

    def get_health_check_commands(self) -> List[str]:
        """Health check after container starts.

        Returns:
            Shell lines that sleep for a fixed 10 seconds and then probe
            ``/health`` once. A failed probe only echoes a message
            (``|| echo``), so it does not abort the script.
        """
        return [
            "",
            "# Wait for NewService to start",
            "sleep 10",
            f"curl -s http://localhost:{self.port}/health || echo 'Health check pending'",
            "",
            f"echo 'NewService running on port {self.port}'",
        ]


# Register with JobFactory
JobFactory.register_service('new_service', NewService)
### Step 2: Register in `__init__.py`
# src/services/__init__.py
# Importing the module executes its module-level
# JobFactory.register_service('new_service', NewService) call.
from .new_service import NewService
# Public names exported by the package; add the new class to the existing list.
__all__ = [
# ... existing services
'NewService',
]
### Step 3: Create Recipe
# recipes/services/new_service.yaml
# NOTE(review): nesting reconstructed from the keys the NewService example
# reads (`custom_option` and `ports` next to `container`); confirm against
# the recipe loader.
service:
  name: new_service
  description: "My new service"

  container:
    docker_source: docker://myimage:latest
    image_path: $HOME/containers/new_service.sif

  # Read by NewService.__init__ via config.get('custom_option', 'default')
  custom_option: "production"

  resources:
    cpus_per_task: 4
    mem: "8G"
    time: "02:00:00"
    partition: cpu

  # NewService uses the first entry as its port
  ports:
    - 8080
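### Step 4: Test
Instantiate the class and load the recipe as shown in the "Testing New Components" section below.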
## Adding a New Client
### Step 1: Create Client Class
# src/services/new_client.py
from typing import List
from .base import Client
from ..base import JobFactory
class NewBenchmarkClient(Client):
    """Client that runs ``benchmark_scripts/new_benchmark.py`` against the target endpoint.

    Optional keys read from ``config['parameters']``:
        num_requests: value passed as ``--requests`` (default 1000).
        concurrent: value passed as ``--concurrent`` (default 10).
        output_file: value passed as ``--output``
            (default ``$HOME/results/new_benchmark.json``).
    """

    def __init__(self, config: dict):
        """Pull the benchmark settings out of the ``parameters`` section."""
        super().__init__(config)

        settings = config.get('parameters', {})
        self.num_requests = settings.get('num_requests', 1000)
        self.concurrent = settings.get('concurrent', 10)
        self.output_file = settings.get('output_file', '$HOME/results/new_benchmark.json')

    def get_benchmark_commands(self) -> List[str]:
        """Return the shell lines that announce, run, and report the benchmark."""
        banner = [
            "",
            "# NewService Benchmark",
            f"echo 'Running benchmark against {self.target_endpoint}'",
            "",
        ]
        # One logical command, split across lines with trailing backslashes.
        invocation = [
            "python3 benchmark_scripts/new_benchmark.py \\",
            f" --endpoint {self.target_endpoint} \\",
            f" --requests {self.num_requests} \\",
            f" --concurrent {self.concurrent} \\",
            f" --output {self.output_file}",
        ]
        trailer = [
            "",
            f"echo 'Results saved to {self.output_file}'",
        ]
        return banner + invocation + trailer


# Register with JobFactory
JobFactory.register_client('new_benchmark', NewBenchmarkClient)
### Step 2: Create Benchmark Script
# benchmark_scripts/new_benchmark.py
import argparse
import json
import time
import requests
from concurrent.futures import ThreadPoolExecutor
def run_request(endpoint, timeout=30.0):
    """Send one GET to ``{endpoint}/api/test`` and report how it went.

    Args:
        endpoint: Base URL of the service under test.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled connection cannot block a worker thread forever.

    Returns:
        A dict with ``'success'`` and ``'latency'`` (seconds), plus
        ``'status'`` when a response arrived or ``'error'`` when the
        request raised.
    """
    # perf_counter is monotonic, so the measured latency is not affected
    # by wall-clock adjustments during the run.
    start = time.perf_counter()
    try:
        response = requests.get(f"{endpoint}/api/test", timeout=timeout)
    except Exception as e:
        # Deliberately broad: every failure is recorded as a failed
        # request instead of propagating out of the worker.
        return {'success': False, 'latency': time.perf_counter() - start, 'error': str(e)}
    return {
        'success': response.status_code == 200,
        'latency': time.perf_counter() - start,
        'status': response.status_code,
    }
def main(argv=None):
    """Run the benchmark and write a JSON summary.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes
            argparse read ``sys.argv[1:]`` as before.

    The summary holds the endpoint, the requested/successful/failed
    counts, and the average and 99th-percentile latency of the successful
    requests (both 0 when nothing succeeded). It is written to
    ``--output`` and printed to stdout.
    """
    from pathlib import Path

    parser = argparse.ArgumentParser()
    parser.add_argument('--endpoint', required=True)
    parser.add_argument('--requests', type=int, default=1000)
    parser.add_argument('--concurrent', type=int, default=10)
    parser.add_argument('--output', required=True)
    args = parser.parse_args(argv)

    # Create the output directory up front so an unwritable path fails
    # before any requests are sent rather than after the whole run.
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=args.concurrent) as executor:
        futures = [executor.submit(run_request, args.endpoint)
                   for _ in range(args.requests)]
        results = [f.result() for f in futures]

    # Calculate statistics
    latencies = [r['latency'] for r in results if r['success']]
    successful = len(latencies)
    if latencies:
        latency_avg = sum(latencies) / successful
        # Nearest-rank percentile: rank = ceil(0.99 * n), done in integer
        # arithmetic. With n == 100 this picks the 99th value, not the max.
        p99_rank = (99 * successful + 99) // 100
        latency_p99 = sorted(latencies)[p99_rank - 1]
    else:
        latency_avg = 0
        latency_p99 = 0

    output = {
        'endpoint': args.endpoint,
        'total_requests': args.requests,
        'successful': successful,
        'failed': len(results) - successful,
        'latency_avg': latency_avg,
        'latency_p99': latency_p99,
    }

    with open(args.output, 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2)

    print(json.dumps(output, indent=2))
if __name__ == '__main__':
main()
### Step 3: Create Client Recipe
# recipes/clients/new_benchmark.yaml
# NOTE(review): nesting reconstructed from the keys the NewBenchmarkClient
# example reads (config.get('parameters', {})); confirm against the recipe
# loader.
client:
  name: new_benchmark
  type: new_benchmark

  # Read by NewBenchmarkClient.__init__
  parameters:
    num_requests: 1000
    concurrent: 10
    output_file: "$HOME/results/new_benchmark.json"

  resources:
    cpus_per_task: 2
    mem: "4G"
    time: "00:30:00"
    partition: cpu
## Customizing SLURM Script Generation
Override generate_slurm_script() for full control:
class CustomService(Service):
    """Service that supplies its own complete SLURM script."""

    def generate_slurm_script(self) -> str:
        """Assemble the batch script by hand and return it as one string."""
        sbatch_header = [
            "#!/bin/bash",
            f"#SBATCH --job-name={self.job_id}",
            f"#SBATCH --account={self.account}",
            # ... more SBATCH directives
        ]
        environment = [
            "# Custom script content",
            "module load MyModule",
        ]
        launch = [
            "# Run service",
            "my_custom_command",
        ]
        # A blank line separates each section in the generated script.
        lines = [*sbatch_header, "", *environment, "", *launch]
        return '\n'.join(lines)
## Testing New Components
# Test service class instantiation
# Builds NewService from a minimal config dict and prints its setup commands.
# NOTE(review): get_setup_commands() is not one of the methods the NewService
# example in this guide defines (it overrides get_service_setup_commands());
# presumably it is inherited from a base class - confirm.
python -c "
from src.services.new_service import NewService
config = {'name': 'test', 'container': {'image_path': 'test.sif'}}
svc = NewService(config)
print(svc.get_setup_commands())
"
# Test recipe loading
python main.py --verbose --recipe recipes/services/new_service.yaml
# Check generated script
# (the glob matches any script whose name starts with service_new_service_)
cat scripts/service_new_service_*.sh
Next: Adding New Services