"""Helpers and a thin ``requests``-based client for reading an OData service."""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
import json
from typing import Any
import xml.etree.ElementTree as ET

import requests
from requests.auth import HTTPBasicAuth

from config.settings import OneCSettings

# Field names that are always reported as GUID-like, regardless of their value.
GUID_FIELDS = {"Ref_Key", "ref_key", "ID", "Id", "id"}


@dataclass
class ODataResponse:
    """Result of one entity-set read: final URL, status, timing and JSON body."""

    url: str
    status_code: int
    elapsed_ms: int
    payload: dict[str, Any]


def utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _extract_records(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Pull the list of records out of a decoded OData JSON payload.

    Recognised shapes, checked in this order:

    * ``{"value": [...]}``
    * ``{"d": [...]}``
    * ``{"d": {"results": [...]}}`` or ``{"d": {"results": {...}}}``
    * ``{"d": {...}}`` (a single non-empty object, returned as a
      one-element list)

    Any other shape, including a payload that is not a dict at all,
    yields an empty list instead of raising.
    """
    if not isinstance(payload, dict):
        return []

    value = payload.get("value")
    if isinstance(value, list):
        return value

    wrapper = payload.get("d")
    if isinstance(wrapper, list):
        # Collection returned directly under "d", without a "results" wrapper.
        return wrapper
    if not isinstance(wrapper, dict):
        return []

    results = wrapper.get("results")
    if isinstance(results, list):
        return results
    if isinstance(results, dict):
        return [results]
    # No usable "results": treat a non-empty "d" object as a single record.
    return [wrapper] if wrapper else []


def extract_entity_sets(metadata_xml: str) -> list[dict[str, str]]:
    """List the ``EntitySet`` elements declared in a metadata document.

    Args:
        metadata_xml: The service metadata document as XML text.

    Returns:
        One dict per named entity set with keys ``name``, ``entity_type``
        and ``path`` (``path`` equals ``name``), sorted case-insensitively
        by name. Elements without a ``Name`` attribute are skipped.

    Raises:
        xml.etree.ElementTree.ParseError: If ``metadata_xml`` is not
            well-formed XML.
    """
    # NOTE: the stdlib parser is not hardened against maliciously constructed
    # XML; only feed it metadata from a trusted service.
    root = ET.fromstring(metadata_xml)

    entity_sets: list[dict[str, str]] = []
    for node in root.iter():
        # Tags are namespace-qualified ("{uri}EntitySet"); compare the local
        # name exactly so unrelated tags that merely end in "EntitySet"
        # are not picked up.
        if node.tag.rpartition("}")[2] != "EntitySet":
            continue
        name = node.attrib.get("Name", "")
        if not name:
            continue
        entity_sets.append(
            {
                "name": name,
                "entity_type": node.attrib.get("EntityType", ""),
                "path": name,
            }
        )

    entity_sets.sort(key=lambda item: item["name"].lower())
    return entity_sets


def _has_key_like_name(field: str) -> bool:
    """Return True if the field name alone marks it as a key/reference."""
    lowered = field.lower()
    return field in GUID_FIELDS or lowered.endswith("_key") or "ref" in lowered


def _looks_like_guid(value: Any) -> bool:
    """Return True for 36-character strings containing exactly four hyphens."""
    return isinstance(value, str) and len(value) == 36 and value.count("-") == 4


def flatten_guid_like_fields(record: dict[str, Any]) -> list[str]:
    """Return the sorted names of fields that look like GUIDs or references.

    A field qualifies when its name is in ``GUID_FIELDS``, ends with
    ``_key`` or contains ``ref`` (both case-insensitive), or when its value
    is a 36-character string with exactly four hyphens. This is a
    heuristic: the value check does not validate hexadecimal digits.
    """
    # Dict keys are unique, so no de-duplication is needed before sorting.
    return sorted(
        field
        for field, value in record.items()
        if _has_key_like_name(field) or _looks_like_guid(value)
    )


class ODataClient:
    """Thin HTTP client for an OData service, configured by ``OneCSettings``.

    One ``requests.Session`` is reused for all calls. Call ``close()`` or
    use the client as a context manager to release its pooled connections.
    """

    def __init__(self, settings: OneCSettings) -> None:
        """Create the session; enable HTTP basic auth when a username is set."""
        self.settings = settings
        self.session = requests.Session()
        self.session.headers.update({"Accept": "application/json"})
        if settings.username:
            self.session.auth = HTTPBasicAuth(settings.username, settings.password)

    def __enter__(self) -> ODataClient:
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def _request(
        self,
        method: str,
        url: str,
        *,
        params: dict[str, Any] | None = None,
        accept: str = "application/json",
    ) -> requests.Response:
        """Send one request using the configured timeout and TLS settings.

        Args:
            method: HTTP method name.
            url: Absolute request URL.
            params: Optional query-string parameters.
            accept: Value of the ``Accept`` header for this request.

        Raises:
            requests.HTTPError: If the response status is 4xx or 5xx.
        """
        response = self.session.request(
            method=method,
            url=url,
            params=params,
            headers={"Accept": accept},
            timeout=self.settings.timeout,
            verify=self.settings.verify_tls,
        )
        response.raise_for_status()
        return response

    def fetch_metadata(self) -> str:
        """Download the metadata document and return it as text."""
        response = self._request(
            "GET", self.settings.metadata_url, accept="application/xml"
        )
        return response.text

    def read_entity_set(
        self,
        entity_set: str,
        *,
        top: int = 5,
        extra_params: dict[str, Any] | None = None,
    ) -> ODataResponse:
        """Read up to ``top`` records from an entity set.

        Args:
            entity_set: Entity-set path relative to the service root.
            top: Value sent as the ``$top`` query option.
            extra_params: Extra query options; these override ``$top`` when
                they contain the same key.

        Returns:
            An ``ODataResponse`` holding the final URL, status code, elapsed
            time in milliseconds and the decoded JSON payload.

        Raises:
            requests.HTTPError: If the response status is 4xx or 5xx.
            ValueError: If the response body is not valid JSON.
        """
        params: dict[str, Any] = {"$top": top, **(extra_params or {})}

        # Join with exactly one slash, so the URL is valid whether or not the
        # configured service root carries a trailing slash.
        service_root = str(self.settings.service_root).rstrip("/")
        entity_url = f"{service_root}/{entity_set.lstrip('/')}"

        response = self._request(
            "GET", entity_url, params=params, accept="application/json"
        )
        return ODataResponse(
            url=response.url,
            status_code=response.status_code,
            elapsed_ms=int(response.elapsed.total_seconds() * 1000),
            payload=response.json(),
        )

    def read_entity_set_records(
        self,
        entity_set: str,
        *,
        top: int = 5,
        extra_params: dict[str, Any] | None = None,
    ) -> list[dict[str, Any]]:
        """Read an entity set and return only its list of records."""
        response = self.read_entity_set(
            entity_set, top=top, extra_params=extra_params
        )
        return _extract_records(response.payload)

    @staticmethod
    def to_json(data: dict[str, Any]) -> str:
        """Serialise ``data`` as indented JSON, keeping non-ASCII text as is."""
        return json.dumps(data, ensure_ascii=False, indent=2)