Source code for mirror_api.reflected_response
from json import JSONDecodeError
from typing import Dict, Optional, Union
from fastapi import Request
from pydantic import BaseModel, Field
[docs]class ReflectedResponse(BaseModel):
method: str
url: str
headers: Union[Dict[str, str], None]
cookies: Union[Dict[str, str], None]
json_: Union[dict, list, None] = Field(alias="json")
body: str
form: Union[Dict[str, str], None]
api_path: str
[docs] @classmethod
async def from_request(cls, request: Request, api_path: str) -> "ReflectedResponse":
body_bytes = await request.body()
return cls(
method=request.method,
url=str(request.url),
headers=dict(request.headers) if request.headers else None,
cookies=dict(request.cookies) if request.cookies else None,
json=await _get_json_from_request(request),
body=body_bytes.decode(),
form=await _get_form_from_request(request),
api_path=api_path,
)
async def _get_json_from_request(request: Request) -> Optional[dict]:
try:
return await request.json()
except JSONDecodeError:
return None
async def _get_form_from_request(request: Request) -> Optional[dict]:
data = await request.form()
if not data:
return None
return dict(data)