NODEDC_1C/tests/test_batch_runtime_handoff.py

50 lines
1.7 KiB
Python

from __future__ import annotations
from orchestration.batch_runtime import enqueue_refresh_and_answer_job, run_refresh_and_answer_job
def test_batch_runtime_handoff_executes_feature_and_risk() -> None:
job = enqueue_refresh_and_answer_job(
question_id="Q28",
slice_window="2020-06",
requested_outputs=["feature_store", "risk_store"],
reason=["needs_ranking", "aggregate_not_sufficient"],
)
result = run_refresh_and_answer_job(
job,
feature_executor=lambda: {"run_id": "feature-run-1"},
risk_executor=lambda: {"run_id": "risk-run-1"},
should_refresh=False,
)
payload = result.to_dict()
assert payload["status"] == "success"
assert payload["execution_mode"] == "batch_runtime_executed"
assert payload["run_ids"]["feature_run_id"] == "feature-run-1"
assert payload["run_ids"]["risk_run_id"] == "risk-run-1"
def test_batch_runtime_handoff_can_execute_refresh() -> None:
job = enqueue_refresh_and_answer_job(
question_id="Q30",
slice_window="2020-06",
requested_outputs=["feature_store", "risk_store"],
reason=["needs_full_period_aggregation", "refresh_stale"],
)
result = run_refresh_and_answer_job(
job,
refresh_executor=lambda: {"run_id": "refresh-run-1"},
feature_executor=lambda: {"run_id": "feature-run-2"},
risk_executor=lambda: {"run_id": "risk-run-2"},
should_refresh=True,
)
payload = result.to_dict()
assert payload["status"] == "success"
assert payload["run_ids"]["refresh_run_id"] == "refresh-run-1"
assert payload["run_ids"]["feature_run_id"] == "feature-run-2"
assert payload["run_ids"]["risk_run_id"] == "risk-run-2"