cic-cli/cic/ext/eth/__init__.py

205 lines
6.0 KiB
Python

# standard imports
import logging
import copy
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.tx import (
TxFormat,
TxFactory,
)
from chainlib.eth.connection import RPCConnection
from chainlib.eth.contract import (
ABIContractEncoder,
ABIContractType
)
from chainlib.eth.address import is_address
from eth_token_index import TokenUniqueSymbolIndex
from eth_address_declarator import Declarator
logg = logging.getLogger(__name__)
class CICEth:
def __init__(self, chain_spec, resources, proof, signer=None, rpc=None, nonce_oracle=None, fee_oracle=None):
"""resources will be modified
"""
self.resources = resources
self.proof = proof
self.chain_spec = chain_spec
self.signer = signer
self.rpc = rpc
self.nonce_oracle = nonce_oracle
self.fee_oracle = fee_oracle
self.token_details = None
self.token_address = None
self.token_code = None
self.outputs = []
self.tx_format = TxFormat.RAW_ARGS
if self.rpc != None:
self.tx_format = TxFormat.JSONRPC
elif self.signer != None:
self.tx_format = TxFormat.RLP_SIGNED
def prepare_token(self, name, symbol, precision, code, extra=[], extra_types=[], positions=None):
self.token_details = {
'name': name,
'symbol': symbol,
'precision': precision,
'code': code,
'extra': extra,
'extra_types': extra_types,
'positions': positions,
}
def __detect_arg_type(self, v):
typ = None
try:
int(v, 10)
typ = ABIContractType.UINT256
except TypeError:
pass
if typ == None:
try:
vv = strip_0x(v)
if is_address(vv):
typ = ABIContractType.ADDRESS
else:
typ = ABIContractType.BYTES32
except ValueError:
pass
if typ == None:
try:
v.encode('utf-8')
typ = ABIContractType.STRING
except ValueError:
pass
if typ == None:
raise ValueError('cannot automatically determine type for value {}'.format(v))
logg.info('argument {} parsed as abi contract type {}'.format(typ.value))
return typ
def __order_args(self):
args = [
self.token_details['name'],
self.token_details['symbol'],
self.token_details['precision'],
]
args_types = [
ABIContractType.STRING.value,
ABIContractType.STRING.value,
ABIContractType.UINT256.value,
]
for i, x in enumerate(self.token_details['extra']):
args.append(x)
typ = None
if self.token_details['extra_types'] != None:
typ = self.token_details['extra_types'][i]
else:
typ = self.__detect_arg_type(x)
args_types.append(typ)
positions = self.token_details['positions']
if positions == None:
positions = list(range(len(args)))
return (args, args_types, positions)
def process_token(self):
(args, args_types, positions) = self.__order_args()
enc = ABIContractEncoder()
for i in positions:
getattr(enc, args_types[i])(args[i])
code = self.token_details['code'] + enc.get()
signer_address = self.resources['token']['key_address']
c = TxFactory(self.chain_spec, signer=self.signer, nonce_oracle=self.nonce_oracle, gas_oracle=self.fee_oracle)
tx = c.template(signer_address, None, use_nonce=True)
tx = c.set_code(tx, code)
o = c.finalize(tx, self.tx_format)
r = None
if self.rpc != None:
r = self.rpc.do(o[1])
elif self.signer != None:
r = o[1]
if r == None:
r = code
self.outputs.append(r)
return r
def process_token_index(self):
c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=self.nonce_oracle, gas_oracle=self.fee_oracle)
contract_address = self.resources['token_index']['reference']
signer_address = self.resources['token_index']['key_address']
o = c.register(contract_address, signer_address, self.token_address, tx_format=self.tx_format)
r = None
if self.rpc != None:
r = self.rpc.do(o[1])
elif self.signer != None:
r = o[1]
else:
r = o
self.outputs.append(r)
return r
def process_address_declarator(self):
c = Declarator(self.chain_spec, signer=self.signer, nonce_oracle=self.nonce_oracle, gas_oracle=self.fee_oracle)
contract_address = self.resources['address_declarator']['reference']
signer_address = self.resources['address_declarator']['key_address']
r = []
for proof in self.proof.get():
o = c.add_declaration(contract_address, signer_address, self.token_address, proof, tx_format=self.tx_format)
if self.rpc != None:
r.append(self.rpc.do(o[1]))
elif self.signer != None:
r.append(o[1])
else:
r.append(o)
self.outputs += r
return r
def process(self):
tasks = []
self.token_address = self.resources['token']['reference']
if self.token_address == None:
tasks.append('token')
for k in self.resources.keys():
if k == 'token':
continue
if self.resources[k]['reference'] != None:
tasks.append(k)
for task in tasks:
getattr(self, 'process_' + task)()
def new(resources, proof, signer_hint=None):
return CICEth(resources, proof, signer=None)