# -*- coding: utf-8 -*-
"""Communication(grpc) module."""
import grpc
from typing import (
Optional,
List
)
from .grpc import account_pb2, blockchain_pb2, rpc_pb2, rpc_pb2_grpc, raft_pb2
from .utils.converter import convert_tx_to_grpc_tx
from .obj.transaction import Transaction
[docs]class Comm:
def __init__(
self,
target: Optional[str] = None,
tls_ca_cert: Optional[bytes] = None,
tls_cert: Optional[bytes] = None,
tls_key: Optional[bytes] = None
):
self.__target = target
self.__tls_ca_cert = tls_ca_cert
self.__tls_cert = tls_cert
self.__tls_key = tls_key
self.__channel = None
self.__rpc_stub = None
[docs] def connect(self):
self.disconnect()
if self.__tls_cert is not None:
creds = grpc.ssl_channel_credentials(
root_certificates=self.__tls_ca_cert,
private_key=self.__tls_key, certificate_chain=self.__tls_cert
)
self.__channel = grpc.secure_channel(self.__target, creds)
else:
self.__channel = grpc.insecure_channel(self.__target)
self.__rpc_stub = rpc_pb2_grpc.AergoRPCServiceStub(self.__channel)
[docs] def disconnect(self):
if self.__channel is not None:
self.__channel.close()
[docs] def create_account(self, address: bytes, passphrase: str):
if self.__rpc_stub is None:
return None
account = account_pb2.Account(address=address)
return self.__rpc_stub.CreateAccount(
request=rpc_pb2.Personal(account=account, passphrase=passphrase)
)
[docs] def get_account_state(self, address: bytes):
if self.__rpc_stub is None:
return None
rpc_account = account_pb2.Account()
rpc_account.address = address
return self.__rpc_stub.GetState(rpc_account)
[docs] def get_account_state_proof(
self,
address: bytes,
root: bytes,
compressed: bool
):
if self.__rpc_stub is None:
return None
account_and_root = rpc_pb2.AccountAndRoot(Account=address,
Root=root,
Compressed=compressed)
return self.__rpc_stub.GetStateAndProof(account_and_root)
[docs] def get_chain_info(self):
if self.__rpc_stub is None:
return None
return self.__rpc_stub.GetChainInfo(rpc_pb2.Empty())
[docs] def get_consensus_info(self):
if self.__rpc_stub is None:
return None
return self.__rpc_stub.GetConsensusInfo(rpc_pb2.Empty())
[docs] def get_node_info(self, keys: Optional[str]):
if self.__rpc_stub is None:
return None
# currently not working
key_params = rpc_pb2.KeyParams()
if keys is not None:
key_params.key.extend(keys)
return self.__rpc_stub.GetServerInfo(key_params)
[docs] def receive_event_stream(
self,
sc_address: bytes,
event_name: str,
start_block_no: int,
end_block_no: int,
with_desc: bool,
arg_filter: Optional[bytes],
recent_block_cnt: int
):
if self.__rpc_stub is None:
return None
filter = blockchain_pb2.FilterInfo()
filter.contractAddress = sc_address
filter.eventName = event_name
filter.blockfrom = start_block_no
filter.blockto = end_block_no
filter.desc = with_desc
if arg_filter is not None:
filter.argFilter = arg_filter
filter.recentBlockCnt = recent_block_cnt
return self.__rpc_stub.ListEventStream(filter)
[docs] def get_events(
self,
sc_address: bytes,
event_name: str,
start_block_no: int,
end_block_no: int,
with_desc: bool,
arg_filter: Optional[bytes],
recent_block_cnt: int
):
if self.__rpc_stub is None:
return None
filter = blockchain_pb2.FilterInfo()
filter.contractAddress = sc_address
filter.eventName = event_name
if start_block_no > 0:
filter.blockfrom = start_block_no
if end_block_no > 0:
filter.blockto = end_block_no
filter.desc = with_desc
if arg_filter is not None:
filter.argFilter = arg_filter
filter.recentBlockCnt = recent_block_cnt
return self.__rpc_stub.ListEvents(filter)
[docs] def receive_block_stream(self):
if self.__rpc_stub is None:
return None
return self.__rpc_stub.ListBlockStream(rpc_pb2.Empty())
[docs] def get_blockchain_status(self):
if self.__rpc_stub is None:
return None
return self.__rpc_stub.Blockchain(rpc_pb2.Empty())
[docs] def get_accounts(self):
if self.__rpc_stub is None:
return None
return self.__rpc_stub.GetAccounts(rpc_pb2.Empty())
[docs] def get_block(self, query: bytes):
if self.__rpc_stub is None:
return None
v = rpc_pb2.SingleBytes()
v.value = query
return self.__rpc_stub.GetBlock(v)
[docs] def get_peers(self):
if self.__rpc_stub is None:
return None
return self.__rpc_stub.GetPeers(rpc_pb2.Empty())
[docs] def get_node_state(self, timeout: int):
if self.__rpc_stub is None:
return None
single_bytes = rpc_pb2.SingleBytes()
single_bytes.value = timeout.to_bytes(8, byteorder='little')
return self.__rpc_stub.NodeState(single_bytes)
[docs] def get_tx(self, tx_hash: bytes):
if self.__rpc_stub is None:
return None
single_bytes = rpc_pb2.SingleBytes()
single_bytes.value = tx_hash
return self.__rpc_stub.GetTX(single_bytes)
[docs] def get_block_tx(self, tx_hash: bytes):
if self.__rpc_stub is None:
return None
single_bytes = rpc_pb2.SingleBytes()
single_bytes.value = tx_hash
return self.__rpc_stub.GetBlockTX(single_bytes)
[docs] def unlock_account(self, address: bytes, passphrase: str):
if self.__rpc_stub is None:
return None
account = account_pb2.Account(address=address)
personal = rpc_pb2.Personal(passphrase=passphrase, account=account)
return self.__rpc_stub.UnlockAccount(request=personal)
[docs] def lock_account(self, address: bytes, passphrase: str):
if self.__rpc_stub is None:
return None
account = account_pb2.Account(address=address)
personal = rpc_pb2.Personal(passphrase=passphrase, account=account)
return self.__rpc_stub.LockAccount(request=personal)
# This RPC is for making and sending Tx inside a node.
# Don't use it for sending TX which is made by a client.
[docs] def send_tx(self, unsigned_tx: Transaction):
if self.__rpc_stub is None:
return None
return self.__rpc_stub.SendTX(convert_tx_to_grpc_tx(unsigned_tx))
# This RPC is for sending signed Txs made by a client.
[docs] def commit_txs(self, signed_txs: List[Transaction]):
if self.__rpc_stub is None:
return None
rpc_txs = []
for signed_tx in signed_txs:
rpc_txs.append(convert_tx_to_grpc_tx(signed_tx))
rpc_tx_list = blockchain_pb2.TxList()
rpc_tx_list.txs.extend(rpc_txs)
return self.__rpc_stub.CommitTX(rpc_tx_list)
[docs] def get_receipt(self, tx_hash: bytes):
if self.__rpc_stub is None:
return None
v = rpc_pb2.SingleBytes()
v.value = tx_hash
return self.__rpc_stub.GetReceipt(v)
[docs] def query_contract(self, sc_address: bytes, query_info: bytes):
query = blockchain_pb2.Query()
if self.__rpc_stub is None:
return None
query.contractAddress = sc_address
query.queryinfo = query_info
return self.__rpc_stub.QueryContract(query)
[docs] def query_contract_state(
self,
sc_address: bytes,
storage_keys: List[bytes],
root: bytes,
compressed: bool
):
if self.__rpc_stub is None:
return None
state_query = blockchain_pb2.StateQuery(contractAddress=sc_address,
storageKeys=storage_keys,
root=root,
compressed=compressed)
return self.__rpc_stub.QueryContractState(state_query)
[docs] def get_conf_change_progress(self, block_height: int):
if self.__rpc_stub is None:
return None
v = rpc_pb2.SingleBytes()
v.value = block_height.to_bytes(8, byteorder='little')
return self.__rpc_stub.GetConfChangeProgress(v)
[docs] def get_enterprise_config(self, key: str):
if self.__rpc_stub is None:
return None
v = rpc_pb2.EnterpriseConfigKey()
v.key = key
return self.__rpc_stub.GetEnterpriseConfig(v)
[docs] def get_name_info(self, name: str, block_height: int):
if self.__rpc_stub is None:
return None
n = rpc_pb2.Name()
n.name = name
n.blockNo = block_height
return self.__rpc_stub.GetNameInfo(n)
[docs] def get_abi(self, addr_bytes: bytes):
if self.__rpc_stub is None:
return None
v = rpc_pb2.SingleBytes()
v.value = addr_bytes
return self.__rpc_stub.GetABI(v)
[docs] def add_raft_member(
self,
request_id: int,
member_id: int,
member_name: str,
member_address: str,
member_peer_id: bytes
):
if self.__rpc_stub is None:
return None
ch = raft_pb2.MembershipChange()
ch.type = raft_pb2.ADD_MEMBER
ch.requestID = request_id
ch.attr = raft_pb2.MemberAttr()
ch.attr.ID = member_id
ch.attr.name = member_name
ch.attr.address = member_address
ch.attr.peerID = member_peer_id
return self.__rpc_stub.ChangeMembership(ch)
[docs] def del_raft_member(
self,
request_id: int,
member_id: int,
member_name: str,
member_address: str,
member_peer_id: bytes
):
if self.__rpc_stub is None:
return None
ch = raft_pb2.MembershipChange()
ch.type = raft_pb2.REMOVE_MEMBER
ch.requestID = request_id
ch.attr = raft_pb2.MemberAttr()
ch.attr.ID = member_id
ch.attr.name = member_name
ch.attr.address = member_address
ch.attr.peerID = member_peer_id
return self.__rpc_stub.ChangeMembership(ch)