399 lines
15 KiB
Python
399 lines
15 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
from collections import Counter
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
import sys
|
||
from typing import Any
|
||
|
||
# The project root is the parent of the directory that contains this file.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Put the root at the front of the import path once, so later imports in this
# file can resolve packages that live under it.
if not sys.path.count(str(PROJECT_ROOT)):
    sys.path.insert(0, str(PROJECT_ROOT))
|
||
|
||
from canonical_layer.mappers import canonical_relation_rule_catalog
|
||
|
||
|
||
# Prefer the "semantic_v2" snapshot; when that file is not on disk, fall back
# to the plain snapshot that sits in the same ``logs`` directory.
SNAPSHOT_PATH = PROJECT_ROOT.joinpath("logs", "pre_report_snapshot_2020_2020-06_semantic_v2.json")
if not SNAPSHOT_PATH.exists():
    SNAPSHOT_PATH = SNAPSHOT_PATH.with_name("pre_report_snapshot_2020_2020-06.json")
# Directory that receives every generated report file.
OUTPUT_DIR = PROJECT_ROOT.joinpath("docs", "ARCH", "2020экспорт")
|
||
|
||
|
||
# Names of the canonical classes. The order is significant: the ontology
# report lists them exactly in this sequence.
CANONICAL_CLASSES = [
    "CanonicalEntity", "Organization", "Counterparty", "Contract",
    "Account", "Subconto", "ResponsiblePerson", "Currency",
    "Warehouse", "CashflowArticle", "Department", "Individual",
    "Item", "BankAccount", "Document", "InvoiceDocument",
    "Posting", "RegisterMovement", "RegisterRecord", "Period",
]
|
||
|
||
|
||
def load_snapshot(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 encoded JSON file at *path* and return the result.

    Raises whatever opening the file or decoding the JSON raises.
    """
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
|
||
|
||
|
||
def low(value: Any) -> str:
    """Return *value* as a stripped, lower-cased string.

    Every falsy value (``None``, ``""``, ``0``, ``False``, empty containers)
    yields the empty string.
    """
    if not value:
        return ""
    text = str(value)
    return text.strip().lower()
|
||
|
||
|
||
def has_any_token(text: str, tokens: list[str]) -> bool:
    """Tell whether any of *tokens* occurs as a substring of *text*.

    *text* is normalised with ``low`` first; the tokens are compared as given,
    so callers must pass them already lower-cased.
    """
    haystack = low(text)
    for token in tokens:
        if token in haystack:
            return True
    return False
|
||
|
||
|
||
def short_record(record: dict[str, Any], *, include_links: bool = True) -> dict[str, Any]:
    """Project *record* onto the fields used in the exported samples.

    Args:
        record: Snapshot item to summarise.
        include_links: When true, the ``links`` value is carried over as well.

    Returns:
        A new dict with ``source_entity``, ``source_id``, ``display_name``
        (``None`` when absent), ``attributes`` (``{}`` when absent) and,
        optionally, ``links`` (``[]`` when absent). Nested values are the same
        objects as in *record*, not copies.
    """
    summary: dict[str, Any] = {
        field: record.get(field) for field in ("source_entity", "source_id", "display_name")
    }
    summary["attributes"] = record.get("attributes", {})
    if not include_links:
        return summary
    summary["links"] = record.get("links", [])
    return summary
|
||
|
||
|
||
def to_md_table(headers: list[str], rows: list[list[Any]]) -> str:
    """Render *headers* and *rows* as a pipe-delimited Markdown table.

    Args:
        headers: Column titles; one column is emitted per header.
        rows: Table body; every cell is converted with ``str``.

    Returns:
        The table as a single string without a trailing newline: header row,
        ``---`` separator row, then one line per entry of *rows*.

    Cell text is sanitised so that data cannot break the table layout: line
    breaks become spaces and ``|`` is escaped as ``\\|``. Cells without those
    characters are emitted unchanged.
    """

    def cell_text(cell: Any) -> str:
        # A raw newline would end the table row; a raw pipe would start a new cell.
        flat = str(cell).replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
        return flat.replace("|", "\\|")

    def render(cells: Any) -> str:
        return "| " + " | ".join(cells) + " |"

    lines = [
        render(cell_text(header) for header in headers),
        render("---" for _ in headers),
    ]
    lines.extend(render(cell_text(cell) for cell in row) for row in rows)
    return "\n".join(lines)
|
||
|
||
|
||
def classify_entity_set(entity_set: str) -> str:
    """Map an entity-set name to a canonical class name by substring rules.

    The name is normalised with ``low`` and tested against the rules below in
    order; the first rule that matches wins, so the order is part of the
    behaviour (for example the invoice rule runs before the generic document
    rule, and the bank-account rule before the generic account rule).

    Returns:
        The matching canonical class name, or ``"CanonicalEntity"`` when no
        rule matches.
    """
    # Each rule is (canonical class, alternatives). An alternative is a tuple
    # of fragments that must ALL occur in the name; a rule matches when ANY of
    # its alternatives does.
    rules = (
        ("InvoiceDocument", (("счетфактур",), ("invoice",))),
        ("Document", (("документ",), ("document",))),
        ("Counterparty", (("контраг",), ("counterparty",))),
        ("Contract", (("договор",), ("contract",))),
        ("BankAccount", (("банковск", "счет"),)),
        ("Currency", (("валют",), ("currency",))),
        ("Warehouse", (("склад",), ("warehouse",))),
        ("Department", (("подраздел",), ("department",))),
        ("Individual", (("физлиц",), ("individual",))),
        ("Item", (("номенклатур",), ("item",), ("product",))),
        ("ResponsiblePerson", (("ответствен",), ("employee",), ("user",))),
        ("CashflowArticle", (("статьядвиженияденежныхсредств",), ("cashflow",))),
        ("Account", (("счет",), ("account",))),
        ("Subconto", (("субконто",), ("subconto",))),
        ("RegisterMovement", (("движ",), ("movement",))),
        ("Posting", (("провод",), ("posting",))),
        ("RegisterRecord", (("регистр",), ("register",))),
        ("Period", (("период",), ("period",))),
        ("Organization", (("организ",), ("organization",))),
    )
    name = low(entity_set)
    for canonical_class, alternatives in rules:
        if any(all(fragment in name for fragment in group) for group in alternatives):
            return canonical_class
    return "CanonicalEntity"
|
||
|
||
|
||
def build_problem_fragment(items: list[dict[str, Any]], *, limit: int = 80) -> list[dict[str, Any]]:
    """Collect the records of *items* that show at least one data problem.

    A record is reported when any of these flags applies:

    * ``source_id_unknown`` - the normalised ``source_id`` is empty,
      ``unknown``, ``none`` or ``null``;
    * ``unknown_link_targets`` - some link has an empty or ``unknown``
      ``target_entity``;
    * ``zero_guid_present`` - some attribute value is the all-zero GUID;
    * ``navigation_links_present`` - some attribute key ends with
      ``@navigationLinkUrl``.

    Args:
        items: Snapshot records to inspect.
        limit: Slice bound applied to the result (``problems[:limit]``).

    Returns:
        Up to *limit* dicts, in input order, each holding ``problem_flags``,
        ``unknown_link_count`` and the ``short_record`` fields of the record.
    """
    missing_ids = {"unknown", "", "none", "null"}
    zero_guid = "00000000-0000-0000-0000-000000000000"

    problems: list[dict[str, Any]] = []
    for row in items:
        # Nothing beyond the first ``limit`` hits is returned, so stop scanning
        # early. A negative limit keeps slice semantics and needs the full list.
        if limit >= 0 and len(problems) >= limit:
            break

        attrs = row.get("attributes", {})
        # ``links`` may be stored as an explicit null; treat that as "no links".
        links = row.get("links") or []
        unknown_links = [link for link in links if low(link.get("target_entity")) in {"unknown", ""}]

        flags: list[str] = []
        if low(row.get("source_id")) in missing_ids:
            flags.append("source_id_unknown")
        if unknown_links:
            flags.append("unknown_link_targets")
        if isinstance(attrs, dict):
            if any(low(value) == zero_guid for value in attrs.values()):
                flags.append("zero_guid_present")
            if any(key.endswith("@navigationLinkUrl") for key in attrs):
                flags.append("navigation_links_present")

        if flags:
            problems.append(
                {
                    "problem_flags": flags,
                    "unknown_link_count": len(unknown_links),
                    **short_record(row, include_links=True),
                }
            )
    return problems[:limit]
|
||
|
||
|
||
def filter_samples(items: list[dict[str, Any]], predicate) -> list[dict[str, Any]]:
    """Return ``short_record`` summaries (links included) of matching rows.

    *predicate* is called once per row, in input order; rows for which it
    returns a truthy value are kept.
    """
    selected: list[dict[str, Any]] = []
    for row in items:
        if predicate(row):
            selected.append(short_record(row, include_links=True))
    return selected
|
||
|
||
|
||
def write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as UTF-8 JSON (2-space indent, non-ASCII kept)."""
    # Serialise first so that a payload that cannot be encoded leaves the
    # target file untouched.
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(serialized)
|
||
|
||
|
||
def write_text(path: Path, text: str) -> None:
    """Write *text* to *path* encoded as UTF-8, replacing any existing file."""
    with path.open("w", encoding="utf-8") as handle:
        handle.write(text)
|
||
|
||
|
||
def main() -> int:
    """Export a diagnostic bundle for the snapshot at ``SNAPSHOT_PATH``.

    Loads the snapshot, computes entity-set classification and link statistics,
    and writes ten files (``00_manifest.md`` through ``09_samples_*.json``)
    into ``OUTPUT_DIR``. A JSON summary that lists every file found in
    ``OUTPUT_DIR`` is printed to stdout.

    Returns:
        ``0`` on completion. I/O and JSON errors are not caught here.
    """
    snapshot = load_snapshot(SNAPSHOT_PATH)
    # ``or`` also covers an explicit JSON null stored under these keys.
    items: list[dict[str, Any]] = snapshot.get("items") or []
    records_per_set: dict[str, int] = snapshot.get("records_per_entity_set") or {}

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    entity_set_classification = {
        entity_set: classify_entity_set(entity_set) for entity_set in sorted(records_per_set)
    }
    class_distribution = Counter(entity_set_classification.values())

    missing_ids = {"unknown", "", "none", "null"}
    link_target_distribution: Counter[str] = Counter()
    relation_distribution: Counter[str] = Counter()
    unknown_relations = 0
    total_relations = 0
    unknown_source_ids = 0
    for row in items:
        if low(row.get("source_id")) in missing_ids:
            unknown_source_ids += 1
        for link in row.get("links") or []:
            raw_target = link.get("target_entity")
            target_key = low(raw_target)
            # A missing, null or blank target is reported as "Unknown" and
            # counted as unknown, matching build_problem_fragment's rule.
            target_entity = str(raw_target) if target_key else "Unknown"
            relation = str(link.get("relation") or "reference")
            link_target_distribution[target_entity] += 1
            relation_distribution[relation] += 1
            total_relations += 1
            if target_key in {"unknown", ""}:
                unknown_relations += 1

    ontology_md = f"""# Текущая онтология / mapping-слой

Дата экспорта: {datetime.now(timezone.utc).isoformat()}
Источник snapshot: `{SNAPSHOT_PATH}`

## Что считается сущностями сейчас

Базовая модель (canonical classes):
{chr(10).join(f"- `{name}`" for name in CANONICAL_CLASSES)}

## Срез июня 2020: покрытие сущностей

- Отобранный период: `{snapshot.get("selected_window_key")}`
- Диапазон: `{snapshot.get("selected_window_start")} -> {snapshot.get("selected_window_end_exclusive")}`
- Записей в slice: `{snapshot.get("records_exported_total")}`
- Связей в slice: `{snapshot.get("links_exported_total")}`
- Entity sets: `{len(records_per_set)}`
- Записей с `source_id=unknown`: `{unknown_source_ids}`

### Распределение entity sets по canonical-классам

{to_md_table(["Canonical class", "Entity set count"], [[k, v] for k, v in sorted(class_distribution.items())])}

### Топ target_entity в links

{to_md_table(["target_entity", "count"], [[k, v] for k, v in link_target_distribution.most_common(15)])}

### Топ relation в links

{to_md_table(["relation", "count"], [[k, v] for k, v in relation_distribution.most_common(20)])}

### Качество типизации связей

- Всего связей: `{total_relations}`
- Связей с `target_entity=Unknown`: `{unknown_relations}`
- Доля unknown: `{round((unknown_relations / total_relations * 100.0), 2) if total_relations else 0.0}%`
"""
    write_text(OUTPUT_DIR / "01_ontology_mapping_layer.md", ontology_md)

    relation_rows = [
        [row["context"], row["role"], row["relation"]] for row in canonical_relation_rule_catalog()
    ]
    relation_rules_md = f"""# Текущие canonical relation rules

Источник: `canonical_layer/mappers.py`

## Текущий каталог semantic relations

{to_md_table(["Context", "Field role", "Relation"], relation_rows)}

## Базовые правила извлечения ссылок

1. Поле попадает в link, если это `_Key`, `*ref`, GUID или semantic-поле (например `Recorder`, `СчетФактура`).
2. `*_Type` используется как приоритетная подсказка типа target-сущности.
3. Нулевые GUID (`00000000-...`) отфильтровываются из canonical links.
4. Если `source_id` отсутствует, строится составной `cmp:<sha1>` ключ.
"""
    write_text(OUTPUT_DIR / "02_canonical_relation_rules.md", relation_rules_md)

    problem_fragment = build_problem_fragment(items, limit=80)
    write_json(
        OUTPUT_DIR / "03_snapshot_fragment_problem_cases.json",
        {
            "slice_window_key": snapshot.get("selected_window_key"),
            "notes": [
                "Фрагмент отобран по признакам: unknown source_id, unknown link targets, zero GUID, navigationLink присутствует.",
                "Это не весь snapshot, а проблемный срез для диагностики.",
            ],
            "records_total": len(problem_fragment),
            "records": problem_fragment,
        },
    )

    # Per-entity sample files: (file name, selector description, row predicate,
    # maximum number of records). Written in this order.
    sample_exports = [
        (
            "04_samples_SpisanieSRaschetnogoScheta.json",
            "source_entity contains 'СписаниеСРасчетногоСчета' OR latin fallback",
            lambda row: has_any_token(row.get("source_entity", ""), ["списаниесрасчетногосчета", "spisanie"]),
            40,
        ),
        (
            "05_samples_RealizaciyaTovarovUslug.json",
            "source_entity contains 'РеализацияТоваровУслуг' OR latin fallback",
            lambda row: has_any_token(row.get("source_entity", ""), ["реализациятоваровуслуг", "realiz"]),
            40,
        ),
        (
            "06_samples_PostuplenieTovarovUslug.json",
            "source_entity contains 'ПоступлениеТоваровУслуг' OR latin fallback",
            lambda row: has_any_token(row.get("source_entity", ""), ["поступлениетоваровуслуг", "postupl"]),
            40,
        ),
        (
            "07_samples_DocumentJournals.json",
            "source_entity startswith DocumentJournal_",
            lambda row: str(row.get("source_entity", "")).startswith("DocumentJournal_"),
            80,
        ),
        (
            "08_samples_NDS_registers.json",
            "source_entity startswith AccumulationRegister_ and contains НДС",
            lambda row: str(row.get("source_entity", "")).startswith("AccumulationRegister_")
            and "ндс" in low(row.get("source_entity", "")),
            80,
        ),
    ]
    for file_name, selector, predicate, sample_limit in sample_exports:
        write_json(
            OUTPUT_DIR / file_name,
            {
                "selector": selector,
                "records": filter_samples(items, predicate)[:sample_limit],
            },
        )

    # Lower-cased attribute names treated as key fields; shared by the sample
    # selection and by the occurrence statistics below.
    key_field_names = frozenset(
        {
            "recorder",
            "ref",
            "ref_key",
            "поставщик_key",
            "покупатель_key",
            "ответственный_key",
        }
    )

    def key_fields_predicate(row: dict[str, Any]) -> bool:
        """Tell whether the row's attributes contain any key field name."""
        attrs = row.get("attributes", {})
        if not isinstance(attrs, dict):
            return False
        return not key_field_names.isdisjoint(low(key) for key in attrs)

    key_field_records = filter_samples(items, key_fields_predicate)[:140]
    write_json(
        OUTPUT_DIR / "09_samples_key_fields_Recorder_Ref_Supplier_Buyer_Responsible.json",
        {
            "selector": "records where attributes contain any of Recorder, Ref/Ref_Key, Поставщик_Key, Покупатель_Key, Ответственный_Key",
            "records_total": len(key_field_records),
            "records": key_field_records,
        },
    )

    key_stats: Counter[str] = Counter()
    for row in items:
        attrs = row.get("attributes", {})
        if not isinstance(attrs, dict):
            continue
        # Matching is case-insensitive, but each hit is counted under the
        # key's original spelling.
        key_stats.update(key for key in attrs if low(key) in key_field_names)

    manifest_md = f"""# 2020 экспорт: состав выгрузки

Папка собрана автоматически для ручного анализа текущего состояния.

## Файлы

1. `01_ontology_mapping_layer.md` — текущая онтология/мэппинг и метрики среза.
2. `02_canonical_relation_rules.md` — правила построения canonical relations.
3. `03_snapshot_fragment_problem_cases.json` — проблемный фрагмент snapshot июня 2020.
4. `04_samples_SpisanieSRaschetnogoScheta.json` — реальные записи по `СписаниеСРасчетногоСчета`.
5. `05_samples_RealizaciyaTovarovUslug.json` — реальные записи по `РеализацияТоваровУслуг`.
6. `06_samples_PostuplenieTovarovUslug.json` — реальные записи по `ПоступлениеТоваровУслуг`.
7. `07_samples_DocumentJournals.json` — реальные записи по журналам документов.
8. `08_samples_NDS_registers.json` — реальные записи по НДС-регистрам.
9. `09_samples_key_fields_Recorder_Ref_Supplier_Buyer_Responsible.json` — записи с ключевыми полями.

## Ключевые поля: фактическая встречаемость в snapshot

{to_md_table(["field", "count"], [[k, v] for k, v in key_stats.most_common()] or [["(не найдено)", 0]])}
"""
    write_text(OUTPUT_DIR / "00_manifest.md", manifest_md)

    summary = {
        "status": "success",
        "output_dir": str(OUTPUT_DIR),
        "snapshot_path": str(SNAPSHOT_PATH),
        # Lists whatever is in the directory, including files from earlier runs.
        "files": sorted(path.name for path in OUTPUT_DIR.iterdir() if path.is_file()),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
|
||
|
||
|
||
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|