Extensibility & Parallelism
This page explains how to extend Coleman while keeping its native orchestration model.
What You Can Extend
| Component | Typical goal | Current support |
|---|---|---|
| Policy | New action-selection strategy | Fully supported (source extension + config) |
| Reward | New credit assignment signal | Fully supported (source extension + config) |
| Hooks | Domain lifecycle logic and custom artifacts | Fully supported via hooks.plugins |
| Extensions | Namespaced custom domain config | Fully supported via extensions |
| EvaluationMetric | Alternate quality/cost metric logic | Supported via source extension (runner wiring) |
| Environment | Alternate orchestration/runtime behavior | Supported via source extension (runner wiring) |
Public Runner Extension API
Coleman provides a public extension layer that keeps orchestration in core while allowing custom environment and metric wiring.

```python
from coleman.api import run_with_extension
from coleman.runner import RunnerExtension

# my_build_environment, my_build_metric and my_post_execution_hook are your own
# callables; spec is the run specification to execute.
extension = RunnerExtension(
    build_environment_fn=my_build_environment,
    build_metric_fn=my_build_metric,  # optional
    post_execution_fn=my_post_execution_hook,  # optional
)
result = run_with_extension(spec, extension)
```

Concrete example entrypoint:
Core-owned responsibilities remain unchanged:
- execution plan generation
- process-pool orchestration
- hook event ordering
- `run_id` and folder layout

Extension-owned responsibilities:
- environment instantiation (`build_environment_fn`)
- optional metric substitution (`build_metric_fn`)
- optional per-execution post-processing (`post_execution_fn`)
Parallelism Model (Complete)
Coleman has two independent parallel layers:
- Intra-run process parallelism
  - Controlled by `execution.parallel_pool_size`.
  - Executes one run's `independent_executions` in worker processes.
- Inter-spec sweep parallelism
  - Controlled by `coleman sweep --workers` or the API call `run_many(..., max_workers=...)`.
  - Executes multiple specs concurrently.
Operational contract
- Coordinator process events: `on_run_start`, `on_dataset_start`, `on_dataset_end`, `on_run_end`.
- Worker process events: `on_execution_start`, `on_execution_end`.
- Error event: `on_error` can be triggered in both contexts.
- Profiling safety: `force_sequential_under_scalene: true` forces sequential worker mode under Scalene.
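The coordinator/worker split can be modeled with the standard library. This is a toy model, not Coleman's runner: `simulate_run` and `run_execution` are hypothetical names, and real hooks fire as events happen instead of being collected into lists. It shows that worker processes only see execution-level events, while run- and dataset-level events stay in the coordinator, which merges what the workers return.

```python
from concurrent.futures import ProcessPoolExecutor


def run_execution(execution_id: int) -> list[str]:
    """Worker side: only execution-level events are emitted here."""
    events = [f"on_execution_start:{execution_id}"]
    # ... one independent execution (the bandit loop) would run here ...
    events.append(f"on_execution_end:{execution_id}")
    return events


def simulate_run(dataset_ids: list[str], independent_executions: int,
                 parallel_pool_size: int) -> list[str]:
    """Coordinator side: run- and dataset-level events never leave this process."""
    events = ["on_run_start"]
    execution_ids = list(range(1, independent_executions + 1))
    for dataset_id in dataset_ids:
        events.append(f"on_dataset_start:{dataset_id}")
        if parallel_pool_size <= 1:
            # Sequential mode, e.g. when the Scalene safeguard forces it.
            results = [run_execution(i) for i in execution_ids]
        else:
            # Workers return their events; pool.map keeps submission order.
            with ProcessPoolExecutor(max_workers=parallel_pool_size) as pool:
                results = list(pool.map(run_execution, execution_ids))
        for worker_events in results:
            events.extend(worker_events)
        events.append(f"on_dataset_end:{dataset_id}")
    events.append("on_run_end")
    return events
```

When calling it with `parallel_pool_size > 1`, do so under an `if __name__ == "__main__":` guard, since process pools re-import the main module on spawn-based platforms.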
Namespaced Custom Config (extensions)
Use `extensions` to pass domain-specific settings without relaxing strict `RunSpec` validation:

```yaml
extensions:
  my_domain:
    forecast_selection:
      policy: ThompsonSampling
      reward: Binary
      risk_budget: 0.15
```

Guidelines:
- Keep keys namespaced (`my_domain`, `my_team`, etc.).
- Keep values JSON-serializable.
- Consume values from the hook context (`context.extensions`).
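Inside a hook, the namespaced block can be read defensively. This is a minimal sketch assuming `context.extensions` behaves like a plain dict mirroring the YAML above; `forecast_settings` is a hypothetical helper, and `SimpleNamespace` stands in for the real hook context.

```python
import json
from types import SimpleNamespace


def forecast_settings(context) -> dict:
    """Return the my_domain.forecast_selection block, or {} when it is absent."""
    # Assumption: context.extensions is a plain dict (or None) keyed by namespace.
    namespace = (context.extensions or {}).get("my_domain", {})
    settings = dict(namespace.get("forecast_selection", {}))
    # Raises TypeError early if a non-JSON value (e.g. a set) slipped into the config.
    json.dumps(settings)
    return settings


# Stand-in for the real hook context, mirroring the YAML above.
context = SimpleNamespace(
    extensions={
        "my_domain": {
            "forecast_selection": {
                "policy": "ThompsonSampling",
                "reward": "Binary",
                "risk_budget": 0.15,
            }
        }
    }
)
settings = forecast_settings(context)
```

Reading only your own namespace keeps the hook unaffected by keys other teams add under `extensions`.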
Hook Plugins (hooks)
Register plugins only in configuration, by listing their dotted import paths under `hooks.plugins` (as in the end-to-end example below).
Supported plugin forms:
- Class plugin
  - Instantiated with no arguments.
  - Implements any lifecycle methods it needs.
- Function plugin
  - Signature: `(event_name, context, payload=None)`.
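Both plugin forms can be sketched in plain Python. `ForecastHook` and `audit_hook` reuse the names from the end-to-end example, but their bodies are illustrative; `dispatch` is a hypothetical stand-in for the runner's own dispatcher, included only to show how each form receives an event.

```python
import inspect
from types import SimpleNamespace


class ForecastHook:
    """Class plugin: constructed with no arguments, implements only what it needs."""

    def __init__(self):
        self.finished_executions = []

    def on_execution_end(self, context, execution_result):
        self.finished_executions.append(context.execution_id)


def audit_hook(event_name, context, payload=None):
    """Function plugin: receives every event and picks the ones it cares about."""
    if event_name == "on_error":
        print(f"[audit] run={context.run_id} error={payload!r}")


def dispatch(plugin, event_name, context, payload=None):
    """Hypothetical dispatcher (not Coleman's) showing how each form is invoked."""
    if inspect.isfunction(plugin):
        plugin(event_name, context, payload)
        return
    method = getattr(plugin, event_name, None)
    if method is None:
        return  # class plugins may skip events they do not implement
    if payload is None:
        method(context)
    else:
        method(context, payload)


hook = ForecastHook()
context = SimpleNamespace(run_id="run-1", execution_id=3)
for plugin in (hook, audit_hook):
    dispatch(plugin, "on_execution_start", context)
    dispatch(plugin, "on_execution_end", context, payload="execution result")
print(hook.finished_executions)  # [3]
```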
Hook lifecycle
- `on_run_start(context)`
- `on_dataset_start(context)`
- `on_execution_start(context)`
- `on_execution_end(context, execution_result)`
- `on_dataset_end(context, dataset_result)`
- `on_run_end(context, run_result)`
- `on_error(context, error)`

`on_error` is also dispatched for startup failures, so hook-based cleanup and telemetry can observe broken execution setup paths.
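Because `on_error` can fire before `on_run_start` has ever run (a startup failure), cleanup hooks should tolerate missing state. A minimal sketch, with `ScratchDirHook` as a hypothetical class plugin that owns a temporary directory per run:

```python
import shutil
import tempfile
from pathlib import Path
from types import SimpleNamespace


class ScratchDirHook:
    """Hypothetical class plugin: creates a scratch dir per run and always removes it."""

    def __init__(self):
        self.scratch = None  # stays None if the run fails during startup

    def on_run_start(self, context):
        self.scratch = Path(tempfile.mkdtemp(prefix=f"coleman-{context.run_id}-"))

    def on_run_end(self, context, run_result):
        self._cleanup()

    def on_error(self, context, error):
        # May be called before on_run_start, so the directory may not exist yet.
        self._cleanup()

    def _cleanup(self):
        if self.scratch is not None:
            shutil.rmtree(self.scratch, ignore_errors=True)
            self.scratch = None


hook = ScratchDirHook()
hook.on_error(SimpleNamespace(run_id=None), RuntimeError("bad config"))  # no-op
hook.on_run_start(SimpleNamespace(run_id="run-1"))
hook.on_run_end(SimpleNamespace(run_id="run-1"), run_result=None)
```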
Hook context fields
- `run_id`
- `dataset_id`
- `execution_id`
- `worker_id`
- `parallel_mode`
- `iteration`
- `trials`
- `budget_mode`
- `budget_value`
- `extensions`
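To unit-test hooks without launching the runner, a small stand-in with the same field names can be enough. `FakeHookContext` is a hypothetical test double, not Coleman's context type; every field is typed `Any` and defaults to `None` because the real types are not documented here (the artifact example only implies that `budget_mode` has a `.value`).

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class FakeHookContext:
    """Hypothetical test double carrying the documented hook context fields."""

    run_id: Any = None
    dataset_id: Any = None
    execution_id: Any = None
    worker_id: Any = None
    parallel_mode: Any = None
    iteration: Any = None
    trials: Any = None
    budget_mode: Any = None
    budget_value: Any = None
    extensions: dict = field(default_factory=dict)


# Run-level context as the coordinator might see it...
run_ctx = FakeHookContext(run_id="run-1", extensions={"my_domain": {}})
# ...and an execution-level copy with worker-specific fields filled in.
exec_ctx = replace(run_ctx, dataset_id="ds1", execution_id=1, worker_id=0)
```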
Deterministic Hook Artifacts
Use `ArtifactWriter` to create deterministic, collision-safe paths and to write hook artifacts as atomic JSON/CSV files.

```python
from coleman.artifacts import ArtifactWriter

writer = ArtifactWriter("./runs")


def write_forecast(context, payload):
    """Hypothetical helper, called from a hook with its context and the data to save."""
    path = writer.path_for(
        run_id=context.run_id or "unknown",
        dataset_id=context.dataset_id,
        budget_mode=context.budget_mode.value if context.budget_mode else None,
        budget_value=context.budget_value,
        execution_id=context.execution_id,
        artifact_type="forecast",
        ext="json",
    )
    writer.write_json_atomic(path, payload)
```

Concrete plugin example (class + function hooks):
Default artifact layout:

```
<runs>/<run_id>/artifacts/<artifact_type>/<budget_segment>/<dataset_id>/<execution_id>/<stem>.<ext>
```

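If you need the same guarantees outside `ArtifactWriter` (for example in a standalone analysis script), the usual recipe for a deterministic, atomic JSON write is: serialize with sorted keys, write to a temporary file in the target directory, flush and `fsync`, then `os.replace` it over the final name. This is a generic sketch of that technique, not `ArtifactWriter`'s implementation; `write_json_atomic_sketch` is a hypothetical name.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomic_sketch(path, payload) -> None:
    """Write payload as JSON so readers never observe a half-written file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # sort_keys makes the bytes identical for equal payloads.
    data = json.dumps(payload, sort_keys=True, indent=2)
    # Same directory => same filesystem, which os.replace needs to be atomic.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=f".{path.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(data)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, path)
    except BaseException:
        # Drop the partial temp file; any previous artifact at `path` is untouched.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```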
Public Scalene Compatibility Helper
For custom entrypoints, use `effective_parallel_pool_size(...)` from `coleman.runner` to apply the same Scalene safeguard as the core runner.
Adding a New Policy
Current loading contract:
- The runner reads policy names from `experiment.policies`.
- For each policy `X`, the runner loads class `XPolicy` from the `coleman.policy` module exports.
Implementation steps
- Create your policy class in `coleman/policy/...`, following the base behavior of existing policies.
- Expose the class in `coleman/policy/__init__.py`.
- Add algorithm hyperparameters under `algorithm.<policy_name_lower>` in YAML (nested per reward, as in `algorithm.ucb.rnfail.c` in the end-to-end example).
- Add the policy name (without the `Policy` suffix) to `experiment.policies`.
- Add tests for deterministic behavior and credit-assignment updates.
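For the last step, the shape of a determinism and credit-assignment test can be shown with a self-contained toy. `ToyEpsilonGreedy`, `choose`, and `update` are hypothetical and do not follow Coleman's policy base class (use the methods that base class actually defines); what carries over is the pattern: seed the policy, check that equal seeds give equal choices, and check the value-update arithmetic directly.

```python
import math
import random


class ToyEpsilonGreedy:
    """Hypothetical epsilon-greedy policy over test-case names (not Coleman's base class)."""

    def __init__(self, epsilon: float, seed: int):
        self.epsilon = epsilon
        self.rng = random.Random(seed)  # private RNG, so runs are reproducible
        self.counts: dict[str, int] = {}
        self.values: dict[str, float] = {}

    def choose(self, arms: list[str]) -> str:
        if self.rng.random() < self.epsilon:
            return self.rng.choice(arms)  # explore
        # Exploit; ties are broken by name so the choice never depends on dict order.
        return max(arms, key=lambda arm: (self.values.get(arm, 0.0), arm))

    def update(self, arm: str, reward: float) -> None:
        """Credit assignment as an incremental mean: v <- v + (r - v) / n."""
        n = self.counts.get(arm, 0) + 1
        value = self.values.get(arm, 0.0)
        self.counts[arm] = n
        self.values[arm] = value + (reward - value) / n


def test_same_seed_same_choices():
    arms = ["t1", "t2", "t3"]
    first, second = ToyEpsilonGreedy(0.5, seed=7), ToyEpsilonGreedy(0.5, seed=7)
    assert [first.choose(arms) for _ in range(50)] == [second.choose(arms) for _ in range(50)]


def test_credit_assignment_is_running_mean():
    policy = ToyEpsilonGreedy(0.0, seed=0)
    for reward in (1.0, 0.0, 1.0, 1.0):
        policy.update("t1", reward)
    assert policy.counts["t1"] == 4
    assert math.isclose(policy.values["t1"], 0.75)  # mean of the four rewards
```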
Example config:
Adding a New Reward
Current loading contract:
- The runner reads reward names from `experiment.rewards`.
- For each reward `Y`, the runner loads class `YReward` from the `coleman.reward` module exports.
Implementation steps
- Create your reward class in `coleman/reward/...`, inheriting the reward base contract.
- Expose the class in `coleman/reward/__init__.py`.
- Add the reward name (without the `Reward` suffix) to `experiment.rewards`.
- Add tests validating `evaluate(...)` against expected metric scenarios.
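A scenario table keeps reward tests readable as cases accumulate. `failure_reward` below is a hypothetical toy, not the signature of Coleman's `evaluate(...)`: it gives 1.0 to each failing test and 0.0 to each passing one. The transferable part is the named scenarios checked in one loop, plus an invariant asserted for every case.

```python
def failure_reward(verdicts: list[bool]) -> list[float]:
    """Toy reward: 1.0 for each failing test (True), 0.0 for each passing test."""
    return [1.0 if failed else 0.0 for failed in verdicts]


# scenario name -> (verdicts in prioritized order, expected rewards)
SCENARIOS = {
    "no failures": ([False, False, False], [0.0, 0.0, 0.0]),
    "all failures": ([True, True], [1.0, 1.0]),
    "mixed": ([False, True, False], [0.0, 1.0, 0.0]),
    "empty cycle": ([], []),
}


def test_failure_reward_scenarios():
    for name, (verdicts, expected) in SCENARIOS.items():
        got = failure_reward(verdicts)
        assert got == expected, f"{name}: expected {expected}, got {got}"
        # Invariant for every scenario: one reward per test, each within [0, 1].
        assert len(got) == len(verdicts)
        assert all(0.0 <= reward <= 1.0 for reward in got)
```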
Example config:
Custom EvaluationMetric (Source Extension)
The default runner wiring uses `NAPFDVerdictMetric` when it constructs the environment.
If you need a custom metric class today:
- Implement the metric class in `coleman/evaluation/...`, extending the evaluation base behavior.
- Expose it in `coleman/evaluation/__init__.py`.
- Wire it into runner environment construction, where the evaluation metric instance is created.
- Add tests for metric math invariants and edge cases (no failures, full failures, partial budget).
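For metric tests, it helps to keep a tiny reference implementation of the formula next to the edge cases. The sketch below computes NAPFD in its common textbook form, `NAPFD = p - (TF_1 + ... + TF_m) / (n * m) + p / (2n)`, where `n` is the number of tests, `m` the number of failures, `p` the fraction of failures detected within the budget, and `TF_i` the 1-based position of each detected failure (undetected failures contribute 0). It is an independent sketch, not `NAPFDVerdictMetric`'s code; counting each failing test as one fault, modeling the budget as a number of executed tests, and returning 1.0 when nothing fails are assumptions.

```python
import math
from itertools import product


def napfd(verdicts: list[bool], budget: int) -> float:
    """NAPFD for verdicts in prioritized order (True = failed) when only `budget` tests run."""
    n = len(verdicts)
    m = sum(verdicts)  # failures, one fault per failing test (assumption)
    if m == 0:
        return 1.0  # assumption: with nothing to detect, the ordering counts as perfect
    # 1-based positions of failures that fall inside the executed prefix.
    positions = [i + 1 for i, failed in enumerate(verdicts[:budget]) if failed]
    p = len(positions) / m  # fraction of failures detected
    return p - sum(positions) / (n * m) + p / (2 * n)


def test_napfd_edge_cases():
    assert napfd([False, False, False], budget=3) == 1.0                     # no failures
    assert math.isclose(napfd([True, True], budget=2), 0.5)                  # full failures
    assert math.isclose(napfd([True, True, False, False], budget=4), 0.75)   # failures ranked first
    assert napfd([False, True], budget=1) == 0.0                             # failure outside budget
    assert math.isclose(napfd([True, False, True, False], budget=2), 0.4375) # partial budget
    # Invariant: the score stays in [0, 1] for every ordering and budget.
    for verdicts in product([False, True], repeat=4):
        for budget in range(5):
            assert 0.0 <= napfd(list(verdicts), budget) <= 1.0
```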
Custom Environment (Source Extension)
The default runner wiring uses `Environment` when it constructs the environment.
If you need a custom environment today:
- Implement a class inheriting `AbstractEnvironment`.
- Preserve compatibility with the monitor, checkpoint, and telemetry contracts.
- Wire your class into runner environment creation.
- Add integration tests for sequential and parallel execution consistency.
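A sequential-versus-parallel consistency test only passes if each execution's randomness is independent of which worker runs it and in what order. One common way to get that, shown here as a sketch rather than Coleman's actual seeding scheme, is to derive every execution's RNG from the base seed and the execution id. `run_execution` and `run_all` are hypothetical names; the test uses a thread pool for portability, and a process pool should give the same results as long as `run_execution` is importable at module level.

```python
import random
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional


def run_execution(base_seed: int, execution_id: int) -> list[int]:
    """Toy execution whose output depends only on (base_seed, execution_id)."""
    # A private RNG per execution: no shared or global random state.
    rng = random.Random(f"{base_seed}:{execution_id}")
    return [rng.randrange(100) for _ in range(5)]


def run_all(base_seed: int, execution_ids: list[int],
            executor: Optional[Executor] = None) -> dict[int, list[int]]:
    """Run every execution inline, or on the given executor, keyed by execution id."""
    if executor is None:
        return {i: run_execution(base_seed, i) for i in execution_ids}
    futures = {i: executor.submit(run_execution, base_seed, i) for i in execution_ids}
    return {i: future.result() for i, future in futures.items()}


def test_sequential_matches_parallel():
    ids = list(range(1, 21))
    sequential = run_all(42, ids)
    with ThreadPoolExecutor(max_workers=4) as pool:
        parallel = run_all(42, ids, executor=pool)
    assert sequential == parallel
```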
Recommended Strategy for Large Domain Workflows
- Prefer `hooks` + `extensions` first.
- Keep core runner orchestration untouched whenever possible.
- Use source-level Environment/EvaluationMetric customization only when the behavior cannot be expressed via hooks.
- Persist custom artifacts with run and execution identifiers so they can be joined in later analysis.
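Because the default artifact layout encodes the identifiers in the path, artifacts can be joined back to results without opening them. This sketch parses paths of the documented form `<runs>/<run_id>/artifacts/<artifact_type>/<budget_segment>/<dataset_id>/<execution_id>/<stem>.<ext>` into flat records; `parse_artifact_path` is hypothetical, `budget_segment` is kept as an opaque string because its format is not documented here, and paths with a different depth (for example artifacts written without an `execution_id`) are rejected rather than guessed at.

```python
from pathlib import PurePath

FIELDS = ("run_id", "artifact_type", "budget_segment", "dataset_id", "execution_id")


def parse_artifact_path(path: str, runs_root: str) -> dict[str, str]:
    """Split an artifact path into the identifiers encoded by the default layout."""
    parts = PurePath(path).relative_to(runs_root).parts
    # Expected: run_id, "artifacts", type, budget, dataset, execution, file name.
    if len(parts) != 7 or parts[1] != "artifacts":
        raise ValueError(f"not a default-layout artifact path: {path}")
    run_id, _, artifact_type, budget_segment, dataset_id, execution_id, file_name = parts
    record = dict(zip(FIELDS, (run_id, artifact_type, budget_segment, dataset_id, execution_id)))
    record["file"] = file_name
    return record
```

A list of such records can then be loaded into a table and joined to execution results on `run_id` and `execution_id`.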
Official Starter Template
An in-repo starter template is available under:
- `examples/custom-environment-template/minimal/`
- `examples/custom-environment-template/advanced/`

The advanced track includes hooks, extension-style wiring, and a manifest-ready artifact layout for parallel runs.
Quickstart for the advanced track:
End-to-End Example (Parallel + Extensions + Hooks)

```yaml
packs:
  - execution/default
  - experiment/alibaba_druid
  - algorithm/defaults
  - reward/rnfail
  - results/parquet

execution:
  independent_executions: 20
  parallel_pool_size: 4
  force_sequential_under_scalene: true

experiment:
  policies: [UCB, MyPolicy]
  rewards: [RNFail, MyReward]

algorithm:
  ucb:
    rnfail:
      c: 0.3
  mypolicy:
    rnfail:
      temperature: 0.15

hooks:
  fail_fast: false
  plugins:
    - my_project.hooks.ForecastHook
    - my_project.hooks.audit_hook

extensions:
  my_domain:
    forecast_selection:
      policy: ThompsonSampling
      reward: Binary
      risk_budget: 0.15
```

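CLI sweep with inter-spec parallelism (the `range(0,10)` argument is quoted because unquoted parentheses are a syntax error in POSIX shells):

```shell
coleman sweep --config run.yaml \
  --grid 'execution.seed=range(0,10)' \
  --grid algorithm.ucb.rnfail.c=0.1,0.3,0.5 \
  --workers 4
```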
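Assuming `--grid` expands to the Cartesian product of its axes and `range(0,10)` follows Python's half-open convention (both are assumptions to check against the CLI help), the sweep above produces 10 × 3 = 30 specs. Because the two parallel layers are independent, if each concurrently running spec also honors `parallel_pool_size: 4`, this sweep may use up to 4 × 4 = 16 worker processes at once, plus the coordinators. The arithmetic as a quick sketch:

```python
from itertools import product

# Grid axes as written on the command line (assumed expansion semantics, see above).
grid = {
    "execution.seed": list(range(0, 10)),
    "algorithm.ucb.rnfail.c": [0.1, 0.3, 0.5],
}
specs = [dict(zip(grid, values)) for values in product(*grid.values())]

sweep_workers = 4        # coleman sweep --workers 4
parallel_pool_size = 4   # execution.parallel_pool_size in run.yaml
peak_worker_processes = sweep_workers * parallel_pool_size

print(len(specs), peak_worker_processes)  # 30 16
```

Lowering either `--workers` or `parallel_pool_size` reduces that product if it exceeds the available cores.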
Testing Checklist for Extensions
- Sequential and parallel runs should produce the expected hook side effects.
- Hook errors should obey the `fail_fast` policy.
- Custom artifacts should carry `run_id` and `execution_id`.
- New policies/rewards should be covered by deterministic unit tests.
- Any Environment/Metric source customization should include integration tests.