Source code for multiversx_sdk.network_providers.account_awaiter

import logging
import time
from typing import Callable, Optional, Protocol, Union

from multiversx_sdk.core.address import Address
from multiversx_sdk.network_providers.constants import (
    DEFAULT_ACCOUNT_AWAITING_PATIENCE_IN_MILLISECONDS,
    DEFAULT_ACCOUNT_AWAITING_POLLING_TIMEOUT_IN_MILLISECONDS,
    DEFAULT_ACCOUNT_AWAITING_TIMEOUT_IN_MILLISECONDS)
from multiversx_sdk.network_providers.errors import \
    ExpectedAccountConditionNotReachedError
from multiversx_sdk.network_providers.resources import AccountOnNetwork

ONE_SECOND_IN_MILLISECONDS = 1000

logger = logging.getLogger("account_awaiter")


[docs] class IAccountFetcher(Protocol):
[docs] def get_account(self, address: Address) -> AccountOnNetwork: ...
[docs] class AccountAwaiter: """AccountAwaiter allows one to await until a specific event occurs on a given address.""" def __init__(self, fetcher: IAccountFetcher, polling_interval_in_milliseconds: Optional[int] = None, timeout_interval_in_milliseconds: Optional[int] = None, patience_time_in_milliseconds: Optional[int] = None) -> None: """ Args: fetcher (IAccountFetcher): Used to fetch the account of the network. polling_interval_in_milliseconds (Optional[int]): The polling interval, in milliseconds. timeout_interval_in_milliseconds (Optional[int]): The timeout, in milliseconds. patience_time_in_milliseconds (Optional[int]): The patience, an extra time (in milliseconds) to wait, after the account has reached its desired condition. """ self.fetcher = fetcher if polling_interval_in_milliseconds is None: self.polling_interval_in_milliseconds = DEFAULT_ACCOUNT_AWAITING_POLLING_TIMEOUT_IN_MILLISECONDS else: self.polling_interval_in_milliseconds = polling_interval_in_milliseconds if timeout_interval_in_milliseconds is None: self.timeout_interval_in_milliseconds = DEFAULT_ACCOUNT_AWAITING_TIMEOUT_IN_MILLISECONDS else: self.timeout_interval_in_milliseconds = timeout_interval_in_milliseconds if patience_time_in_milliseconds is None: self.patience_time_in_milliseconds = DEFAULT_ACCOUNT_AWAITING_PATIENCE_IN_MILLISECONDS else: self.patience_time_in_milliseconds = patience_time_in_milliseconds
[docs] def await_on_condition(self, address: Address, condition: Callable[[AccountOnNetwork], bool]) -> AccountOnNetwork: """Waits until the condition is satisfied.""" def do_fetch(): return self.fetcher.get_account(address) return self._await_conditionally( is_satisfied=condition, do_fetch=do_fetch, error=ExpectedAccountConditionNotReachedError() )
def _await_conditionally(self, is_satisfied: Callable[[AccountOnNetwork], bool], do_fetch: Callable[[], AccountOnNetwork], error: Exception) -> AccountOnNetwork: is_condition_satisfied = False fetched_data: Union[AccountOnNetwork, None] = None max_number_of_retries = self.timeout_interval_in_milliseconds // self.polling_interval_in_milliseconds number_of_retries = 0 while number_of_retries < max_number_of_retries: try: fetched_data = do_fetch() is_condition_satisfied = is_satisfied(fetched_data) if is_condition_satisfied: break except Exception as ex: raise ex number_of_retries += 1 time.sleep(self.polling_interval_in_milliseconds / ONE_SECOND_IN_MILLISECONDS) if fetched_data is None or not is_condition_satisfied: raise error if self.patience_time_in_milliseconds: time.sleep(self.patience_time_in_milliseconds / ONE_SECOND_IN_MILLISECONDS) return do_fetch() return fetched_data