# Source code for yadisk.sessions.requests_session

# -*- coding: utf-8 -*-
# Copyright © 2024 Ivan Konovalov

# This file is part of a Python library yadisk.

# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.

from ..exceptions import (
    RequestError, TooManyRedirectsError,
    RequestTimeoutError, YaDiskConnectionError
)

from .._session import Session, Response
from ..utils import CaseInsensitiveDict
from .._typing_compat import Dict
from ..types import JSON, ConsumeCallback, HTTPMethod, Headers, Payload

from typing import Any, Optional, Union

import threading

import requests

__all__ = ["RequestsSession"]


def convert_requests_exception(exc: requests.RequestException) -> Union[RequestError, requests.RequestException]:
    """
        Translate an exception raised by `requests` into its yadisk counterpart.

        :param exc: exception to translate

        :returns: a new yadisk exception carrying :code:`str(exc)` as its
                  message, or `exc` itself if it matches none of the known types
    """

    # Checked in order, the first match wins; the generic
    # `requests.RequestException` entry is deliberately last.
    conversion_table = (
        (requests.exceptions.TooManyRedirects, TooManyRedirectsError),
        (requests.exceptions.Timeout, RequestTimeoutError),
        (requests.exceptions.ConnectionError, YaDiskConnectionError),
        (requests.RequestException, RequestError),
    )

    for requests_type, yadisk_type in conversion_table:
        if isinstance(exc, requests_type):
            return yadisk_type(str(exc))

    return exc


class RequestsResponse(Response):
    """:any:`Response` implementation wrapping a :any:`requests.Response`."""

    def __init__(self, response: requests.Response):
        super().__init__()

        # The wrapped response is kept for json() / download() / close()
        self._response = response
        self.status = response.status_code

    def json(self) -> JSON:
        """
            Return the result of calling :code:`json()` on the wrapped response.

            :raises ValueError: the wrapped call raised :any:`RuntimeError`
        """

        try:
            parsed = self._response.json()
        except RuntimeError as error:
            raise ValueError(f"Could not parse JSON: {error}") from error
        else:
            return parsed

    def download(self, consume_callback: ConsumeCallback) -> None:
        """
            Feed the response body to `consume_callback`, one chunk at a time.

            :param consume_callback: called once for every chunk yielded by
                                     :code:`iter_content(8192)`

            :raises: the result of :code:`convert_requests_exception()` for any
                     :any:`requests.RequestException` raised while iterating
        """

        try:
            body_chunks = self._response.iter_content(8192)

            for piece in body_chunks:
                consume_callback(piece)
        except requests.RequestException as error:
            raise convert_requests_exception(error) from error

    def close(self) -> None:
        """Close the wrapped response."""

        self._response.close()


class RequestsSession(Session):
    """
        .. _requests: https://pypi.org/project/requests

        :any:`Session` implementation using the `requests`_ library.

        All arguments passed in the constructor are directly forwarded to
        :any:`requests.Session`.

        :ivar requests_session: underlying instance of :any:`requests.Session`

        .. note::
           Internally, this class creates thread-local instances of
           :any:`requests.Session`, since it is not currently guaranteed to be
           thread safe.
           Calling :any:`Session.close()` will close all thread-local sessions
           managed by this object.

        To pass `requests`-specific arguments from :any:`Client` use
        :code:`requests_args` keyword argument.

        Usage example:

        .. code:: python

            import yadisk

            with yadisk.Client(..., session="requests") as client:
                client.get_meta(
                    "/my_file.txt",
                    n_retries=5,
                    requests_args={
                        "proxies": {"https": "http://example.com:1234"},
                        "verify": False
                    }
                )
    """

    def __init__(self, *args, **kwargs):
        # Stored so that every thread can build its own requests.Session
        # with the same constructor arguments
        self._args, self._kwargs = args, kwargs

        # Holds the calling thread's session in its `session` attribute
        self._local = threading.local()

        # Every session created through `requests_session` that has not been
        # closed yet, regardless of which thread created it
        self._sessions = []

    @property
    def requests_session(self) -> requests.Session:
        """
            Session belonging to the calling thread.

            It is created and registered in :code:`self._sessions` on first
            access from a given thread.
        """

        if not hasattr(self._local, "session"):
            self._local.session = requests.Session(*self._args, **self._kwargs)
            self._sessions.append(self._local.session)

        return self._local.session

    def _close_local(self) -> None:
        """
            Close and forget the calling thread's session, if it has one.

            Safe to call repeatedly and after :any:`close()`.
        """

        if not hasattr(self._local, "session"):
            return

        session = self._local.session
        session.close()

        # Drop the thread-local reference as well: otherwise this thread would
        # keep getting the same session back from `requests_session` while it
        # is no longer listed in `self._sessions`, so close() could never
        # reach it again.
        del self._local.session

        try:
            self._sessions.remove(session)
        except ValueError:
            # close() has already popped this session from the list
            pass

    def send_request(
        self,
        method: HTTPMethod,
        url: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Payload] = None,
        headers: Optional[Headers] = None,
        stream: bool = False,
        **kwargs
    ) -> Response:
        """
            Send a request through the calling thread's session.

            :param method: HTTP method, passed to :code:`request()` as is
            :param url: request URL, passed to :code:`request()` as is
            :param params: passed as the `params` keyword argument
            :param data: passed as the `data` keyword argument
            :param headers: merged on top of a copy of the session's headers
            :param stream: passed as the `stream` keyword argument
            :param kwargs: only `timeout` and `requests_args` are read,
                           entries of `requests_args` are applied last and
                           override everything above

            :raises: the result of :code:`convert_requests_exception()` for any
                     :any:`requests.RequestException` raised by the request

            :returns: :any:`RequestsResponse` wrapping the received response
        """

        session = self.requests_session

        # Start from a copy of the session's headers and overlay the
        # per-request ones
        requests_headers = CaseInsensitiveDict(session.headers)
        requests_headers.update(headers or {})

        converted_kwargs: Dict[str, Any] = {
            "params": params,
            "data": data,
            "headers": requests_headers,
            "stream": stream
        }

        if "timeout" in kwargs:
            converted_kwargs["timeout"] = kwargs["timeout"]

        if "requests_args" in kwargs:
            converted_kwargs.update(kwargs["requests_args"] or {})

        try:
            return RequestsResponse(
                session.request(method, url, **converted_kwargs)
            )
        except requests.exceptions.RequestException as e:
            raise convert_requests_exception(e) from e

    def close(self) -> None:
        """Close every session still listed in :code:`self._sessions`."""

        # NOTE(review): thread-local references are left in place here, so a
        # thread that keeps using this object afterwards still gets its old
        # session back from `requests_session` - confirm whether that is
        # intended.
        while self._sessions:
            session = self._sessions.pop()
            session.close()