Source code for yadisk.sessions.aiohttp_session

# -*- coding: utf-8 -*-
# Copyright © 2024 Ivan Konovalov

# This file is part of a Python library yadisk.

# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Any, Union

from ..exceptions import (
    RequestError, TooManyRedirectsError,
    RequestTimeoutError, YaDiskConnectionError
)

from .._async_session import AsyncSession, AsyncResponse
from .._common import is_async_func
from ..utils import CaseInsensitiveDict
from .._typing_compat import Dict
from ..types import (
    JSON, AsyncConsumeCallback, Headers, TimeoutParameter, HTTPMethod, AsyncPayload
)

from .. import settings

import aiohttp
import sys

__all__ = ["AIOHTTPSession"]


def convert_aiohttp_exception(exc: aiohttp.ClientError) -> Union[RequestError, aiohttp.ClientError]:
    if isinstance(exc, aiohttp.TooManyRedirects):
        return TooManyRedirectsError(str(exc))
    elif isinstance(exc, aiohttp.ServerTimeoutError):
        return RequestTimeoutError(str(exc))
    elif isinstance(exc, aiohttp.ClientConnectionError):
        return YaDiskConnectionError(str(exc))
    elif isinstance(exc, aiohttp.ClientError):
        return RequestError(str(exc))
    else:
        return exc


class AIOHTTPResponse(AsyncResponse):
    def __init__(self, response: aiohttp.ClientResponse):
        super().__init__()

        self._response = response
        self.status = response.status

    async def json(self) -> JSON:
        try:
            return await self._response.json()
        except aiohttp.ContentTypeError as e:
            raise ValueError("Expected Content-Type: application/json, got something else") from e
        except aiohttp.ClientError as e:
            raise convert_aiohttp_exception(e) from e

    async def download(self, consume_callback: AsyncConsumeCallback) -> None:
        callback: Any = consume_callback

        try:
            if is_async_func(consume_callback):
                async for chunk in self._response.content.iter_chunked(8192):
                    await callback(chunk)
            else:
                async for chunk in self._response.content.iter_chunked(8192):
                    callback(chunk)
        except aiohttp.ClientError as e:
            raise convert_aiohttp_exception(e) from e

    async def close(self) -> None:
        await self._response.release()


def convert_timeout(timeout: TimeoutParameter) -> Optional[aiohttp.ClientTimeout]:
    if timeout is ...:
        return convert_timeout(settings.DEFAULT_TIMEOUT)
    elif timeout is None:
        return None
    elif isinstance(timeout, (int, float)):
        return aiohttp.ClientTimeout(sock_connect=timeout, sock_read=timeout)

    connect, read = timeout

    return aiohttp.ClientTimeout(sock_connect=connect, sock_read=read)


DEFAULT_USER_AGENT = "Python/%s.%s aiohttp/%s" % (sys.version_info.major,
                                                  sys.version_info.minor,
                                                  aiohttp.__version__)


[docs] class AIOHTTPSession(AsyncSession): """ .. _aiohttp: https://pypi.org/project/aiohttp :any:`AsyncSession` implementation using the `aiohttp`_ library. All arguments passed in the constructor are directly forwared to :any:`aiohttp.ClientSession`. :ivar aiohttp_session: underlying instance of :any:`aiohttp.ClientSession` To pass `aiohttp`-specific arguments from :any:`AsyncClient` use :code:`aiohttp_args` keyword argument. Usage example: .. code:: python import yadisk async def main(): async with yadisk.AsyncClient(..., session="aiohttp") as client: await client.get_meta( "/my_file.txt", n_retries=5, aiohttp_args={ "proxies": {"https": "http://example.com:1234"}, "verify": False } ) """ def __init__(self, *args, **kwargs) -> None: headers = CaseInsensitiveDict({ "User-Agent": DEFAULT_USER_AGENT, "Accept-Encoding": "gzip, deflate", "Accept": "*/*", "Connection": "keep-alive" }) headers.update(kwargs.get("headers") or {}) kwargs["headers"] = headers self._session = aiohttp.ClientSession(*args, **kwargs) @property def aiohttp_session(self) -> aiohttp.ClientSession: return self._session async def send_request( self, method: HTTPMethod, url: str, *, params: Optional[Dict[str, Any]] = None, data: Optional[AsyncPayload] = None, headers: Optional[Headers] = None, **kwargs ) -> AsyncResponse: converted_kwargs: Dict[str, Any] = { "params": params, "data": data, "headers": headers } if "timeout" in kwargs: converted_kwargs["timeout"] = convert_timeout(kwargs["timeout"]) if "aiohttp_args" in kwargs: converted_kwargs.update(kwargs["aiohttp_args"] or {}) try: return AIOHTTPResponse(await self._session.request(method, url, **converted_kwargs)) except aiohttp.ClientError as e: raise convert_aiohttp_exception(e) from e async def close(self) -> None: await self._session.close()