Source code for aergo.herapy.utils.converter

# -*- coding: utf-8 -*-

"""Common utility module for converting types."""

import hashlib
import json
import toml
import socket
import ecdsa
import time
from typing import (
    Union,
)

from ..obj.aergo_conf import AergoConfig
from ..grpc import blockchain_pb2
from ..constants import (
    PUBLIC_KEY_UNCOMPRESSED,
    PUBLIC_KEY_COMPRESSED_E,
    PUBLIC_KEY_COMPRESSED_O
)
from .encoding import (
    encode_b58,
    encode_address
)


[docs]def convert_toml_to_aergo_conf(v: str) -> AergoConfig: aergo_conf = AergoConfig() conf = toml.loads(v) for k, v in conf.items(): if isinstance(v, dict): for k2, v2 in v.items(): aergo_conf.add_conf(k2, v2, k) else: aergo_conf.add_conf(k, v) return aergo_conf
[docs]def convert_aergo_conf_to_toml(aergo_conf: AergoConfig) -> str: return toml.dumps(aergo_conf.conf)
[docs]def convert_tx_to_grpc_tx(tx): grpc_tx = blockchain_pb2.Tx() grpc_tx.hash = bytes(tx.tx_hash) if tx.nonce is not None: grpc_tx.body.nonce = tx.nonce if tx.from_address is not None: grpc_tx.body.account = bytes(tx.from_address) if tx.to_address is not None: grpc_tx.body.recipient = bytes(tx.to_address) if tx.amount is not None: grpc_tx.body.amount = bytes(tx.amount) if tx.payload is not None: grpc_tx.body.payload = tx.payload grpc_tx.body.gasLimit = tx.gas_limit grpc_tx.body.gasPrice = bytes(tx.gas_price) grpc_tx.body.type = tx.tx_type.value grpc_tx.body.chainIdHash = tx.chain_id if tx.sign is not None: grpc_tx.body.sign = tx.sign return grpc_tx
[docs]def tx_to_grpc_tx(v): return convert_tx_to_grpc_tx(v)
[docs]def convert_tx_to_json(tx): if tx is None: return None return tx.json()
[docs]def tx_to_json(v): return convert_tx_to_json(v)
[docs]def convert_tx_to_formatted_json(tx): if tx is None: return None return json.dumps(convert_tx_to_json(tx), indent=2)
[docs]def tx_to_formatted_json(v): return convert_tx_to_formatted_json(v)
[docs]def convert_bytes_to_int_str(v): return ''.join('{:d} '.format(x) for x in v)
[docs]def bytes_to_int_str(v): return convert_bytes_to_int_str(v)
[docs]def convert_bytes_to_hex_str(v): return ''.join('0x{:02x} '.format(x) for x in v)
[docs]def convert_ip_bytes_to_str(ip): if isinstance(ip, str): return ip # IPv4 if 4 == len(ip): return socket.inet_ntoa(ip) elif 16 == len(ip) and all(v2 == 0 for v2 in list(ip[:10])) \ and 255 == ip[10] and 255 == ip[11]: return socket.inet_ntoa(ip[12:16]) # IPv6 return socket.inet_ntop(socket.AF_INET6, ip)
[docs]def convert_bigint_to_bytes(number: int) -> bytes: q, r = divmod(len(bin(number)) - 2, 8) bytes_to_fit_number = q if r == 0 else q + 1 return number.to_bytes(bytes_to_fit_number, 'big')
[docs]def bigint_to_bytes(v: int) -> bytes: return convert_bigint_to_bytes(v)
[docs]def convert_public_key_to_bytes( pubkey, curve=ecdsa.SECP256k1, compressed=True ) -> bytes: if not isinstance(pubkey, ecdsa.ecdsa.Public_key): raise TypeError('value is not a valid public key') x = pubkey.point.x() x_bytes = ecdsa.util.number_to_string(x, curve.order) y = pubkey.point.y() if compressed: if y % 2 == 0: head = PUBLIC_KEY_COMPRESSED_E else: head = PUBLIC_KEY_COMPRESSED_O y_bytes = b'' else: head = PUBLIC_KEY_UNCOMPRESSED y_bytes = ecdsa.util.number_to_string(y, curve.order) return head + x_bytes + y_bytes
[docs]def public_key_to_bytes(pubkey, curve=ecdsa.SECP256k1, compressed=True): return convert_public_key_to_bytes(pubkey=pubkey, curve=curve, compressed=compressed)
[docs]def convert_bytes_to_public_key( v: bytes, curve: ecdsa.curves.Curve = ecdsa.SECP256k1 ) -> ecdsa.ecdsa.Public_key: if not isinstance(v, bytes): raise TypeError('value is not bytes') head = v[:1] if head not in (PUBLIC_KEY_UNCOMPRESSED, PUBLIC_KEY_COMPRESSED_O, PUBLIC_KEY_COMPRESSED_E): # can be a smart contract address, so no error raise ValueError("public key is not proper") x_bytes = v[1:curve.baselen + 1] x = ecdsa.util.string_to_number(x_bytes) if PUBLIC_KEY_UNCOMPRESSED == head: y_bytes = v[curve.baselen + 1:] y = ecdsa.util.string_to_number(y_bytes) else: a = curve.curve.a() b = curve.curve.b() p = curve.curve.p() if ecdsa.SECP256k1 == curve: # source: https://stackoverflow.com/questions/43629265/ # deriving-an-ecdsa-uncompressed-public-key-from-a-compressed-one?rq=1 y_square = (pow(x, 3, p) + a * x + b) % p y_square_square_root = pow(y_square, (p + 1) // 4, p) if ((head == PUBLIC_KEY_COMPRESSED_E and y_square_square_root & 1) or (head == PUBLIC_KEY_COMPRESSED_O and not y_square_square_root & 1)): y = (-y_square_square_root) % p else: y = y_square_square_root else: # if supporting more formula, need to implement assert 1 == 0 point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order) return ecdsa.ecdsa.Public_key(curve.generator, point)
[docs]def bytes_to_public_key(v, curve=ecdsa.SECP256k1): return convert_bytes_to_public_key(v, curve=curve)
[docs]def get_hash( *strings, no_rand: bool = False, no_encode: bool = False ) -> Union[str, bytes, None]: m = hashlib.sha256() if not no_rand: m.update(int(time.time()).to_bytes(8, 'little')) for string in strings: if isinstance(string, str): string = string.encode('utf-8') m.update(string) if no_encode: return m.digest() return encode_b58(m.digest())
[docs]def privkey_to_address( privkey: bytes, curve: ecdsa.curves.Curve = ecdsa.SECP256k1, compressed: bool = True ) -> str: d = int.from_bytes(privkey, byteorder='big') pubkey_point = curve.generator * d pubkey = ecdsa.ecdsa.Public_key(curve.generator, pubkey_point) pubkey_bytes = convert_public_key_to_bytes(pubkey, curve, compressed) return encode_address(pubkey_bytes)