Source code for yadisk.sessions.pycurl_session

# -*- coding: utf-8 -*-
# Copyright © 2024 Ivan Konovalov

# This file is part of a Python library yadisk.

# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, see <http://www.gnu.org/licenses/>.

from io import BytesIO
import json
from typing import Any, Optional

from ..exceptions import (
    RequestError, RequestTimeoutError,
    TooManyRedirectsError, YaDiskConnectionError
)

from .._session import Session, Response
from .._typing_compat import Iterator, Tuple, Dict
from ..utils import CaseInsensitiveDict
from ..types import JSON, ConsumeCallback, HTTPMethod, Headers, Payload, TimeoutParameter
from .. import settings

from urllib.parse import urlencode

import pycurl

__all__ = ["PycURLSession"]


def convert_curl_error(error: pycurl.error) -> RequestError:
    code, msg = error.args

    mapping = {pycurl.E_TOO_MANY_REDIRECTS:      TooManyRedirectsError,
               pycurl.E_COULDNT_CONNECT:         YaDiskConnectionError,
               pycurl.E_NO_CONNECTION_AVAILABLE: YaDiskConnectionError,
               pycurl.E_OPERATION_TIMEDOUT:      RequestTimeoutError}

    exc = mapping.get(code) or RequestError
    return exc(msg)


# see PycurlResponse.download() implementation
MAX_RESPONSE_BUFFER_SIZE = 128 * 1024


class PycURLResponse(Response):
    def __init__(self, curl: pycurl.Curl, response: bytes):
        super().__init__()

        self._curl = curl
        self._response = response

        self._update_status()

    def _update_status(self) -> None:
        self.status = self._curl.getinfo(pycurl.RESPONSE_CODE)

    def _perform(self) -> None:
        try:
            self._curl.perform()
        except pycurl.error as e:
            raise convert_curl_error(e) from e

        self._update_status()

    def _perform_rb(self) -> None:
        try:
            self._response = self._curl.perform_rb()
        except pycurl.error as e:
            raise convert_curl_error(e) from e

        self._update_status()

    def json(self) -> JSON:
        if not self.status:
            self._perform_rb()

        return json.loads(self._response)

    def download(self, consume_callback: ConsumeCallback) -> None:
        buffer = BytesIO()

        def write_cb(chunk: bytes) -> int:
            # Write up to `MAX_RESPONSE_BUFFER_SIZE` bytes of data into an in-memory buffer
            # This is a hack to detect bad HTTP status codes to give
            # `consume_callback` an opportunity to check status before writing
            if buffer.tell() < MAX_RESPONSE_BUFFER_SIZE:
                buffer.write(chunk)
                return len(chunk)
            elif buffer.tell():
                buffer.seek(0)

                chunk_from_buffer = buffer.read()
                consume_callback(chunk_from_buffer)

                buffer.seek(0)
                buffer.truncate(0)

            consume_callback(chunk)
            return len(chunk)

        self._curl.setopt(pycurl.WRITEFUNCTION, write_cb)

        self._perform()

        # Write left over data from the buffer
        if buffer.tell():
            buffer.seek(0)
            consume_callback(buffer.read())

    def close(self) -> None:
        self._curl.close()


class IterableReader:
    def __init__(self, iterator: Iterator[bytes]):
        self.iterator = iterator
        self._current_chunk = b""
        self._position_in_chunk = 0

    def read(self, size=-1) -> bytes:
        if size < 0:
            return self.readall()

        data = b""
        while len(data) < size:
            if self._position_in_chunk >= len(self._current_chunk):
                try:
                    self._current_chunk = next(self.iterator)
                except StopIteration:
                    return data

                self._position_in_chunk = 0

            remaining = size - len(data)

            chunk_fragment = self._current_chunk[self._position_in_chunk:self._position_in_chunk + remaining]

            data += chunk_fragment
            self._position_in_chunk += len(chunk_fragment)

        return data

    def readall(self) -> bytes:
        data = b""

        while True:
            if self._position_in_chunk >= len(self._current_chunk):
                try:
                    self._current_chunk = next(self.iterator)
                except StopIteration:
                    return data

                self._position_in_chunk = 0

            chunk_fragment = self._current_chunk[self._position_in_chunk:]

            data += chunk_fragment
            self._position_in_chunk += len(chunk_fragment)


def convert_timeout(timeout: TimeoutParameter) -> Tuple[float, float]:
    if timeout is ...:
        return convert_timeout(settings.DEFAULT_TIMEOUT)

    if isinstance(timeout, tuple):
        connect_timeout, read_timeout = timeout
    else:
        connect_timeout = read_timeout = timeout

    MAX_TIMEOUT = 4294967  # in seconds

    if connect_timeout is None:
        connect_timeout = MAX_TIMEOUT
    elif connect_timeout <= 0.001:
        # If connect_timeout gets rounded down to 0, the default connect
        # timeout would be applied instead by cURL
        connect_timeout = 0.001

    if read_timeout is None:
        # 0 disables LOW_SPEED_TIME
        read_timeout = 0
    elif read_timeout <= 1.0:
        # If read_timeout gets rounded down to 0, the low speed time will be disabled
        # 1 second is the lowest possible timeout
        read_timeout = 1.0

    return connect_timeout, read_timeout

[docs] class PycURLSession(Session): """ .. _pycurl: https://pypi.org/project/pycurl :any:`Session` implementation using the `pycurl`_ library. To pass `pycurl`-specific arguments from :any:`Client` use :code:`curl_options` keyword argument. Usage example: .. code:: python import yadisk import pycurl with yadisk.Client(..., session="pycurl") as client: client.get_meta( "/my_file.txt", n_retries=5, curl_options={ pycurl.MAX_SEND_SPEED_LARGE: 5 * 1024**2, pycurl.MAX_RECV_SPEED_LARGE: 5 * 1024**2, pycurl.PROXY: "http://localhost:12345", pycurl.MAXREDIRS: 15 } ) """ def __init__(self) -> None: self._share = pycurl.CurlShare() self._share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_CONNECT) self._share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS) self._share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_SSL_SESSION) def send_request( self, method: HTTPMethod, url: str, *, params: Optional[Dict[str, Any]] = None, data: Optional[Payload] = None, headers: Optional[Headers] = None, stream: bool = False, curl_options: Optional[Dict[int, Any]] = None, **kwargs ) -> Response: curl_headers = CaseInsensitiveDict({"connection": "keep-alive"}) curl_headers.update(headers or {}) if params: url = url + "?" + urlencode(params) curl = pycurl.Curl() curl.setopt(pycurl.NOSIGNAL, True) curl.setopt(pycurl.FOLLOWLOCATION, True) curl.setopt(pycurl.URL, url) curl.setopt(pycurl.SHARE, self._share) if "timeout" in kwargs: connect_timeout, read_timeout = convert_timeout(kwargs["timeout"]) curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(connect_timeout * 1000)) curl.setopt(pycurl.LOW_SPEED_TIME, int(read_timeout)) curl.setopt(pycurl.LOW_SPEED_LIMIT, 64) curl.setopt(pycurl.HTTPHEADER, [f"{k}:{v}" for k, v in curl_headers.items() if k and v]) if curl_options is not None: for option, value in curl_options.items(): curl.setopt(option, value) uploading_file = False if data is not None: curl.setopt(pycurl.UPLOAD, True) uploading_file = True curl_data: Any if isinstance(data, bytes): curl_data = BytesIO(data) # Some requests may silently fail without specifying the exact # payload size. This appears to happen with PatchRequest (PUT # /v1/disk/resources). The server claims to have received an # empty string, but when using the test API gateway (which # forwards all requests using httpx) all data is sent # correctly. Weird. This also doesn't seem to affect file # uploads. curl.setopt(pycurl.INFILESIZE, len(data)) elif hasattr(data, "read"): curl_data = data elif isinstance(data, Iterator): curl_data = IterableReader(data) curl.setopt(pycurl.READDATA, curl_data) curl.setopt(pycurl.CUSTOMREQUEST, method) if not stream or uploading_file: try: response = curl.perform_rb() except pycurl.error as e: raise convert_curl_error(e) from e else: response = b"" return PycURLResponse(curl, response) def close(self) -> None: self._share.close()