Source code for yadisk.sessions.pycurl_session

# -*- coding: utf-8 -*-

from io import BytesIO
import json

from ..exceptions import (
    RequestError, RequestTimeoutError,
    TooManyRedirectsError, YaDiskConnectionError
)

from ..session import Session, Response
from ..compat import Iterator
from ..common import CaseInsensitiveDict
from ..types import JSON, ConsumeCallback, Headers, HTTPMethod

from urllib.parse import urlencode

import pycurl

__all__ = ["PycURLSession"]

def convert_curl_error(error: pycurl.error) -> RequestError:
    code, msg = error.args

    mapping = {pycurl.E_TOO_MANY_REDIRECTS:      TooManyRedirectsError,
               pycurl.E_COULDNT_CONNECT:         YaDiskConnectionError,
               pycurl.E_NO_CONNECTION_AVAILABLE: YaDiskConnectionError,
               pycurl.E_OPERATION_TIMEDOUT:      RequestTimeoutError}

    exc = mapping.get(code) or RequestError
    return exc(msg)

class PycurlResponse(Response):
    def __init__(self, curl: pycurl.Curl, response: bytes):
        self._curl = curl
        self._response = response

    @property
    def status(self) -> int:
        return self._curl.getinfo(pycurl.RESPONSE_CODE)

    def json(self) -> JSON:
        if not self.status:
            try:
                self._response = self._curl.perform_rb()
            except pycurl.error as e:
                raise convert_curl_error(e)

        return json.loads(self._response)

    def download(self, consume_callback: ConsumeCallback) -> None:
        def write_cb(chunk: bytes) -> int:
            consume_callback(chunk)
            return len(chunk)

        self._curl.setopt(pycurl.WRITEFUNCTION, write_cb)

        try:
            self._curl.perform()
        except pycurl.error as e:
            raise convert_curl_error(e)

    def close(self) -> None:
        self._curl.close()

class IterableReader:
    def __init__(self, iterator: Iterator[bytes]):
        self.iterator = iterator
        self._current_chunk = b""
        self._position_in_chunk = 0

    def read(self, size=-1) -> bytes:
        if size < 0:
            return self.readall()

        data = b""
        while len(data) < size:
            if self._position_in_chunk >= len(self._current_chunk):
                try:
                    self._current_chunk = next(self.iterator)
                except StopIteration:
                    return data

                self._position_in_chunk = 0

            remaining = size - len(data)

            chunk_fragment = self._current_chunk[self._position_in_chunk:self._position_in_chunk + remaining]

            data += chunk_fragment
            self._position_in_chunk += len(chunk_fragment)

        return data

    def readall(self) -> bytes:
        data = b""

        while True:
            if self._position_in_chunk >= len(self._current_chunk):
                try:
                    self._current_chunk = next(self.iterator)
                except StopIteration:
                    return data

                self._position_in_chunk = 0

            chunk_fragment = self._current_chunk[self._position_in_chunk:]

            data += chunk_fragment
            self._position_in_chunk += len(chunk_fragment)

[docs] class PycURLSession(Session): """ .. _pycurl: https://pypi.org/project/pycurl :any:`Session` implementation using the `pycurl`_ library. To pass `pycurl`-specific arguments from :any:`Client` use :code:`curl_options` keyword argument. Usage example: .. code:: python import yadisk import pycurl with yadisk.Client(..., session="pycurl") as client: client.get_meta( "/my_file.txt", n_retries=5, curl_options={ pycurl.MAX_SEND_SPEED_LARGE: 5 * 1024**2, pycurl.MAX_RECV_SPEED_LARGE: 5 * 1024**2, pycurl.PROXY: "http://localhost:12345", pycurl.MAXREDIRS: 15 } ) """ def __init__(self): self._share = pycurl.CurlShare() self._headers = CaseInsensitiveDict() self._share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_CONNECT) self._share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS) self._share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_SSL_SESSION) def set_headers(self, headers: Headers) -> None: self._headers.update(headers) def send_request(self, method: HTTPMethod, url: str, **kwargs) -> Response: params = kwargs.get("params", {}) data = kwargs.get("data") stream = kwargs.get("stream", False) headers = CaseInsensitiveDict(self._headers) headers.update(kwargs.get("headers", {})) options = kwargs.get("curl_options", {}) if params: url = url + "?" + urlencode(params) curl = pycurl.Curl() curl.setopt(pycurl.NOSIGNAL, True) curl.setopt(pycurl.FOLLOWLOCATION, True) curl.setopt(pycurl.URL, url) curl.setopt(pycurl.SHARE, self._share) if "timeout" in kwargs: timeout = kwargs["timeout"] if isinstance(timeout, tuple): connect_timeout, read_timeout = timeout else: connect_timeout = read_timeout = timeout MAX_TIMEOUT = 4294967 # in seconds if connect_timeout is None: connect_timeout = MAX_TIMEOUT elif connect_timeout <= 0.001: # If connect_timeout gets rounded down to 0, the default connect # timeout would be applied instead by cURL connect_timeout = 0.001 if read_timeout is None: # 0 disables LOW_SPEED_TIME read_timeout = 0 elif read_timeout <= 1.0: # If read_timeout gets rounded down to 0, the low speed time will be disabled # 1 second is the lowest possible timeout read_timeout = 1.0 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(connect_timeout * 1000)) curl.setopt(pycurl.LOW_SPEED_TIME, int(read_timeout)) curl.setopt(pycurl.LOW_SPEED_LIMIT, 64) curl.setopt(pycurl.HTTPHEADER, [f"{k}:{v}" for k, v in headers.items() if k and v]) for option, value in options.items(): curl.setopt(option, value) if isinstance(data, bytes): data = BytesIO(data) uploading_file = False if data is not None: curl.setopt(pycurl.UPLOAD, True) uploading_file = True if isinstance(data, Iterator): data = IterableReader(data) curl.setopt(pycurl.READDATA, data) curl.setopt(pycurl.CUSTOMREQUEST, method) if not stream or uploading_file: try: response = curl.perform_rb() except pycurl.error as e: raise convert_curl_error(e) else: response = b"" return PycurlResponse(curl, response) def close(self): self._share.close()