# NODEDC_1C/canonical_layer/features.py (455 lines, 18 KiB, Python)

from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from datetime import datetime, timezone
import json
import math
import re
from typing import Any

from canonical_layer.store import CanonicalStore
from config.settings import OneCSettings, load_settings
# Account-like tokens: a standalone two-digit code, optionally followed by a
# ".NN" sub-code (e.g. "60" or "60.01"). Used to count token frequencies in
# entity text. Caveats: in a str pattern `\d` also matches non-ASCII decimal
# digits, and two-digit fragments such as the "01"/"15" of "2024-01-15" match
# too. NOTE(review): presumably chart-of-accounts codes — confirm.
ACCOUNT_TOKEN_RE: re.Pattern[str] = re.compile(r"\b\d{2}(?:\.\d{2})?\b")
@dataclass
class FeatureEngineResult:
    """Outcome of one feature-engine run, handed back to the caller."""

    run_id: str
    status: str
    baseline_window_hours: int
    stale_refresh_threshold_hours: int
    entities_total: int
    metrics_written: int
    anomalies_written: int
    # None unless the run failed; then it carries the exception text.
    error_message: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return every field as a plain dict, keys in declaration order."""
        return dict(
            run_id=self.run_id,
            status=self.status,
            baseline_window_hours=self.baseline_window_hours,
            stale_refresh_threshold_hours=self.stale_refresh_threshold_hours,
            entities_total=self.entities_total,
            metrics_written=self.metrics_written,
            anomalies_written=self.anomalies_written,
            error_message=self.error_message,
        )
class FeatureService:
    """Derive feature metrics and anomaly signals from the canonical store.

    ``run_feature_engine`` scans canonical entities once. From that pass it
    builds global, per-entity-set, per-account-token, link-degree, refresh-age
    and entity-count-drift metrics/anomalies, and persists them through
    ``CanonicalStore``. The remaining public methods pass reads straight
    through to the store.
    """

    # Caps and thresholds used by run_feature_engine (same values as the
    # former inline literals).
    _RECENT_RUNS_LOOKBACK = 50
    _PREVIOUS_METRICS_LIMIT = 5000
    _MAX_LINK_DEGREE_ANOMALIES = 50
    _MIN_HIGH_LINK_THRESHOLD = 10
    _HIGH_LINK_STDDEVS = 3.0
    _EMPTY_DISPLAY_MIN_ENTITIES = 50
    _EMPTY_DISPLAY_SHARE_THRESHOLD = 0.2
    _DRIFT_RATIO_THRESHOLD = 0.3
    _DRIFT_MIN_ABS_CHANGE = 10

    class _EntityStats:
        """Aggregates collected in a single pass over the scanned entities."""

        def __init__(self) -> None:
            self.entities_total = 0
            self.per_set_count: defaultdict[str, int] = defaultdict(int)
            self.per_set_empty_display: defaultdict[str, int] = defaultdict(int)
            self.per_set_link_sum: defaultdict[str, int] = defaultdict(int)
            self.account_token_count: defaultdict[str, int] = defaultdict(int)
            self.entity_link_items: list[dict[str, Any]] = []

    def __init__(self, *, settings: OneCSettings, store: CanonicalStore) -> None:
        """Keep ``settings``/``store`` and call ``store.ensure_created()`` once."""
        self.settings = settings
        self.store = store
        self.store.ensure_created()

    @classmethod
    def build(cls) -> "FeatureService":
        """Create a service backed by ``settings.canonical_db_url`` from ``load_settings()``."""
        settings = load_settings()
        store = CanonicalStore(settings.canonical_db_url)
        return cls(settings=settings, store=store)

    @staticmethod
    def _stddev(values: list[int]) -> float:
        """Return the population standard deviation of *values* (0.0 for fewer than 2)."""
        if len(values) <= 1:
            return 0.0
        mean = sum(values) / len(values)
        variance = sum((value - mean) ** 2 for value in values) / len(values)
        return math.sqrt(variance)

    @staticmethod
    def _text(value: Any, default: str = "") -> str:
        """Stringify *value*, mapping ``None`` to *default*.

        ``dict.get(key, default)`` returns None when the key is present with a
        None value (e.g. a NULL column), and ``str(None)`` would yield "None".
        """
        return default if value is None else str(value)

    @staticmethod
    def _metric(
        metric_key: str,
        scope: str,
        scope_id: str,
        value: float,
        *,
        metric_type: str = "gauge",
        attributes: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Build one metric row in the shape passed to ``replace_feature_results``."""
        return {
            "metric_key": metric_key,
            "scope": scope,
            "scope_id": scope_id,
            "metric_type": metric_type,
            "metric_value": float(value),
            "attributes": {} if attributes is None else attributes,
        }

    @staticmethod
    def _anomaly(
        signal_type: str,
        severity: str,
        scope: str,
        scope_id: str,
        score: float,
        details: dict[str, Any],
    ) -> dict[str, Any]:
        """Build one anomaly row in the shape passed to ``replace_feature_results``."""
        return {
            "signal_type": signal_type,
            "severity": severity,
            "scope": scope,
            "scope_id": scope_id,
            "score": float(score),
            "details": details,
        }

    def _latest_previous_successful_run_id(self, *, current_run_id: str) -> str | None:
        """Return the first successful run id (other than *current_run_id*) in recent runs.

        NOTE(review): assumes ``list_recent_feature_runs`` returns newest first.
        """
        for run in self.store.list_recent_feature_runs(limit=self._RECENT_RUNS_LOOKBACK):
            run_id = run.get("run_id")
            if run_id is None or run_id == current_run_id:
                continue
            if run.get("status") == "success":
                return str(run_id)
        return None

    def run_feature_engine(
        self,
        *,
        baseline_window_hours: int | None = None,
        stale_refresh_threshold_hours: int | None = None,
        top_account_tokens: int = 20,
        entity_limit: int | None = None,
    ) -> FeatureEngineResult:
        """Run one feature-engine pass and persist its metrics and anomalies.

        Args:
            baseline_window_hours: Recorded on the run. It falls back to
                ``settings.feature_default_baseline_window_hours`` when None/0
                and is not otherwise used by the computations here.
            stale_refresh_threshold_hours: Refresh age (hours) above which a
                ``stale_refresh`` anomaly is raised. It falls back to
                ``settings.anomaly_stale_refresh_threshold_hours`` when None/0.
            top_account_tokens: How many of the most frequent account tokens
                to emit as metrics. Negative values are treated as 0.
            entity_limit: Maximum number of entities to scan. It falls back to
                ``settings.feature_entity_scan_limit`` when None/0.

        Returns:
            A ``FeatureEngineResult`` with status ``"success"``. If any step
            after the run was started raised, the status is ``"failed"`` and
            ``error_message`` is set. Errors from ``start_feature_run``
            itself propagate.
        """
        # Falsy overrides (None or 0) fall back to configured defaults.
        baseline_hours = baseline_window_hours or self.settings.feature_default_baseline_window_hours
        stale_hours = stale_refresh_threshold_hours or self.settings.anomaly_stale_refresh_threshold_hours
        scan_limit = entity_limit or self.settings.feature_entity_scan_limit
        run_id = self.store.start_feature_run(baseline_window_hours=baseline_hours)
        now = datetime.now(timezone.utc)
        try:
            entities = self.store.iter_entities_for_features(limit=scan_limit)
            link_counts = self.store.link_counts_by_source()
            stats = self._collect_entity_stats(entities, link_counts)

            # Most frequent first; ties broken alphabetically for stable output.
            top_tokens = sorted(
                stats.account_token_count.items(),
                key=lambda item: (-item[1], item[0]),
            )[: max(0, top_account_tokens)]

            # A scan that hit its limit may have cut whole entity sets off, so
            # their absence is not evidence that they vanished.
            scan_truncated = bool(scan_limit) and stats.entities_total >= scan_limit

            # Sections are evaluated in this order; the store reads inside
            # (refresh, then previous runs) happen in the same order as before.
            sections = [
                self._global_features(stats),
                self._entity_set_features(stats),
                self._token_features(top_tokens),
                self._link_degree_features(stats.entity_link_items),
                self._refresh_features(now=now, stale_hours=stale_hours),
                self._drift_features(
                    run_id=run_id,
                    per_set_count=stats.per_set_count,
                    include_vanished=not scan_truncated,
                ),
            ]
            metrics = [row for section_metrics, _ in sections for row in section_metrics]
            anomalies = [row for _, section_anomalies in sections for row in section_anomalies]

            metrics_written, anomalies_written = self.store.replace_feature_results(
                run_id=run_id,
                metrics=metrics,
                anomalies=anomalies,
            )
            details = {
                "entity_sets_total": len(stats.per_set_count),
                "top_account_tokens": [{"token": token, "count": count} for token, count in top_tokens],
            }
            self.store.finish_feature_run(
                run_id=run_id,
                status="success",
                entities_total=stats.entities_total,
                metrics_written=metrics_written,
                anomalies_written=anomalies_written,
                details=details,
            )
            return FeatureEngineResult(
                run_id=run_id,
                status="success",
                baseline_window_hours=baseline_hours,
                stale_refresh_threshold_hours=stale_hours,
                entities_total=stats.entities_total,
                metrics_written=metrics_written,
                anomalies_written=anomalies_written,
            )
        except Exception as exc:
            # Run-level boundary: record the failure on the run instead of raising.
            self.store.finish_feature_run(
                run_id=run_id,
                status="failed",
                entities_total=0,
                metrics_written=0,
                anomalies_written=0,
                details={},
                error_message=str(exc),
            )
            return FeatureEngineResult(
                run_id=run_id,
                status="failed",
                baseline_window_hours=baseline_hours,
                stale_refresh_threshold_hours=stale_hours,
                entities_total=0,
                metrics_written=0,
                anomalies_written=0,
                error_message=str(exc),
            )

    def _collect_entity_stats(
        self,
        entities: Iterable[Any],
        link_counts: Mapping[tuple[str, str], Any],
    ) -> "FeatureService._EntityStats":
        """Aggregate per-set counts, link counts and account tokens in one pass.

        *entities* only needs to be iterable (it is never ``len()``-ed).
        *link_counts* is looked up by ``(source_entity, source_id)``.
        """
        stats = self._EntityStats()
        for entity in entities:
            source_entity = self._text(entity.get("source_entity"), "unknown")
            source_id = self._text(entity.get("source_id"))
            display_name = self._text(entity.get("display_name")).strip()
            attributes = entity.get("attributes", {})

            stats.entities_total += 1
            stats.per_set_count[source_entity] += 1
            if not display_name:
                stats.per_set_empty_display[source_entity] += 1
            link_count = int(link_counts.get((source_entity, source_id), 0))
            stats.per_set_link_sum[source_entity] += link_count
            stats.entity_link_items.append(
                {
                    "source_entity": source_entity,
                    "source_id": source_id,
                    "display_name": display_name,
                    "link_count": link_count,
                }
            )
            # The JSON text is only scanned for tokens, so a non-JSON value
            # (datetime, Decimal, ...) is stringified instead of aborting the run.
            attributes_text = json.dumps(attributes, ensure_ascii=False, default=str)
            searchable_blob = f"{display_name} {source_id} {attributes_text}"
            for token in ACCOUNT_TOKEN_RE.findall(searchable_blob):
                stats.account_token_count[token] += 1
        return stats

    def _global_features(
        self, stats: "FeatureService._EntityStats"
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Emit store-wide totals and a ``no_canonical_data`` anomaly when nothing was scanned."""
        entities_total = stats.entities_total
        links_total = sum(item["link_count"] for item in stats.entity_link_items)
        avg_links = (links_total / entities_total) if entities_total else 0.0
        metrics = [
            self._metric("canonical_entities_total", "global", "", entities_total),
            self._metric("canonical_links_total", "global", "", links_total),
            self._metric("canonical_entity_sets_total", "global", "", len(stats.per_set_count)),
            self._metric("avg_links_per_entity", "global", "", avg_links),
        ]
        anomalies: list[dict[str, Any]] = []
        if entities_total == 0:
            anomalies.append(
                self._anomaly(
                    "no_canonical_data",
                    "high",
                    "global",
                    "",
                    1.0,
                    {"reason": "canonical_entities table is empty"},
                )
            )
        return metrics, anomalies

    def _entity_set_features(
        self, stats: "FeatureService._EntityStats"
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Emit per-``source_entity`` size, link and empty-display-name metrics."""
        metrics: list[dict[str, Any]] = []
        anomalies: list[dict[str, Any]] = []
        for source_entity in sorted(stats.per_set_count):
            count = stats.per_set_count[source_entity]
            empty_count = stats.per_set_empty_display.get(source_entity, 0)
            empty_share = (empty_count / count) if count else 0.0
            avg_links_local = (stats.per_set_link_sum.get(source_entity, 0) / count) if count else 0.0
            metrics.append(self._metric("entity_count", "source_entity", source_entity, count))
            metrics.append(self._metric("avg_links_per_entity", "source_entity", source_entity, avg_links_local))
            metrics.append(
                self._metric(
                    "empty_display_share",
                    "source_entity",
                    source_entity,
                    empty_share,
                    metric_type="ratio",
                    attributes={"empty_count": empty_count, "total": count},
                )
            )
            # Small sets are skipped so a handful of blanks doesn't trigger a signal.
            if count >= self._EMPTY_DISPLAY_MIN_ENTITIES and empty_share >= self._EMPTY_DISPLAY_SHARE_THRESHOLD:
                anomalies.append(
                    self._anomaly(
                        "empty_display_share_high",
                        "medium",
                        "source_entity",
                        source_entity,
                        empty_share,
                        {"empty_count": empty_count, "total": count},
                    )
                )
        return metrics, anomalies

    def _token_features(
        self, top_tokens: list[tuple[str, int]]
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Emit one frequency metric per selected account token."""
        metrics = [
            self._metric("account_token_frequency", "account_token", token, token_count)
            for token, token_count in top_tokens
        ]
        return metrics, []

    def _link_degree_features(
        self, entity_link_items: list[dict[str, Any]]
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Flag entities whose link count reaches max(10, mean + 3*stddev) of linked entities."""
        link_values = [item["link_count"] for item in entity_link_items if item["link_count"] > 0]
        high_link_threshold = self._MIN_HIGH_LINK_THRESHOLD
        if link_values:
            mean_links = sum(link_values) / len(link_values)
            high_link_threshold = max(
                self._MIN_HIGH_LINK_THRESHOLD,
                int(mean_links + (self._HIGH_LINK_STDDEVS * self._stddev(link_values))),
            )
        metrics = [self._metric("high_link_threshold", "global", "", high_link_threshold)]

        # Stable sort: entities with equal link counts keep their scan order.
        suspicious = sorted(
            (item for item in entity_link_items if item["link_count"] >= high_link_threshold),
            key=lambda item: item["link_count"],
            reverse=True,
        )
        anomalies: list[dict[str, Any]] = []
        for item in suspicious[: self._MAX_LINK_DEGREE_ANOMALIES]:
            score = float(item["link_count"]) / float(high_link_threshold) if high_link_threshold else 0.0
            anomalies.append(
                self._anomaly(
                    "high_link_degree",
                    "high" if score >= 2.0 else "medium",
                    item["source_entity"],
                    item["source_id"],
                    score,
                    {
                        "link_count": item["link_count"],
                        "threshold": high_link_threshold,
                        "display_name": item["display_name"],
                    },
                )
            )
        return metrics, anomalies

    def _refresh_features(
        self, *, now: datetime, stale_hours: int
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Emit refresh age and flag a missing or stale refresh baseline."""
        latest_refresh = self.store.latest_refresh_finished_at()
        if latest_refresh is None:
            return [], [
                self._anomaly(
                    "missing_refresh_baseline",
                    "high",
                    "global",
                    "",
                    1.0,
                    {"reason": "no successful refresh run found"},
                )
            ]
        if latest_refresh.tzinfo is None:
            # Naive timestamps are treated as UTC so they compare with `now`.
            latest_refresh = latest_refresh.replace(tzinfo=timezone.utc)
        finished_at = latest_refresh.isoformat()
        # Clamp at 0 so a refresh stamped in the future never yields a negative age.
        age_hours = max(0.0, (now - latest_refresh).total_seconds() / 3600.0)
        metrics = [
            self._metric(
                "refresh_age_hours",
                "global",
                "",
                age_hours,
                attributes={"latest_refresh_finished_at": finished_at},
            )
        ]
        anomalies: list[dict[str, Any]] = []
        if age_hours > stale_hours:
            anomalies.append(
                self._anomaly(
                    "stale_refresh",
                    "high",
                    "global",
                    "",
                    (age_hours / stale_hours) if stale_hours else age_hours,
                    {
                        "age_hours": age_hours,
                        "threshold_hours": stale_hours,
                        "latest_refresh_finished_at": finished_at,
                    },
                )
            )
        return metrics, anomalies

    def _drift_features(
        self,
        *,
        run_id: str,
        per_set_count: Mapping[str, int],
        include_vanished: bool,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Compare per-set entity counts with the previous successful run.

        Sets present now are compared in scan order. When *include_vanished* is
        true, sets that the previous run counted but that are absent now are
        compared as count 0, so a whole set disappearing is reported too.
        """
        previous_run_id = self._latest_previous_successful_run_id(current_run_id=run_id)
        if not previous_run_id:
            return [], []
        previous_rows = self.store.list_feature_metrics(
            limit=self._PREVIOUS_METRICS_LIMIT,
            metric_key="entity_count",
            scope="source_entity",
            run_id=previous_run_id,
        )
        prev_counts = {
            str(row.get("scope_id", "")): float(row.get("metric_value", 0.0))
            for row in previous_rows
        }
        set_names = list(per_set_count)
        if include_vanished:
            set_names.extend(sorted(set(prev_counts) - set(per_set_count)))

        metrics: list[dict[str, Any]] = []
        anomalies: list[dict[str, Any]] = []
        for source_entity in set_names:
            current_count = per_set_count.get(source_entity, 0)
            previous_count = prev_counts.get(source_entity)
            if previous_count is None or previous_count <= 0:
                continue
            delta = float(current_count) - previous_count
            drift_ratio = delta / previous_count
            metrics.append(
                self._metric(
                    "entity_count_drift_ratio",
                    "source_entity",
                    source_entity,
                    drift_ratio,
                    metric_type="ratio",
                    attributes={
                        "previous_count": previous_count,
                        "current_count": current_count,
                        "previous_run_id": previous_run_id,
                    },
                )
            )
            # Require both a relative and an absolute change to avoid noise on tiny sets.
            if abs(drift_ratio) >= self._DRIFT_RATIO_THRESHOLD and abs(delta) >= self._DRIFT_MIN_ABS_CHANGE:
                anomalies.append(
                    self._anomaly(
                        "entity_count_drift",
                        "high" if abs(drift_ratio) >= 1.0 else "medium",
                        "source_entity",
                        source_entity,
                        abs(drift_ratio),
                        {
                            "previous_count": previous_count,
                            "current_count": current_count,
                            "drift_ratio": drift_ratio,
                            "previous_run_id": previous_run_id,
                        },
                    )
                )
        return metrics, anomalies

    def list_recent_runs(self, limit: int = 20) -> list[dict[str, Any]]:
        """Return up to *limit* recent feature runs from the store."""
        return self.store.list_recent_feature_runs(limit=limit)

    def list_metrics(
        self,
        *,
        limit: int = 200,
        metric_key: str | None = None,
        scope: str | None = None,
        run_id: str | None = None,
    ) -> list[dict[str, Any]]:
        """Return stored feature metrics, filtered by the given optional keys."""
        return self.store.list_feature_metrics(limit=limit, metric_key=metric_key, scope=scope, run_id=run_id)

    def list_anomalies(
        self,
        *,
        limit: int = 200,
        severity: str | None = None,
        active_only: bool = True,
        run_id: str | None = None,
    ) -> list[dict[str, Any]]:
        """Return stored anomaly signals, filtered by the given optional keys."""
        return self.store.list_anomaly_signals(
            limit=limit,
            severity=severity,
            active_only=active_only,
            run_id=run_id,
        )

    def stats(self) -> dict[str, Any]:
        """Return the store's ``feature_store_stats()`` summary."""
        return self.store.feature_store_stats()