# -*- coding: utf-8 -*-
import ecdsa
import hashlib
import json
from google.protobuf.json_format import (
MessageToJson,
Parse
)
from cryptography.exceptions import InvalidTag
from typing import (
Union,
Dict,
Optional
)
from .grpc import blockchain_pb2
from .obj import address as addr
from .obj import aer
from .obj import private_key as pk
from .utils.encoding import (
decode_root,
encode_private_key,
decode_private_key
)
from .utils import merkle_proof as mp
from .utils.encryption import (
encrypt_bytes,
decrypt_bytes,
decrypt_keystore_v1,
encrypt_to_keystore_v1
)
from .errors.general_exception import GeneralException
[docs]class Account:
""" Account can be a user account with private and public key,
or a contract account.
"""
def __init__(
self,
private_key: Union[str, bytes, None] = None,
empty: bool = False
) -> None:
if empty:
self.__private_key = None
self.__address = None
self.__state = None
self.__state_proof = None
return
self.__private_key = pk.PrivateKey(private_key)
self.__address = self.__private_key.address
self.__state = None
self.__state_proof = None
[docs] def json(
self,
password: Union[bytes, str, None] = None,
with_private_key: bool = False
) -> Dict:
state = self.state
is_state_proof = False
if self.__state_proof:
state = MessageToJson(self.__state_proof)
is_state_proof = True
if state:
state = json.loads(state)
account = {
"address": str(self.__address),
"balance": str(self.balance),
"nonce": str(self.nonce),
"state": state,
"is_state_proof": is_state_proof,
}
if password and self.__private_key is not None:
enc_key = Account.encrypt_account(self, password)
account["enc_key"] = encode_private_key(enc_key)
if with_private_key and self.__private_key is not None:
account["priv_key"] = str(self.__private_key)
return account
[docs] @staticmethod
def from_json(
data: Union[Dict, str],
password: Union[bytes, str, None] = None
) -> 'Account':
if isinstance(data, str):
data_dict: Dict = json.loads(data)
data = data_dict
# handle private key or encrypted key
priv_key = data.get('priv_key', None)
enc_key = data.get('enc_key', None)
if priv_key:
account = Account(private_key=priv_key)
elif enc_key:
enc_key = decode_private_key(enc_key)
if not password:
raise GeneralException("Must provide a password to decrypt")
account = Account.decrypt_account(enc_key, password)
else:
account = Account(empty=True)
# handle address
address = data.get('address', None)
if address is None:
raise ValueError("address cannot be None")
elif account.address is not None and address != str(account.address):
raise ValueError("address is irrelevant with the private key")
elif account.address is None:
account.address = address
# handle state of state_proof
state = data.get('state', None)
if state:
state = json.dumps(state)
is_state_proof = data.get('is_state_proof')
if is_state_proof:
state_proof_pb2 = blockchain_pb2.AccountProof()
Parse(state, state_proof_pb2, ignore_unknown_fields=True)
account.state_proof = state_proof_pb2
else:
state_pb2 = blockchain_pb2.State()
Parse(state, state_pb2, ignore_unknown_fields=True)
account.state = state_pb2
return account
[docs] @staticmethod
def decrypt_from_keystore(
keystore: Union[Dict, str],
password: str
) -> 'Account':
if isinstance(keystore, str):
keystore_dict: Dict = json.loads(keystore)
keystore = keystore_dict
try:
privkey_raw = decrypt_keystore_v1(keystore, password)
except AssertionError as e:
raise GeneralException("Failed to decrypt keystore") from e
return Account(private_key=privkey_raw)
[docs] @staticmethod
def encrypt_to_keystore(
account: 'Account',
password: str,
kdf_n: int = 2**18
) -> Dict:
if not account.private_key:
raise GeneralException("Account must have a private key")
try:
keystore = encrypt_to_keystore_v1(
bytes(account.private_key),
str(account.address),
password, kdf_n=kdf_n
)
except AssertionError as e:
raise GeneralException("Failed to encrypt keystore") from e
return keystore
def __str__(self) -> str:
return json.dumps(self.json(), indent=2)
[docs] def sign_msg_hash(self, msg_hash: bytes) -> Optional[bytes]:
return self.__private_key.sign_msg(msg_hash) if \
self.__private_key else None
[docs] def verify_sign(self, msg_hash: bytes, sign: bytes) -> Optional[bool]:
return self.__private_key.verify_sign(msg_hash, sign) if \
self.__private_key else None
@property
def private_key(self) -> Optional[pk.PrivateKey]:
return self.__private_key
@property
def public_key(self) -> Optional[ecdsa.ecdsa.Public_key]:
if self.__private_key is not None:
return self.__private_key.public_key
if self.__address is not None:
return self.__address.public_key
return None
@property
def address(self) -> Optional[addr.Address]:
return self.__address
@address.setter
def address(self, v: Union[str, bytes]) -> None:
if self.__address is not None:
raise ValueError('not empty account')
self.__address = addr.Address(None, empty=True)
self.__address.value = v # type: ignore
@property
def state(self) -> Optional[str]:
if self.__state is None:
return None
return MessageToJson(self.__state)
@state.setter
def state(self, v):
self.__state = v
@property
def state_proof(self):
return self.__state_proof
@state_proof.setter
def state_proof(self, v):
self.__state_proof = v
@property
def nonce(self):
if self.__state is None:
return -1
return self.__state.nonce
@nonce.setter
def nonce(self, v):
if self.__state.nonce > v:
raise ValueError(
"the nonce value should be bigger than the current nonce "
"value: {}".format(self.__state.nonce)
)
self.__state.nonce = v
@property
def balance(self):
if self.__state is None:
return aer.Aer()
return aer.Aer(int.from_bytes(self.__state.balance, 'big'))
@property
def code_hash(self):
if self.__state is None:
return None
return self.__state.codeHash
@property
def storage_root(self):
if self.__state is None:
return None
return self.__state.storageRoot
@property
def sql_recovery_point(self):
if self.__state is None:
return None
return self.__state.sqlRecoveryPoint
[docs] @staticmethod
def encrypt_account(
account: 'Account',
password: Union[str, bytes]
) -> bytes:
"""
https://cryptography.io/en/latest/hazmat/primitives/aead/
:param account: account to export
:return: encrypted account data (bytes)
"""
if not account.private_key:
raise GeneralException("Account must have a private key")
return encrypt_bytes(bytes(account.private_key), password)
[docs] @staticmethod
def decrypt_account(
encrypted_bytes: bytes,
password: Union[str, bytes]
) -> 'Account':
"""
https://cryptography.io/en/latest/hazmat/primitives/aead/
:param encrypted_bytes: encrypted data (bytes) of account
:param password: to decrypt the exported bytes
:return: account instance
"""
try:
dec_value = decrypt_bytes(encrypted_bytes, password)
except InvalidTag as e:
raise GeneralException(
"Fail to decrypt an account. Please check the password."
) from e
return Account(private_key=dec_value)
[docs] def verify_proof(self, root: Union[str, bytes]) -> bool:
""" verify that the given inclusion and exclusion proofs are
correct
"""
if self.__state_proof is None:
return False
if isinstance(root, str) and len(root) != 0:
root = decode_root(root)
if bytes(self.__address) != self.__state_proof.key:
return False
trie_key = hashlib.sha256(bytes(self.address)).digest()
value = hashlib.sha256(
self.__state_proof.state.SerializeToString()).digest()
ap = self.__state_proof.auditPath
if self.__state_proof.bitmap:
height = self.__state_proof.height
bitmap = self.__state_proof.bitmap
if self.__state_proof.inclusion:
return mp.verify_inclusion_c(
ap, height, bitmap, root, trie_key, value)
else:
return mp.verify_exclusion_c(
root, ap, height, bitmap, trie_key,
self.__state_proof.proofKey, self.__state_proof.proofVal
)
else:
if self.__state_proof.inclusion:
return mp.verify_inclusion(ap, root, trie_key, value)
else:
return mp.verify_exclusion(
root, ap, trie_key, self.__state_proof.proofKey,
self.__state_proof.proofVal
)