from __future__ import annotations
import hashlib
import hmac
import logging
import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable, Literal
from urllib import parse
import aiohttp
import requests
from pydantic import BaseModel, ValidationError
from typing_extensions import Self
from bingx_py.config import cache_config
from . import exceptions
from .caching.base import BaseAsyncCache, BaseCache
if TYPE_CHECKING:
from types import TracebackType
HttpMethod = Literal["GET", "POST", "PUT", "DELETE"]
logger = logging.getLogger("bingx-py")
[docs]
class HttpClient(ABC):
"""Base class for HTTP clients with caching support."""
def __init__(
self,
base_url: str,
cache: BaseCache | BaseAsyncCache | None = None,
default_cache_ttl: int = 300,
) -> None:
"""Initialize the HTTP client.
Args:
base_url (str): The base URL of the API.
cache (Optional[BaseCache | BaseAsyncCache]): The cache instance to use. If not provided, the global cache is used.
default_cache_ttl (int): The default time-to-live (TTL) for cached data in seconds. Defaults to 300 seconds.
Returns:
None
"""
self.base_url = base_url
self.cache = cache if cache else cache_config.get_cache()
self.default_cache_ttl = default_cache_ttl
self._session: requests.Session | None = None
self._async_session: aiohttp.ClientSession | None = None
[docs]
def connect(self) -> None:
"""Initialize the synchronous HTTP session.
Returns:
None
"""
logger.debug("Initializing synchronous session.")
self._session = requests.Session()
[docs]
def close(self) -> None:
"""Close the synchronous HTTP session.
Returns
-------
None
"""
if self._session:
logger.debug("Closing synchronous session.")
self._session.close()
[docs]
async def connect_async(self) -> None:
"""Initialize the asynchronous HTTP session.
Returns
-------
None
"""
logger.debug("Initializing asynchronous session.")
self._async_session = aiohttp.ClientSession()
[docs]
async def close_async(self) -> None:
"""Close the asynchronous HTTP session.
Returns
-------
None
"""
if self._async_session:
logger.debug("Closing asynchronous session.")
await self._async_session.close()
def __enter__(self) -> Self:
"""Enter the synchronous context manager and initialize the session.
Returns
-------
Self: The instance of the HTTP client.
"""
logger.debug("Entering synchronous context manager.")
self._session = requests.Session()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
"""Exit the synchronous context manager and close the session.
Args:
exc_type (Optional[Type[BaseException]]): The exception type, if any.
exc (Optional[BaseException]): The exception instance, if any.
tb (Optional[TracebackType]): The traceback, if any.
Returns:
None
"""
logger.debug("Exiting synchronous context manager.")
if self._session:
self._session.close()
async def __aenter__(self) -> Self:
"""Enter the asynchronous context manager and initialize the session.
Returns
-------
Self: The instance of the HTTP client.
"""
logger.debug("Entering asynchronous context manager.")
self._async_session = aiohttp.ClientSession()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
"""Exit the asynchronous context manager and close the session.
Args:
exc_type (Optional[Type[BaseException]]): The exception type, if any.
exc (Optional[BaseException]): The exception instance, if any.
tb (Optional[TracebackType]): The traceback, if any.
Returns:
None
"""
logger.debug("Exiting asynchronous context manager.")
if self._async_session:
await self._async_session.close()
def _generate_cache_key(
self,
method: HttpMethod,
endpoint: str,
params: dict[str, Any] | None = None,
unique_cache_attribute: str | None = None,
) -> str:
"""Generate a unique cache key based on the request details.
Args:
method (HttpMethod): The HTTP method (e.g., GET, POST).
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
Returns:
str: A unique cache key.
"""
key_parts = [method, endpoint]
if params:
key_parts.append(parse.urlencode(sorted(params.items())))
if unique_cache_attribute:
key_parts.append(unique_cache_attribute)
cache_key = ":".join(key_parts)
logger.debug(f"Generated cache key: {cache_key}")
return cache_key
def _request(
self,
method: HttpMethod,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
use_cache: bool = False,
unique_cache_attribute: str | None = None,
) -> dict[str, Any]:
"""Perform a synchronous HTTP request with optional caching.
Args:
method (HttpMethod): The HTTP method (e.g., GET, POST).
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
use_cache (bool): Whether to use caching for the request. Defaults to False.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
Returns:
Dict[str, Any]: The response data.
Raises:
------
RuntimeError: If the synchronous session is not initialized.
ValueError: If caching is attempted for non-GET requests without enabling unsafe caching.
"""
if not self._session:
msg = "Synchronous session is not initialized. Use context manager (with)."
raise RuntimeError(
msg,
)
if use_cache and method != "GET" and not cache_config.is_unsafe_cache_enabled():
msg = "Cache is supported only for GET requests. If you want to cache data with another method, use config.enable_unsafe_cache"
raise ValueError(
msg,
)
# bound `cache_key` to None to avoid generate cache key without `use_cache`
cache_key = None
# Try to get data from cache
if use_cache:
if self.cache:
if isinstance(self.cache, BaseCache):
# Generate cache key
cache_key = self._generate_cache_key(
method,
endpoint,
params,
unique_cache_attribute,
)
logger.debug(f"Checking cache for key: {cache_key}")
if (cached_data := self.cache.get(cache_key)) is not None:
logger.debug("Cache hit. Returning cached data.")
return cached_data
else:
logger.warning(
"Can`t use async cache. You made a synchronous request with an async cache. Consider providing an async cache via the `cache` parameter or setting it globally in the config.",
)
else:
logger.warning(
"Can`t use cache cause cache instance doesn`t set up via the `cache` parameter or via globally setting in the config.",
)
# Make request
url = f"{self.base_url}{endpoint}"
logger.debug(
f"Making {method} request to {url} with params: {params}",
)
response = self._session.request(
method,
url,
params=params,
headers=headers,
)
response.raise_for_status()
try:
data = response.json()
except requests.JSONDecodeError:
data: dict[str, Any] = {}
logger.debug(f"Response\nUrl: {response.url}\nData: {data}")
self._check_errors(data)
# Save data to cache
if use_cache and self.cache and isinstance(self.cache, BaseCache) and cache_key:
logger.debug(f"Saving data to cache with key: {cache_key}")
self.cache.set(cache_key, data, ttl=self.default_cache_ttl)
return data
async def _async_request(
self,
method: HttpMethod,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
use_cache: bool = False,
unique_cache_attribute: str | None = None,
) -> dict[str, Any]:
"""Perform an asynchronous HTTP request with optional caching.
Args:
method (HttpMethod): The HTTP method (e.g., GET, POST).
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
use_cache (bool): Whether to use caching for the request. Defaults to False.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
Returns:
Dict[str, Any]: The response data.
Raises:
------
RuntimeError: If the asynchronous session is not initialized.
ValueError: If caching is attempted for non-GET requests without enabling unsafe caching.
"""
if not self._async_session:
msg = "Asynchronous session is not initialized. Use context manager (async with)."
raise RuntimeError(
msg,
)
# bound `cache_key` to None to avoid generate cache key without `use_cache`
cache_key = None
if use_cache and method != "GET" and not cache_config.is_unsafe_cache_enabled():
msg = "Cache is supported only for GET requests. If you want to cache data with another method, use need to invoke `config.enable_unsafe_cache`"
raise ValueError(
msg,
)
# Try to get data from cache
if use_cache:
if self.cache:
# Generate cache key
cache_key = self._generate_cache_key(
method,
endpoint,
params,
unique_cache_attribute,
)
if isinstance(self.cache, BaseAsyncCache):
logger.debug(f"Checking async cache for key: {cache_key}")
if (cached_data := await self.cache.aget(cache_key)) is not None:
logger.debug("Cache hit. Returning cached data.")
return cached_data
else:
logger.warning(
"You made a asynchronous request with an sync cache. This may be slower. Consider providing an async cache via the `cache` parameter or setting it globally in the config.",
)
if (cached_data := self.cache.get(cache_key)) is not None:
logger.debug("Cache hit. Returning cached data.")
return cached_data
else:
logger.warning(
"Can`t use cache cause cache instance doesn`t set up via the `cache` parameter or via globally setting in the config.",
)
# Make request
url = f"{self.base_url}{endpoint}"
logger.debug(f"Making async {method} request to {url} with params: {params}")
async with self._async_session.request(
method,
url,
params=params,
headers=headers,
) as response:
response.raise_for_status()
data = await response.json(content_type=None)
logger.debug(f"Recieve response with data:\n{data}")
self._check_errors(data or {})
# Save data to cache
if use_cache and self.cache and cache_key is not None:
if isinstance(self.cache, BaseAsyncCache):
logger.debug(f"Saving data to async cache with key: {cache_key}")
await self.cache.aset(cache_key, data, ttl=self.default_cache_ttl)
else:
logger.warning(
"You made a asynchronous request with an sync cache. This may be slower. Consider providing an async cache via the `cache` parameter or setting it globally in the config.",
)
logger.debug(f"Saving data to sync cache with key: {cache_key}")
self.cache.set(cache_key, data, ttl=self.default_cache_ttl)
return data
@abstractmethod
def _check_errors(self, data: dict[str, Any]) -> None:
"""Check the API response for errors and raise exceptions if necessary.
Args:
data (Dict[str, Any]): The parsed response data from the API.
Raises:
------
NotImplementedError: If the method is not implemented in a subclass.
"""
msg = "Method _check_errors must be implemented"
raise NotImplementedError(msg)
[docs]
class BingXHttpClient(HttpClient):
"""HTTP client for interacting with the BingX API."""
def __init__(
self,
api_key: str,
api_secret: str,
base_url: str,
cache: BaseCache | BaseAsyncCache | None = None,
default_cache_ttl: int = 300,
) -> None:
"""Initialize the BingX HTTP client.
Args:
api_key (str): The API key for authentication.
api_secret (str): The API secret for signing requests.
base_url (str): The base URL of the BingX API.
cache (Optional[BaseCache | BaseAsyncCache]): The cache instance to use. If not provided, the global cache is used.
default_cache_ttl (int): The default time-to-live (TTL) for cached data in seconds. Defaults to 300 seconds.
Returns:
None
"""
self._api_key = api_key
self._api_secret = api_secret
super().__init__(base_url, cache, default_cache_ttl)
def _parse_params(self, params: dict[str, Any]) -> str:
"""Parse request parameters to a string.
Args:
params (Dict[str, Any]): The request parameters.
Returns:
str: The request parameters as serialized string.
"""
params_string = "&".join(
f"{k}={str(v).replace(' ', '') if isinstance(v, list) else v}" # type: ignore
for k, v in sorted(params.items())
if v
)
if params_string != "":
return params_string + "×tamp=" + str(int(time.time() * 1000))
return params_string + "timestamp=" + str(int(time.time() * 1000))
def _sign_request(self, params_str: str) -> str:
"""Prepare and sign the request parameters.
Args:
params_str (str): The request parameters as serialized string.
Returns:
str: The signature of request parameters.
"""
# Generate HMAC SHA256 signature
return hmac.new(
self._api_secret.encode("utf-8"),
params_str.encode("utf-8"),
hashlib.sha256,
).hexdigest()
def _generate_cache_key(
self,
method: HttpMethod,
endpoint: str,
params: dict[str, Any] | None = None,
unique_cache_attribute: str | None = None,
) -> str:
"""Generate a unique cache key based on the request details.
Exclude timestamp and signature from the cache key.
Args:
method (HttpMethod): The HTTP method (e.g., GET, POST).
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
Returns:
str: A unique cache key.
"""
key_parts = [method, endpoint]
if params:
filtered_params = {
k: v for k, v in params.items() if k not in {"signature", "timestamp"}
}
key_parts.append(parse.urlencode(sorted(filtered_params.items())))
if unique_cache_attribute:
key_parts.append(unique_cache_attribute)
cache_key = ":".join(key_parts)
logger.debug(f"Generated cache key: {cache_key}")
return cache_key
def _request(
self,
method: HttpMethod,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
use_cache: bool = False,
unique_cache_attribute: str | None = None,
override_signature: Callable[[str], str] | None = None,
) -> dict[str, Any]:
"""Perform a signed synchronous HTTP request.
Args:
method (HttpMethod): The HTTP method (e.g., GET, POST).
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
use_cache (bool): Whether to use caching for the request. Defaults to False.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
override_signature (Optional[Callable[[Dict[str, Any]], Dict[str, Any]]]): An optional function to override the default signing behavior.
Returns:
Dict[str, Any]: The response data.
"""
headers = headers or {}
headers["X-BX-APIKEY"] = self._api_key
params = params or {}
params_str = self._parse_params(params)
if override_signature:
signature = override_signature(params_str)
else:
signature = self._sign_request(params_str)
endpoint += f"?{params_str}&signature={signature}"
return super()._request(
method,
endpoint,
headers=headers,
use_cache=use_cache,
unique_cache_attribute=unique_cache_attribute,
)
async def _async_request(
self,
method: HttpMethod,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
use_cache: bool = False,
unique_cache_attribute: str | None = None,
override_signature: Callable[[str], str] | None = None,
) -> dict[str, Any]:
"""Perform a signed asynchronous HTTP request.
Args:
method (HttpMethod): The HTTP method (e.g., GET, POST).
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
use_cache (bool): Whether to use caching for the request. Defaults to False.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
override_signature (Optional[Callable[[Dict[str, Any]], Dict[str, Any]]]): An optional function to override the default signing behavior.
Returns:
Dict[str, Any]: The response data.
"""
headers = headers or {}
headers["X-BX-APIKEY"] = self._api_key
params = params or {}
params_str = self._parse_params(params)
if override_signature:
signature = override_signature(params_str)
else:
signature = self._sign_request(params_str)
endpoint += f"?{params_str}&signature={signature}"
return await super()._async_request(
method,
endpoint,
headers=headers,
use_cache=use_cache,
unique_cache_attribute=unique_cache_attribute,
)
def _check_errors(self, data: dict[str, Any]) -> None:
"""Check the API response for errors and raise exceptions if necessary.
Args:
data (Dict[str, Any]): The parsed response data from the API.
Raises:
------
exceptions.APIError: If the API response contains an error.
"""
if "code" not in data or data["code"] == 0:
return # No error
error_code = data["code"]
error_message = data.get("msg", "No error message provided")
timestamp = data.get("timestamp")
logger.debug(
f"API error detected. Code: {error_code}, Message: {error_message}, Timestamp: {timestamp}",
)
# Map error codes to specific exception classes
error_mapping: dict[int, type[exceptions.APIError]] = {
# 4XX Errors
400: exceptions.BadRequestError,
401: exceptions.UnauthorizedError,
403: exceptions.ForbiddenError,
404: exceptions.NotFoundError,
429: exceptions.TooManyRequestsError,
418: exceptions.IPBannedError,
# 5XX Errors
500: exceptions.InternalServerError,
504: exceptions.GatewayTimeoutError,
# Common Business Errors
100001: exceptions.SignatureVerificationFailedError,
100500: exceptions.InternalSystemError,
80012: exceptions.OperationError,
80014: exceptions.InvalidParameterError,
80016: exceptions.OrderNotFoundError,
80017: exceptions.PositionNotFoundError,
80020: exceptions.RiskForbiddenError,
100004: exceptions.PermissionDeniedError,
100419: exceptions.IPWhitelistError,
101204: exceptions.InsufficientMarginError,
80013: exceptions.OrderLimitReachedError,
80018: exceptions.OrderAlreadyFilledError,
80019: exceptions.OrderProcessingError,
100412: exceptions.NullSignatureError,
100413: exceptions.IncorrectAPIKeyError,
100421: exceptions.TimestampError,
100410: exceptions.RateLimitError,
101209: exceptions.MaxPositionValueError,
101212: exceptions.PendingOrdersError,
101215: exceptions.MakerOrderError,
101414: exceptions.MaxLeverageError,
101415: exceptions.TradingPairSuspendedError,
101460: exceptions.LiquidationPriceError,
101500: exceptions.RPCTimeoutError,
101514: exceptions.SuspendedFromOpeningPositionsError,
109201: exceptions.DuplicateOrderError,
101211: exceptions.OrderPriceError,
101400: exceptions.TradeValidationError,
80001: exceptions.TradeExecutionError,
}
# Get the appropriate exception class or fall back to the base APIError
exception_class = error_mapping.get(error_code, exceptions.APIError)
# Raise the exception with the error message and timestamp
if exception_class == exceptions.APIError:
raise exception_class(
message=error_message,
timestamp=timestamp,
code=error_code,
)
raise exception_class(message=error_message, timestamp=timestamp) # type: ignore
# ------------------------------
# HTTP Method Shortcuts (Synchronous)
# ------------------------------
[docs]
def get(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
use_cache: bool = False,
unique_cache_attribute: str | None = None,
) -> dict[str, Any]:
"""Perform a synchronous GET request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
use_cache (bool): Whether to use caching for the request. Defaults to False.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
Returns:
Dict[str, Any]: The response data.
"""
return self._request(
"GET",
endpoint,
params=params,
headers=headers,
use_cache=use_cache,
unique_cache_attribute=unique_cache_attribute,
)
[docs]
def post(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Perform a synchronous POST request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
Returns:
Dict[str, Any]: The response data.
"""
return self._request(
"POST",
endpoint,
params=params,
headers=headers,
)
[docs]
def put(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Perform a synchronous PUT request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
Returns:
Dict[str, Any]: The response data.
"""
return self._request(
"PUT",
endpoint,
params=params,
headers=headers,
)
[docs]
def delete(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Perform a synchronous DELETE request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
Returns:
Dict[str, Any]: The response data.
"""
return self._request(
"DELETE",
endpoint,
params=params,
headers=headers,
)
# ------------------------------
# HTTP Method Shortcuts (Asynchronous)
# ------------------------------
[docs]
async def async_get(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
use_cache: bool = False,
unique_cache_attribute: str | None = None,
) -> dict[str, Any]:
"""Perform an asynchronous GET request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
use_cache (bool): Whether to use caching for the request. Defaults to False.
unique_cache_attribute (Optional[str]): An optional unique attribute for the cache key.
Returns:
Dict[str, Any]: The response data.
"""
return await self._async_request(
"GET",
endpoint,
params=params,
headers=headers,
use_cache=use_cache,
unique_cache_attribute=unique_cache_attribute,
)
[docs]
async def async_post(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Perform an asynchronous POST request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
Returns:
Dict[str, Any]: The response data.
"""
return await self._async_request(
"POST",
endpoint,
params=params,
headers=headers,
)
[docs]
async def async_put(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Perform an asynchronous PUT request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
Returns:
Dict[str, Any]: The response data.
"""
return await self._async_request(
"PUT",
endpoint,
params=params,
headers=headers,
)
[docs]
async def async_delete(
self,
endpoint: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Perform an asynchronous DELETE request.
Args:
endpoint (str): The API endpoint.
params (Optional[Dict[str, Any]]): The request parameters.
headers (Optional[Dict[str, str]]): The request headers.
Returns:
Dict[str, Any]: The response data.
"""
return await self._async_request(
"DELETE",
endpoint,
params=params,
headers=headers,
)
[docs]
def save_convert(
self,
data: dict[str, Any],
pydantic_model: type[BaseModel],
) -> Any:
"""Convert response data to a specified Pydantic model.
Args:
data (Dict[str, Any]): The data to be converted.
pydantic_model (Type[BaseModel]): The Pydantic model class for conversion.
Returns:
Any: An instance of the provided Pydantic model.
Raises:
ConversionError: If conversion to the Pydantic model fails due to validation errors.
"""
try:
return pydantic_model(**data)
except ValidationError as e:
logger.warning(f"Failed to convert data: {e}.")
logger.warning("nRaising ConversionError with initial data.")
raise exceptions.ConversionError(data, pydantic_model) from e