# Source code for yadisk.api.api_request

# -*- coding: utf-8 -*-

from ..common import CaseInsensitiveDict

from ..exceptions import InvalidResponseError

from ..utils import auto_retry, async_auto_retry
from .. import settings

from typing import Any, Optional, Union, TypeVar, TYPE_CHECKING
from ..compat import Set, Dict
import json

from ..types import AnySession, JSON, HTTPMethod

if TYPE_CHECKING:
    from ..session import Response
    from ..async_session import AsyncResponse

# Names exported by ``from ... import *``.
__all__ = ["APIRequest"]

# Unique sentinel meaning "no timeout was specified", for cases when None
# can't be used (presumably because None is itself a meaningful timeout
# value -- TODO confirm). APIRequest.__init__ compares against it by identity
# and substitutes settings.DEFAULT_TIMEOUT.
_DEFAULT_TIMEOUT = object()

class APIRequest(object):
    """
    Base class for all API requests.

    Subclasses are expected to set `url` and `method` and to implement
    `process_args()` and `process_json()`.

    :param session: an instance of :any:`Session`
    :param args: `dict` of arguments, that will be passed to `process_args`
    :param timeout: `float` or `tuple`, request timeout
    :param headers: `dict` or `None`, additional request headers
    :param n_retries: `int`, maximum number of retries
    :param retry_interval: delay between retries in seconds
    :param kwargs: other arguments for :any:`Session.send_request`

    :ivar url: `str`, request URL
    :ivar method: `str`, request method
    :ivar content_type: `str`, Content-Type header
                        ("application/x-www-form-urlencoded" by default)
    :ivar timeout: `float` or `tuple`, request timeout
    :ivar n_retries: `int`, maximum number of retries
    :ivar success_codes: `list`-like, list of response codes that indicate
                         request's success
    :ivar retry_interval: `float`, delay between retries in seconds
    """

    # Class-level defaults; each may be overridden by a subclass or,
    # for timeout / n_retries / retry_interval, per instance via __init__.
    url: str = ""
    method: Optional[HTTPMethod] = None
    content_type: str = "application/x-www-form-urlencoded"
    timeout = _DEFAULT_TIMEOUT
    n_retries: Optional[int] = None
    success_codes: Set[int] = {200}
    retry_interval: Optional[Union[int, float]] = None

    # Instance attributes assigned in __init__.
    data: Union[Dict, bytes]
    params: Dict[str, Any]
    send_kwargs: Dict[str, Any]
    session: Any
    response: Optional[Any]

    T = TypeVar("T")

    def __init__(self, session: AnySession, args: dict, **kwargs):
        """
        Resolve the request settings and let the subclass process `args`.

        `n_retries`, `retry_interval` and `headers` are removed from
        `kwargs`; `timeout` stays in `kwargs` (with its resolved value).
        Whatever remains in `kwargs` is stored as `self.send_kwargs`.
        Finishes by calling `self.process_args(**args)`.
        """
        n_retries = kwargs.pop("n_retries", None)
        retry_interval = kwargs.pop("retry_interval", None)
        headers = kwargs.pop("headers", None)

        if headers is None:
            headers = {}

        # An explicitly passed timeout (even None) wins over the class
        # attribute; only the sentinel is replaced by the global default.
        timeout = kwargs.get("timeout", self.timeout)

        if timeout is _DEFAULT_TIMEOUT:
            timeout = settings.DEFAULT_TIMEOUT

        kwargs["timeout"] = timeout

        # Resolution order: explicit argument -> class attribute -> settings.
        if n_retries is None:
            n_retries = self.n_retries

        if n_retries is None:
            n_retries = settings.DEFAULT_N_RETRIES

        if retry_interval is None:
            retry_interval = self.retry_interval

        if retry_interval is None:
            retry_interval = settings.DEFAULT_RETRY_INTERVAL

        self.session = session
        self.args = args
        self.send_kwargs = kwargs
        self.timeout = timeout
        self.n_retries = n_retries
        self.retry_interval = retry_interval
        self.headers = headers
        self.response = None
        self.data = {}
        self.content = None
        self.params = {}

        self.process_args(**self.args)

    def process_args(self, **kwargs) -> None:
        """
        Process the request arguments (the `args` passed to the constructor).

        Must be implemented by subclasses.

        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def _prepare_send_args(self) -> Dict[str, Any]:
        """
        Build the keyword arguments for `self.session.send_request()`.

        :returns: a copy of `self.send_kwargs` with "headers", "data"
                  and "params" filled in
        """
        headers = CaseInsensitiveDict()
        headers["Content-Type"] = self.content_type
        # Applied after the default Content-Type, so user-supplied headers
        # take precedence over it.
        headers.update(self.headers)

        if not self.data:
            # Empty dict / empty bytes: send no body at all.
            data = None
        elif isinstance(self.data, Dict):
            # Dict payloads are serialized to UTF-8 encoded JSON.
            data = json.dumps(self.data).encode("utf8")
        else:
            # Anything else is passed through unchanged.
            data = self.data

        # Copy, so that self.send_kwargs is not mutated between attempts.
        kwargs = dict(self.send_kwargs)
        kwargs.update({"headers": headers, "data": data, "params": self.params})

        return kwargs

    def _attempt(self) -> None:
        """
        Send the request once and store the result in `self.response`.

        Raises the exception returned by `self.response.get_exception()`
        if the response status is not in `self.success_codes`.
        """
        assert self.method is not None
        assert self.url

        kwargs = self._prepare_send_args()

        self.response = self.session.send_request(self.method, self.url, **kwargs)

        if self.response.status not in self.success_codes:
            raise self.response.get_exception()

    async def _async_attempt(self) -> None:
        """
        Asynchronous counterpart of `_attempt()`.

        Awaits both `self.session.send_request()` and, on failure,
        `self.response.get_exception()`.
        """
        assert self.method is not None
        assert self.url

        kwargs = self._prepare_send_args()

        self.response = await self.session.send_request(self.method, self.url, **kwargs)

        if self.response.status not in self.success_codes:
            raise await self.response.get_exception()

    def send(self) -> "Response":
        """
        Actually send the request

        Runs `_attempt()` through `auto_retry` with `self.n_retries`
        and `self.retry_interval`.

        :returns: :any:`Response` (`self.response`)
        """
        auto_retry(self._attempt, self.n_retries, self.retry_interval)
        assert self.response is not None

        return self.response

    def process_json(self, js: JSON, **kwargs) -> T:
        """
        Process the JSON response.

        Must be implemented by subclasses.

        :param js: `dict` or `None`, JSON response
        :param kwargs: extra arguments (optional)

        :raises NotImplementedError: always, in the base class

        :returns: processed response, can be anything
        """
        raise NotImplementedError

    def process(self, **kwargs) -> T:
        """
        Process the response.

        If the response body cannot be decoded as JSON, `None` is passed
        to `process_json()` instead.

        :param kwargs: extra arguments (optional)

        :raises InvalidResponseError: `process_json()` raised `ValueError`

        :returns: depends on `self.process_json()`
        """
        assert self.response is not None

        try:
            result = self.response.json()
        except (ValueError, RuntimeError):
            result = None

        try:
            return self.process_json(result, **kwargs)
        except ValueError as e:
            # Chain explicitly so the original error is kept as __cause__.
            raise InvalidResponseError(f"Server returned invalid response: {e}") from e

    async def asend(self) -> "AsyncResponse":
        """
        Actually send the request

        Runs `_async_attempt()` through `async_auto_retry` with
        `self.n_retries` and `self.retry_interval`.

        :returns: :any:`AsyncResponse` (`self.response`)
        """
        await async_auto_retry(self._async_attempt, self.n_retries, self.retry_interval)
        assert self.response is not None

        return self.response

    async def aprocess(self, **kwargs) -> T:
        """
        Process the response.

        If the response body cannot be decoded as JSON, `None` is passed
        to `process_json()` instead.

        :param kwargs: extra arguments (optional)

        :raises InvalidResponseError: `process_json()` raised `ValueError`

        :returns: depends on `self.process_json()`
        """
        assert self.response is not None

        try:
            result = await self.response.json()
        except (ValueError, RuntimeError):
            result = None

        try:
            return self.process_json(result, **kwargs)
        except ValueError as e:
            # Chain explicitly so the original error is kept as __cause__.
            raise InvalidResponseError(f"Server returned invalid response: {e}") from e