NODEDC_1C/scripts/run_refresh.py

90 lines
2.7 KiB
Python

from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
# The script lives one directory below the project root. Put that root on
# sys.path before the project-local imports below so they resolve when the
# file is executed directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
_project_root_str = str(PROJECT_ROOT)
if _project_root_str not in sys.path:
    sys.path.insert(0, _project_root_str)
from canonical_layer.refresh import REFRESH_MODES, RefreshService
from config.settings import LOGS_DIR
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options for a canonical refresh run.

    Args:
        argv: Argument strings to parse. When ``None`` (the default) argparse
            falls back to ``sys.argv[1:]``, so existing callers that invoke
            ``parse_args()`` with no arguments behave exactly as before.
            Passing an explicit list allows programmatic use and testing.

    Returns:
        Namespace with the attributes ``mode``, ``date_from``, ``date_to``,
        ``target_id``, ``limit_per_set``, ``entity_sets``, ``keywords``,
        ``output`` and ``strict``.
    """
    parser = argparse.ArgumentParser(
        description="Run canonical refresh into local/remote canonical store",
    )
    parser.add_argument(
        "--mode",
        choices=sorted(REFRESH_MODES),
        default="incremental",
        help="Refresh mode: historical, incremental, targeted",
    )
    parser.add_argument(
        "--from-date",
        dest="date_from",
        default=None,
        help="ISO date lower bound",
    )
    parser.add_argument(
        "--to-date",
        dest="date_to",
        default=None,
        help="ISO date upper bound",
    )
    parser.add_argument(
        "--target-id",
        default=None,
        help="Entity/document id fragment for targeted mode",
    )
    parser.add_argument(
        "--limit-per-set",
        type=int,
        default=None,
        help="How many source rows to read per entity set",
    )
    # Repeatable flags: each occurrence appends to a list; the attribute stays
    # None when the flag is never given.
    parser.add_argument(
        "--entity-set",
        action="append",
        dest="entity_sets",
        default=None,
        help="Specific entity set to include (repeat flag for multiple values)",
    )
    parser.add_argument(
        "--keyword",
        action="append",
        dest="keywords",
        default=None,
        help="Entity-set matcher keyword (repeat flag for multiple values)",
    )
    parser.add_argument(
        "--output",
        default=str(LOGS_DIR / "refresh_last_run.json"),
        help="Where to write run summary json",
    )
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Exit with code 1 when run status is failed or partial_success",
    )
    return parser.parse_args(argv)
def main() -> int:
    """Run one canonical refresh and report its summary.

    Parses the CLI options, runs the refresh through ``RefreshService``, adds
    the store statistics to the result dictionary under ``"store_stats"``,
    writes that summary as JSON to the ``--output`` path (creating parent
    directories as needed) and prints the same JSON to stdout.

    Returns:
        ``1`` when ``--strict`` is set and the run status is anything other
        than ``"success"``; ``0`` otherwise.
    """
    options = parse_args()
    refresher = RefreshService.build()
    outcome = refresher.run_refresh(
        mode=options.mode,
        date_from=options.date_from,
        date_to=options.date_to,
        target_id=options.target_id,
        limit_per_set=options.limit_per_set,
        requested_entity_sets=options.entity_sets,
        entity_keywords=options.keywords,
    )

    summary = outcome.to_dict()
    summary["store_stats"] = refresher.store_stats()

    destination = Path(options.output)
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Render once; the file and stdout receive the same text.
    rendered = json.dumps(summary, ensure_ascii=False, indent=2)
    destination.write_text(rendered, encoding="utf-8")
    print(rendered)

    strict_failure = options.strict and outcome.status != "success"
    return 1 if strict_failure else 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())