mx_http.py 873 B

12345678910111213141516171819202122232425262728
  1. """将带 _mx_meta 的妙想服务结果转换为统一 HTTP 响应"""
  2. from __future__ import annotations
  3. import copy
  4. from typing import Any
  5. from app.utils.mx_quota import MX_QUOTA_HINT
  6. from app.utils.response import error_response, success_response
  7. def mx_result_to_http(result: dict, *, http_error_code: int = 500) -> dict:
  8. """
  9. result 可含 _mx_meta:
  10. from_cache, quota_exhausted, cache_ttl_seconds, channel, hint
  11. """
  12. payload = copy.deepcopy(result)
  13. meta = payload.pop("_mx_meta", None)
  14. if not payload.get("success"):
  15. err = payload.get("error") or "请求失败"
  16. return error_response(code=http_error_code, message=err)
  17. msg = "success"
  18. if isinstance(meta, dict) and meta.get("quota_exhausted"):
  19. msg = meta.get("hint") or MX_QUOTA_HINT
  20. return success_response(data=payload, message=msg, meta=meta)