import urllib.parse
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from threading import Thread
from typing import Any, Callable, Optional, Union
import requests
from multiversx_sdk.core.address import Address
from multiversx_sdk.core.config import LibraryConfig
from multiversx_sdk.core.constants import (ESDT_CONTRACT_ADDRESS_HEX,
METACHAIN_ID)
from multiversx_sdk.core.tokens import Token
from multiversx_sdk.core.transaction import Transaction
from multiversx_sdk.core.transaction_on_network import TransactionOnNetwork
from multiversx_sdk.core.transaction_status import TransactionStatus
from multiversx_sdk.network_providers.account_awaiter import AccountAwaiter
from multiversx_sdk.network_providers.config import NetworkProviderConfig
from multiversx_sdk.network_providers.constants import (
BASE_USER_AGENT, DEFAULT_ACCOUNT_AWAITING_PATIENCE_IN_MILLISECONDS)
from multiversx_sdk.network_providers.errors import (GenericError,
TransactionFetchingError)
from multiversx_sdk.network_providers.http_resources import (
account_from_proxy_response, account_storage_entry_from_response,
account_storage_from_response, block_from_response,
definition_of_fungible_token_from_query_response,
definition_of_tokens_collection_from_query_response,
network_config_from_response, network_status_from_response,
smart_contract_query_to_vm_query_request,
token_amount_on_network_from_proxy_response,
token_amounts_from_proxy_response,
transaction_cost_estimation_from_response, transaction_from_proxy_response,
transaction_from_simulate_response,
transactions_from_send_multiple_response,
vm_query_response_to_smart_contract_query_response)
from multiversx_sdk.network_providers.interface import INetworkProvider
from multiversx_sdk.network_providers.resources import (
AccountOnNetwork, AccountStorage, AccountStorageEntry, AwaitingOptions,
BlockOnNetwork, FungibleTokenMetadata, GenericResponse, GetBlockArguments,
NetworkConfig, NetworkStatus, TokenAmountOnNetwork,
TokensCollectionMetadata, TransactionCostResponse)
from multiversx_sdk.network_providers.shared import convert_tx_hash_to_string
from multiversx_sdk.network_providers.transaction_awaiter import \
TransactionAwaiter
from multiversx_sdk.network_providers.user_agent import extend_user_agent
from multiversx_sdk.smart_contracts.smart_contract_query import (
SmartContractQuery, SmartContractQueryResponse)
[docs]
class ProxyNetworkProvider(INetworkProvider):
def __init__(self,
url: str,
address_hrp: Optional[str] = None,
config: Optional[NetworkProviderConfig] = None) -> None:
self.url = url
self.address_hrp = address_hrp or LibraryConfig.default_address_hrp
self.config = config if config is not None else NetworkProviderConfig()
self.user_agent_prefix = f"{BASE_USER_AGENT}/proxy"
extend_user_agent(self.user_agent_prefix, self.config)
[docs]
def get_network_config(self) -> NetworkConfig:
"""Fetches the general configuration of the network."""
response = self.do_get_generic('network/config')
return network_config_from_response(response.get('config', {}))
[docs]
def get_network_status(self, shard: int = METACHAIN_ID) -> NetworkStatus:
"""Fetches the current status of the network."""
response = self.do_get_generic(f'network/status/{shard}')
return network_status_from_response(response.get('status', ''))
[docs]
def get_block(self, arguments: GetBlockArguments) -> BlockOnNetwork:
"""Fetches a block by nonce or by hash."""
if not arguments.shard:
raise Exception("Shard not provided. Please set the shard in the arguments.")
if arguments.block_hash:
response = self.do_get_generic(f"block/{arguments.shard}/by-hash/{arguments.block_hash.hex()}")
elif arguments.block_nonce:
response = self.do_get_generic(f"block/{arguments.shard}/by-nonce/{arguments.block_nonce}")
else:
raise Exception("Block hash or block nonce not provided")
return block_from_response(response.get("block", {}))
[docs]
def get_latest_block(self, shard: int = METACHAIN_ID) -> BlockOnNetwork:
"""Fetches the latest block of a shard."""
block_nonce = self.get_network_status(shard).block_nonce
response = self.do_get_generic(f"block/{shard}/by-nonce/{block_nonce}")
return block_from_response(response.get("block", {}))
[docs]
def get_account(self, address: Address) -> AccountOnNetwork:
"""Fetches account information for a given address."""
data: dict[str, bool] = {}
get_guardian_data_thread = Thread(target=self._get_guardian_data, args=(address, data))
get_guardian_data_thread.start()
response = self.do_get_generic(f'address/{address.to_bech32()}')
account = account_from_proxy_response(response.to_dictionary())
get_guardian_data_thread.join(timeout=2)
account.is_guarded = data.get("is_guarded", False)
return account
def _get_guardian_data(self, address: Address, return_data: dict[str, bool]):
guardian_data = self.do_get_generic(f"address/{address.to_bech32()}/guardian-data")
return_data["is_guarded"] = bool(guardian_data.get("guardianData", {}).get("guarded"))
[docs]
def get_account_storage(self, address: Address) -> AccountStorage:
"""
Fetches the storage (key-value pairs) of an account.
When decoding the keys, the errors are ignored. Use the raw values if needed.
"""
response = self.do_get_generic(f"address/{address.to_bech32()}/keys")
return account_storage_from_response(response.to_dictionary())
[docs]
def get_account_storage_entry(self, address: Address, entry_key: str) -> AccountStorageEntry:
"""Fetches a specific storage entry of an account."""
key_as_hex = entry_key.encode().hex()
response = self.do_get_generic(f"address/{address.to_bech32()}/key/{key_as_hex}")
return account_storage_entry_from_response(response.to_dictionary(), entry_key)
[docs]
def await_account_on_condition(
self, address: Address, condition: Callable[[AccountOnNetwork],
bool],
options: Optional[AwaitingOptions] = None) -> AccountOnNetwork:
"""Waits until an account satisfies a given condition."""
if options is None:
options = AwaitingOptions(patience_in_milliseconds=DEFAULT_ACCOUNT_AWAITING_PATIENCE_IN_MILLISECONDS)
awaiter = AccountAwaiter(
fetcher=self,
polling_interval_in_milliseconds=options.polling_interval_in_milliseconds,
timeout_interval_in_milliseconds=options.timeout_in_milliseconds,
patience_time_in_milliseconds=options.patience_in_milliseconds
)
return awaiter.await_on_condition(address=address, condition=condition)
[docs]
def send_transaction(self, transaction: Transaction) -> bytes:
"""Broadcasts a transaction and returns its hash."""
response = self.do_post_generic(
'transaction/send', transaction.to_dictionary())
return bytes.fromhex(response.get('txHash', ''))
[docs]
def simulate_transaction(self, transaction: Transaction, check_signature: bool = False) -> TransactionOnNetwork:
"""Simulates a transaction."""
url = 'transaction/simulate?checkSignature=false'
if check_signature:
url = 'transaction/simulate'
response = self.do_post_generic(url, transaction.to_dictionary())
return transaction_from_simulate_response(transaction, response.to_dictionary().get("result", {}))
[docs]
def estimate_transaction_cost(self, transaction: Transaction) -> TransactionCostResponse:
"""Estimates the cost of a transaction."""
response = self.do_post_generic(
'transaction/cost', transaction.to_dictionary())
return transaction_cost_estimation_from_response(response.to_dictionary())
[docs]
def send_transactions(self, transactions: list[Transaction]) -> tuple[int, list[bytes]]:
"""
Broadcasts multiple transactions and returns a tuple of (number of accepted transactions, list of transaction hashes).
In the returned list, the order of transaction hashes corresponds to the order of transactions in the input list.
If a transaction is not accepted, its hash is empty in the returned list.
"""
transactions_as_dictionaries = [transaction.to_dictionary() for transaction in transactions]
response = self.do_post_generic('transaction/send-multiple', transactions_as_dictionaries)
return transactions_from_send_multiple_response(response.to_dictionary(), len(transactions))
[docs]
def get_transaction(self, transaction_hash: Union[bytes, str]) -> TransactionOnNetwork:
"""Fetches a transaction that was previously broadcasted (maybe already processed by the network)."""
transaction_hash = convert_tx_hash_to_string(transaction_hash)
def get_tx() -> dict[str, Any]:
url = f"transaction/{transaction_hash}?withResults=true"
return self.do_get_generic(url).get('transaction', '')
status_task = None
with ThreadPoolExecutor(max_workers=2) as executor:
try:
status_task = executor.submit(self.get_transaction_status, transaction_hash)
tx_task = executor.submit(get_tx)
process_status = status_task.result(timeout=5)
tx = tx_task.result(timeout=5)
except TimeoutError:
raise TimeoutError("Fetching transaction or process status timed out")
except GenericError as ge:
raise TransactionFetchingError(ge.url, ge.data)
return transaction_from_proxy_response(transaction_hash, tx, process_status)
[docs]
def await_transaction_completed(
self, transaction_hash: Union[bytes, str],
options: Optional[AwaitingOptions] = None) -> TransactionOnNetwork:
"""Waits until the transaction is completely processed."""
transaction_hash = convert_tx_hash_to_string(transaction_hash)
if options is None:
options = AwaitingOptions()
awaiter = TransactionAwaiter(
fetcher=self,
polling_interval_in_milliseconds=options.polling_interval_in_milliseconds,
timeout_interval_in_milliseconds=options.timeout_in_milliseconds,
patience_time_in_milliseconds=options.patience_in_milliseconds
)
return awaiter.await_completed(transaction_hash)
[docs]
def await_transaction_on_condition(self,
transaction_hash: Union[str, bytes],
condition: Callable[[TransactionOnNetwork], bool],
options: Optional[AwaitingOptions] = None) -> TransactionOnNetwork:
"""Waits until a transaction satisfies a given condition."""
transaction_hash = convert_tx_hash_to_string(transaction_hash)
if options is None:
options = AwaitingOptions()
awaiter = TransactionAwaiter(
fetcher=self,
polling_interval_in_milliseconds=options.polling_interval_in_milliseconds,
timeout_interval_in_milliseconds=options.timeout_in_milliseconds,
patience_time_in_milliseconds=options.patience_in_milliseconds
)
return awaiter.await_on_condition(transaction_hash, condition)
[docs]
def get_token_of_account(self, address: Address, token: Token) -> TokenAmountOnNetwork:
"""
Fetches the balance of an account, for a given token.
Able to handle both fungible and non-fungible tokens (NFTs, SFTs, MetaESDTs).
"""
if token.nonce == 0:
response = self.do_get_generic(f"address/{address.to_bech32()}/esdt/{token.identifier}")
else:
response = self.do_get_generic(f"address/{address.to_bech32()}/nft/{token.identifier}/nonce/{token.nonce}")
return token_amount_on_network_from_proxy_response(response.to_dictionary())
[docs]
def get_fungible_tokens_of_account(self, address: Address) -> list[TokenAmountOnNetwork]:
"""
Fetches the balances of an account, for all fungible tokens held by the account.
Pagination isn't explicitly handled by a basic network provider, but can be achieved by using `do_get_generic`.
"""
response = self.do_get_generic(f"address/{address.to_bech32()}/esdt")
all_tokens = token_amounts_from_proxy_response(response.to_dictionary())
return [token for token in all_tokens if token.token.nonce == 0]
[docs]
def get_non_fungible_tokens_of_account(self, address: Address) -> list[TokenAmountOnNetwork]:
"""
Fetches the balances of an account, for all non-fungible tokens held by the account.
Pagination isn't explicitly handled by a basic network provider, but can be achieved by using `do_get_generic`.
"""
response = self.do_get_generic(f"address/{address.to_bech32()}/esdt")
all_tokens = token_amounts_from_proxy_response(response.to_dictionary())
return [token for token in all_tokens if token.token.nonce > 0]
[docs]
def get_definition_of_fungible_token(self, token_identifier: str) -> FungibleTokenMetadata:
"""Fetches the definition of a fungible token."""
encoded_identifier = token_identifier.encode()
query = SmartContractQuery(
contract=Address.new_from_hex(ESDT_CONTRACT_ADDRESS_HEX, self.address_hrp),
function="getTokenProperties",
arguments=[encoded_identifier],
)
query_response = self.query_contract(query)
return definition_of_fungible_token_from_query_response(
query_response.return_data_parts, token_identifier, self.address_hrp
)
[docs]
def get_definition_of_tokens_collection(self, collection_name: str) -> TokensCollectionMetadata:
"""Fetches the definition of a tokens collection."""
encoded_identifier = collection_name.encode()
query = SmartContractQuery(
contract=Address.new_from_hex(ESDT_CONTRACT_ADDRESS_HEX, self.address_hrp),
function="getTokenProperties",
arguments=[encoded_identifier],
)
query_response = self.query_contract(query)
return definition_of_tokens_collection_from_query_response(
query_response.return_data_parts, collection_name, self.address_hrp
)
[docs]
def query_contract(self, query: SmartContractQuery) -> SmartContractQueryResponse:
"""Queries a smart contract."""
request = smart_contract_query_to_vm_query_request(query)
response = self.do_post_generic('vm-values/query', request)
response = response.get('data', '')
return vm_query_response_to_smart_contract_query_response(response, query.function)
[docs]
def get_transaction_status(self, tx_hash: str) -> TransactionStatus:
"""Fetches the status of a transaction."""
response = self.do_get_generic(f'transaction/{tx_hash}/process-status')
return TransactionStatus(response.get('status', ''))
[docs]
def do_get_generic(self, url: str, url_parameters: Optional[dict[str, Any]] = None) -> GenericResponse:
"""Does a generic GET request against the network (handles API enveloping)."""
url = f'{self.url}/{url}'
if url_parameters is not None:
params = urllib.parse.urlencode(url_parameters)
url = f"{url}?{params}"
response = self._do_get(url)
return response
[docs]
def do_post_generic(
self, url: str, data: Any, url_parameters: Optional[dict[str, Any]] = None) -> GenericResponse:
"""Does a generic GET request against the network (handles API enveloping)."""
url = f'{self.url}/{url}'
if url_parameters is not None:
params = urllib.parse.urlencode(url_parameters)
url = f"{url}?{params}"
response = self._do_post(url, data)
return response
def _do_get(self, url: str) -> GenericResponse:
try:
response = requests.get(url, **self.config.requests_options)
response.raise_for_status()
parsed = response.json()
return self._get_data(parsed, url)
except requests.HTTPError as err:
error_data = self._extract_error_from_response(err.response)
raise GenericError(url, error_data)
except requests.ConnectionError as err:
raise GenericError(url, err)
except Exception as err:
raise GenericError(url, err)
def _do_post(self, url: str, payload: Any) -> GenericResponse:
try:
response = requests.post(url, json=payload, **self.config.requests_options)
response.raise_for_status()
parsed = response.json()
return self._get_data(parsed, url)
except requests.HTTPError as err:
error_data = self._extract_error_from_response(err.response)
raise GenericError(url, error_data)
except requests.ConnectionError as err:
raise GenericError(url, err)
except Exception as err:
raise GenericError(url, err)
def _get_data(self, parsed: dict[str, Any], url: str) -> GenericResponse:
err = parsed.get("error")
code = parsed.get("code")
if err:
raise GenericError(url, f"code:{code}, error: {err}")
data: dict[str, Any] = parsed.get("data", dict())
return GenericResponse(data)
def _extract_error_from_response(self, response: Any):
try:
return response.json()
except Exception:
return response.text