Source code for project_config.utils.http

"""HTTP/s utilities."""

import socket
import time
import typing as t
import urllib.error
import urllib.request

from project_config.cache import Cache
from project_config.exceptions import ProjectConfigException


class ProjectConfigHTTPError(ProjectConfigException):
    """Error related to an HTTP/s request.

    Parent class of :py:class:`ProjectConfigTimeoutError`.
    """
class ProjectConfigTimeoutError(ProjectConfigHTTPError):
    """Error raised when a URL can't be fetched before the timeout expires."""
def _GET(url: str, timeout: int = 10, sleep: float = 1.0) -> str:
    """Fetch a URL, retrying failed attempts until the timeout expires.

    Args:
        url (str): URL to which the request will be targeted.
        timeout (int): Number of seconds during which the request is
            attempted. A falsy value is replaced by 0.01 seconds.
        sleep (float): Maximum number of seconds to wait after a failed
            attempt before trying again.

    Returns:
        str: Body of the response decoded as UTF-8.

    Raises:
        ProjectConfigTimeoutError: If no attempt succeeded before the
            deadline. The last error caught, if any, is attached as the
            cause of the exception and mentioned in its message.
    """
    # A monotonic clock keeps the deadline stable if the system
    # wall clock is adjusted while waiting.
    deadline = time.monotonic() + (timeout or 0.01)
    last_error: t.Optional[BaseException] = None

    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Passing the remaining time as socket timeout prevents a
            # stalled connection from blocking forever, and the context
            # manager closes the response even if reading fails.
            with urllib.request.urlopen(
                urllib.request.Request(url),
                timeout=remaining,
            ) as response:
                content: str = response.read().decode("utf-8")
            return content
        except (urllib.error.URLError, socket.timeout) as exc:
            # URLError already covers HTTPError and ContentTooShortError;
            # socket.timeout can be raised directly while reading the body.
            last_error = exc

        # Never sleep beyond the deadline.
        pause = min(sleep, deadline - time.monotonic())
        if pause > 0:
            time.sleep(pause)

    detail = "" if last_error is None else str(last_error)
    error_reason = f" Possibly caused by: {detail}" if detail else ""
    raise ProjectConfigTimeoutError(
        f"Impossible to fetch '{url}' after {timeout} seconds.{error_reason}",
    ) from last_error
def GET(url: str, use_cache: bool = True, **kwargs: t.Any) -> str:
    """Perform an HTTP/s GET request and return the result.

    Args:
        url (str): URL to which the request will be targeted.
        use_cache (bool): Specify if the cache must be used requesting
            the resource. When disabled, the cache is neither read
            nor written.
        **kwargs: Additional arguments forwarded unchanged to
            :py:func:`_GET` (``timeout`` and ``sleep``).

    Returns:
        str: Value stored in the cache for the URL or, if the cache is
        not used or has no value for it, the freshly fetched content.

    Raises:
        ProjectConfigTimeoutError: Propagated from :py:func:`_GET` when
            the resource can't be fetched.
    """
    # Cache bypassed: fetch directly without touching the cache
    if not use_cache:
        return _GET(url, **kwargs)

    cached = Cache.get(url)
    if cached is not None:
        return cached  # type: ignore

    # Cache miss: fetch the resource and store it for later requests
    content = _GET(url, **kwargs)
    Cache.set(url, content)
    return content