Source code for yadisk.utils

# -*- coding: utf-8 -*-

import asyncio
from collections import defaultdict
import time

from .objects import ErrorObject
from .exceptions import *
from . import settings

from typing import Any, Optional, Union, TypeVar

from .compat import Callable, Awaitable
from .types import AnyResponse

__all__ = ["get_exception", "auto_retry", "async_auto_retry"]

def _error_group(fallback, named=None):
    """
        Build the lookup table for a single HTTP status code.

        :param fallback: exception class used for any error name not listed in `named`
        :param named: optional `dict` mapping API error names to exception classes

        :returns: a `defaultdict` that yields `fallback` for unlisted error names
    """

    return defaultdict(lambda: fallback, named or {})


# HTTP status code -> (API error name -> exception class).
# Each group falls back to a generic exception class for that status code.
EXCEPTION_MAP = {
    400: _error_group(BadRequestError, {
        "FieldValidationError":   FieldValidationError,
        "authorization_pending":  AuthorizationPendingError,
        "invalid_client":         InvalidClientError,
        "invalid_grant":          InvalidGrantError,
        "bad_verification_code":  BadVerificationCodeError,
        "unsupported_token_type": UnsupportedTokenTypeError,
    }),
    401: _error_group(UnauthorizedError),
    403: _error_group(ForbiddenError),
    404: _error_group(NotFoundError, {
        "DiskNotFoundError":          PathNotFoundError,
        "DiskOperationNotFoundError": OperationNotFoundError,
    }),
    406: _error_group(NotAcceptableError),
    409: _error_group(ConflictError, {
        "DiskPathDoesntExistsError":              ParentNotFoundError,
        "DiskPathPointsToExistentDirectoryError": DirectoryExistsError,
        "DiskResourceAlreadyExistsError":         PathExistsError,
        "MD5DifferError":                         MD5DifferError,
    }),
    413: _error_group(PayloadTooLargeError),
    415: _error_group(UnsupportedMediaError),
    423: _error_group(LockedError, {
        "DiskResourceLockedError":        ResourceIsLockedError,
        "DiskUploadTrafficLimitExceeded": UploadTrafficLimitExceededError,
    }),
    429: _error_group(TooManyRequestsError),
    500: _error_group(InternalServerError),
    502: _error_group(BadGatewayError),
    503: _error_group(UnavailableError),
    504: _error_group(GatewayTimeoutError),
    507: _error_group(InsufficientStorageError),
}

def get_exception(response: AnyResponse, error: Optional[ErrorObject]) -> YaDiskError:
    """
        Get an exception instance based on response, assuming the request has failed.

        The exception class is selected from `EXCEPTION_MAP` by the response's
        status code and then by the error name. Status codes that are not in
        the map produce :any:`UnknownYaDiskError`. Missing or empty error
        fields are reported as `"<empty>"`.

        :param response: an instance of :any:`Response` or :any:`AsyncResponse`
        :param error: an instance of :any:`ErrorObject` or `None`

        :returns: an exception instance, subclass of :any:`YaDiskError`
    """

    exc_group = EXCEPTION_MAP.get(response.status)

    if exc_group is None:
        return UnknownYaDiskError("Unknown Yandex.Disk error")

    placeholder = "<empty>"
    msg = desc = error_name = placeholder

    if error is not None:
        msg = error.message or placeholder
        desc = error.description or placeholder
        error_name = error.error or placeholder

    # Do not index the group directly: it is a defaultdict, so a plain
    # exc_group[error_name] would permanently insert every unseen error name
    # into the shared module-level map.
    exc = exc_group.get(error_name)

    if exc is None:
        exc = exc_group.default_factory()

    return exc(error_name, "%s (%s / %s)" % (msg, desc, error_name), response)
T = TypeVar("T")


def auto_retry(func: Callable[[], T],
               n_retries: Optional[int] = None,
               retry_interval: Optional[Union[int, float]] = None) -> T:
    """
        Attempt to perform a request with automatic retries.
        A retry is triggered by :any:`RequestError` or :any:`RetriableYaDiskError`.
        Any other exception propagates immediately.

        `func` is always called at least once: a zero or negative `n_retries`
        means a single attempt with no retries.

        :param func: function to run, must not require any arguments
        :param n_retries: `int`, maximum number of retries,
                          defaults to `settings.DEFAULT_N_RETRIES` if `None`
        :param retry_interval: `int` or `float`, delay between retries (in seconds),
                               defaults to `settings.DEFAULT_RETRY_INTERVAL` if `None`

        :returns: return value of func()
    """

    if n_retries is None:
        n_retries = settings.DEFAULT_N_RETRIES

    if retry_interval is None:
        retry_interval = settings.DEFAULT_RETRY_INTERVAL

    # All attempts except the last one swallow retriable errors and wait
    for _ in range(n_retries):
        try:
            return func()
        except (RequestError, RetriableYaDiskError):
            pass

        if retry_interval:
            time.sleep(retry_interval)

    # Final attempt: whatever it raises propagates with its original traceback
    return func()
async def async_auto_retry(func: Union[Callable[[], T], Callable[[], Awaitable[T]]],
                           n_retries: Optional[int] = None,
                           retry_interval: Optional[Union[int, float]] = None) -> T:
    """
        Attempt to perform a request with automatic retries.
        A retry is triggered by :any:`RequestError` or :any:`RetriableYaDiskError`.
        Any other exception propagates immediately.

        `func` is always called at least once: a zero or negative `n_retries`
        means a single attempt with no retries.

        The result of `func` is awaited only if `func` itself is a coroutine
        function (as reported by `asyncio.iscoroutinefunction()`). A plain
        callable that merely returns an awaitable is called without awaiting,
        and its return value is passed through as is.

        :param func: function to run, must not require any arguments
        :param n_retries: `int`, maximum number of retries,
                          defaults to `settings.DEFAULT_N_RETRIES` if `None`
        :param retry_interval: `int` or `float`, delay between retries (in seconds),
                               defaults to `settings.DEFAULT_RETRY_INTERVAL` if `None`

        :returns: return value of func()
    """

    if n_retries is None:
        n_retries = settings.DEFAULT_N_RETRIES

    if retry_interval is None:
        retry_interval = settings.DEFAULT_RETRY_INTERVAL

    is_coro = asyncio.iscoroutinefunction(func)

    # Suppress false type hint errors
    callback: Any = func

    async def attempt() -> Any:
        # Run func once, awaiting it only when it is a coroutine function
        if is_coro:
            return await callback()

        return callback()

    # All attempts except the last one swallow retriable errors and wait
    for _ in range(n_retries):
        try:
            return await attempt()
        except (RequestError, RetriableYaDiskError):
            pass

        if retry_interval:
            await asyncio.sleep(retry_interval)

    # Final attempt: whatever it raises propagates with its original traceback
    return await attempt()