[docs]classSyncMemoryCache(BaseCache):"""Synchronous in-memory cache implementation. This class provides a thread-safe synchronous in-memory cache implementation. It provides methods to set and get data with optional TTL (time-to-live) support. """def__init__(self)->None:"""Initialize the SyncMemoryCache. Returns ------- None """self._data:dict[str,tuple[dict[str,Any],Optional[float]]]={}self._lock=threading.Lock()
[docs]defget(self,key:str)->Optional[dict[str,Any]]:"""Get a value from the cache. Args: key (str): The key to retrieve the value for. Returns: Optional[dict[str, Any]]: The value associated with the key, or None if not found or expired. """withself._lock:entry=self._data.get(key)ifentryisNone:returnNonevalue,expires=entryifexpiresandexpires<time.time():self._data.pop(key)returnNonereturnvalue
[docs]defset(self,key:str,value:dict[str,Any],ttl:Optional[int]=None)->None:"""Set a value in the cache. Args: key (str): The key to associate the value with. value (dict[str, Any]): The value to store. ttl (Optional[int]): The time-to-live in seconds. Defaults to None. Returns: None """withself._lock:expires=time.time()+ttlifttlelseNoneself._data[key]=(value,expires)
[docs]classAsyncMemoryCache(BaseAsyncCache):"""Asynchronous in-memory cache implementation. This class provides an async-safe in-memory cache implementation. It provides methods to set and get data with optional TTL (time-to-live) support. """def__init__(self)->None:"""Initialize the AsyncMemoryCache. Returns ------- None """self._data:dict[str,tuple[dict[str,Any],Optional[float]]]={}self._lock=asyncio.Lock()
[docs]asyncdefaget(self,key:str)->Optional[dict[str,Any]]:"""Get a value from the cache asynchronously. Args: key (str): The key to retrieve the value for. Returns: Optional[dict[str, Any]]: The value associated with the key, or None if not found or expired. """asyncwithself._lock:entry=self._data.get(key)ifentryisNone:returnNonevalue,expires=entryifexpiresandexpires<time.time():delself._data[key]returnNonereturnvalue
[docs]asyncdefaset(self,key:str,value:dict[str,Any],ttl:Optional[int]=None,)->None:"""Set a value in the cache asynchronously. Args: key (str): The key to associate the value with. value (dict[str, Any]): The value to store. ttl (Optional[int]): The time-to-live in seconds. Defaults to None. Returns: None """asyncwithself._lock:expires=time.time()+ttlifttlelseNoneself._data[key]=(value,expires)