# -*- coding: utf-8 -*-
import asyncio
from collections import defaultdict
import inspect
import time
from typing import Any, Optional, Union, TypeVar

from .objects import ErrorObject
from .exceptions import *
from . import settings
from .compat import Callable, Awaitable
from .types import AnyResponse
__all__ = ["get_exception", "auto_retry", "async_auto_retry"]

# Two-level lookup table used by get_exception():
#   outer key  -- the status code taken from the response,
#   inner key  -- the error name taken from the error object.
# Every inner table is a defaultdict, so an error name that is not listed
# resolves to the generic exception class for that status code.
EXCEPTION_MAP = {
    # --- 4xx ---
    400: defaultdict(
        lambda: BadRequestError,
        {
            "FieldValidationError": FieldValidationError,
            "authorization_pending": AuthorizationPendingError,
            "invalid_client": InvalidClientError,
            "invalid_grant": InvalidGrantError,
            "bad_verification_code": BadVerificationCodeError,
            "unsupported_token_type": UnsupportedTokenTypeError,
        },
    ),
    401: defaultdict(lambda: UnauthorizedError),
    403: defaultdict(lambda: ForbiddenError),
    404: defaultdict(
        lambda: NotFoundError,
        {
            "DiskNotFoundError": PathNotFoundError,
            "DiskOperationNotFoundError": OperationNotFoundError,
        },
    ),
    406: defaultdict(lambda: NotAcceptableError),
    409: defaultdict(
        lambda: ConflictError,
        {
            "DiskPathDoesntExistsError": ParentNotFoundError,
            "DiskPathPointsToExistentDirectoryError": DirectoryExistsError,
            "DiskResourceAlreadyExistsError": PathExistsError,
            "MD5DifferError": MD5DifferError,
        },
    ),
    413: defaultdict(lambda: PayloadTooLargeError),
    415: defaultdict(lambda: UnsupportedMediaError),
    423: defaultdict(
        lambda: LockedError,
        {
            "DiskResourceLockedError": ResourceIsLockedError,
            "DiskUploadTrafficLimitExceeded": UploadTrafficLimitExceededError,
        },
    ),
    429: defaultdict(lambda: TooManyRequestsError),

    # --- 5xx ---
    500: defaultdict(lambda: InternalServerError),
    502: defaultdict(lambda: BadGatewayError),
    503: defaultdict(lambda: UnavailableError),
    504: defaultdict(lambda: GatewayTimeoutError),
    507: defaultdict(lambda: InsufficientStorageError),
}
[docs]
def get_exception(response: AnyResponse, error: Optional[ErrorObject]) -> YaDiskError:
    """
        Get an exception instance based on response, assuming the request has failed.

        The exception class is selected from `EXCEPTION_MAP` by the response
        status and then by the error name. A status that is not present in the
        map yields :any:`UnknownYaDiskError`.

        :param response: an instance of :any:`Response` or :any:`AsyncResponse`
        :param error: an instance of :any:`ErrorObject` or `None`

        :returns: an exception instance, subclass of :any:`YaDiskError`
    """

    exc_group = EXCEPTION_MAP.get(response.status)

    if exc_group is None:
        return UnknownYaDiskError("Unknown Yandex.Disk error")

    # Missing or empty fields are reported with this placeholder
    placeholder = "<empty>"
    msg = desc = error_name = placeholder

    if error is not None:
        msg = error.message or placeholder
        desc = error.description or placeholder
        error_name = error.error or placeholder

    # Do not index the defaultdict directly: indexing inserts every unseen
    # error name into the module-level table, so the shared map would grow
    # with each distinct name received. `.get()` leaves the table untouched.
    exc = exc_group.get(error_name)

    if exc is None:
        # Unlisted error name: use the generic class for this status code
        exc = exc_group.default_factory()

    return exc(error_name, f"{msg} ({desc} / {error_name})", response)
# Generic type variable: links the return type of the retry helpers
# to the return type of the callable they are given.
T = TypeVar("T")
[docs]
def auto_retry(func: Callable[[], T],
               n_retries: Optional[int] = None,
               retry_interval: Optional[Union[int, float]] = None) -> T:
    """
        Attempt to perform a request with automatic retries.
        A retry is triggered by :any:`RequestError` or :any:`RetriableYaDiskError`.

        `func` is always called at least once. When all retries are used up,
        the exception raised by the last attempt is propagated unchanged.

        :param func: function to run, must not require any arguments
        :param n_retries: `int`, maximum number of retries, `None` means
                          `settings.DEFAULT_N_RETRIES`, zero or a negative
                          value disables retries
        :param retry_interval: `int` or `float`, delay between retries (in seconds),
                               `None` means `settings.DEFAULT_RETRY_INTERVAL`,
                               `0` disables the delay

        :raises RequestError: the last attempt failed with this error
        :raises RetriableYaDiskError: the last attempt failed with this error

        :returns: return value of func()
    """

    if n_retries is None:
        n_retries = settings.DEFAULT_N_RETRIES

    if retry_interval is None:
        retry_interval = settings.DEFAULT_RETRY_INTERVAL

    retries_left = n_retries

    # An open-ended loop guarantees that func() runs at least once, even for
    # a negative n_retries, and leaves no unreachable fall-through path.
    while True:
        try:
            return func()
        except (RequestError, RetriableYaDiskError):
            if retries_left <= 0:
                # Out of retries: bare raise keeps the original traceback
                raise

        retries_left -= 1

        if retry_interval:
            time.sleep(retry_interval)
[docs]
async def async_auto_retry(func: Union[Callable[[], T], Callable[[], Awaitable[T]]],
                           n_retries: Optional[int] = None,
                           retry_interval: Optional[Union[int, float]] = None) -> T:
    """
        Attempt to perform a request with automatic retries.
        A retry is triggered by :any:`RequestError` or :any:`RetriableYaDiskError`.

        `func` may be a coroutine function, or any callable that returns either
        a plain value or an awaitable. An awaitable result is awaited inside the
        retry loop, so errors raised while awaiting it are retried as well.
        `func` is always called at least once. When all retries are used up,
        the exception raised by the last attempt is propagated unchanged.

        :param func: function to run, must not require any arguments
        :param n_retries: `int`, maximum number of retries, `None` means
                          `settings.DEFAULT_N_RETRIES`, zero or a negative
                          value disables retries
        :param retry_interval: `int` or `float`, delay between retries (in seconds),
                               `None` means `settings.DEFAULT_RETRY_INTERVAL`,
                               `0` disables the delay

        :raises RequestError: the last attempt failed with this error
        :raises RetriableYaDiskError: the last attempt failed with this error

        :returns: return value of func()
    """

    if n_retries is None:
        n_retries = settings.DEFAULT_N_RETRIES

    if retry_interval is None:
        retry_interval = settings.DEFAULT_RETRY_INTERVAL

    # Suppress false type hint errors
    callback: Any = func

    retries_left = n_retries

    # An open-ended loop guarantees that func() runs at least once, even for
    # a negative n_retries, and leaves no unreachable fall-through path.
    while True:
        try:
            result = callback()

            # Check the returned object rather than the callable: this also
            # covers lambdas and other wrappers that return a coroutine
            # without being coroutine functions themselves.
            if inspect.isawaitable(result):
                result = await result

            return result
        except (RequestError, RetriableYaDiskError):
            if retries_left <= 0:
                # Out of retries: bare raise keeps the original traceback
                raise

        retries_left -= 1

        if retry_interval:
            await asyncio.sleep(retry_interval)