254 lines
7.6 KiB
Python
254 lines
7.6 KiB
Python
# standard imports
|
||
import logging
|
||
import copy
|
||
|
||
# external imports
|
||
from chainlib.chain import ChainSpec
|
||
from chainlib.eth.tx import (
|
||
TxFormat,
|
||
TxFactory,
|
||
Tx,
|
||
receipt,
|
||
)
|
||
from chainlib.eth.connection import RPCConnection
|
||
from chainlib.eth.contract import (
|
||
ABIContractEncoder,
|
||
ABIContractType
|
||
)
|
||
from chainlib.eth.gas import OverrideGasOracle
|
||
from chainlib.eth.nonce import RPCNonceOracle
|
||
from chainlib.eth.address import is_address
|
||
from eth_token_index import TokenUniqueSymbolIndex
|
||
from eth_address_declarator import Declarator
|
||
from eth_address_declarator.declarator import AddressDeclarator
|
||
|
||
logg = logging.getLogger(__name__)
|
||
|
||
|
||
class CICEth:
|
||
|
||
def __init__(self, chain_spec, resources, proof, signer=None, rpc=None, fee_oracle=None, outputs_writer=None):
|
||
"""resources will be modified
|
||
"""
|
||
self.resources = resources
|
||
self.proof = proof
|
||
self.chain_spec = chain_spec
|
||
self.signer = signer
|
||
self.rpc = rpc
|
||
self.fee_oracle = fee_oracle
|
||
self.token_details = None
|
||
self.token_address = None
|
||
self.token_code = None
|
||
self.outputs = []
|
||
self.tx_format = TxFormat.RAW_ARGS
|
||
self.outputs_writer = outputs_writer
|
||
if self.rpc != None:
|
||
self.tx_format = TxFormat.JSONRPC
|
||
elif self.signer != None:
|
||
self.tx_format = TxFormat.RLP_SIGNED
|
||
|
||
|
||
def prepare_token(self, name, symbol, precision, code, extra=[], extra_types=[], positions=None):
|
||
self.token_details = {
|
||
'name': name,
|
||
'symbol': symbol,
|
||
'precision': precision,
|
||
'code': code,
|
||
'extra': extra,
|
||
'extra_types': extra_types,
|
||
'positions': positions,
|
||
}
|
||
|
||
|
||
def __detect_arg_type(self, v):
|
||
typ = None
|
||
try:
|
||
int(v, 10)
|
||
typ = ABIContractType.UINT256
|
||
except TypeError:
|
||
pass
|
||
|
||
if typ == None:
|
||
try:
|
||
vv = strip_0x(v)
|
||
if is_address(vv):
|
||
typ = ABIContractType.ADDRESS
|
||
else:
|
||
typ = ABIContractType.BYTES32
|
||
except ValueError:
|
||
pass
|
||
|
||
if typ == None:
|
||
try:
|
||
v.encode('utf-8')
|
||
typ = ABIContractType.STRING
|
||
except ValueError:
|
||
pass
|
||
|
||
if typ == None:
|
||
raise ValueError('cannot automatically determine type for value {}'.format(v))
|
||
|
||
logg.info('argument {} parsed as abi contract type {}'.format(typ.value))
|
||
|
||
return typ
|
||
|
||
|
||
def __order_args(self):
|
||
args = [
|
||
self.token_details['name'],
|
||
self.token_details['symbol'],
|
||
self.token_details['precision'],
|
||
]
|
||
args_types = [
|
||
ABIContractType.STRING.value,
|
||
ABIContractType.STRING.value,
|
||
ABIContractType.UINT256.value,
|
||
]
|
||
|
||
for i, x in enumerate(self.token_details['extra']):
|
||
args.append(x)
|
||
typ = None
|
||
if self.token_details['extra_types'] != None:
|
||
typ = self.token_details['extra_types'][i]
|
||
else:
|
||
typ = self.__detect_arg_type(x)
|
||
args_types.append(typ)
|
||
|
||
positions = self.token_details['positions']
|
||
if positions == None:
|
||
positions = list(range(len(args)))
|
||
|
||
return (args, args_types, positions)
|
||
|
||
|
||
def add_outputs(self, k, v):
|
||
logg.debug('adding outputs {} {}'.format(k, v))
|
||
self.outputs.append((k, v))
|
||
|
||
|
||
def get_outputs(self):
|
||
return self.outputs
|
||
|
||
|
||
def process_token(self, writer=None):
|
||
if writer == None:
|
||
writer = self.outputs_writer
|
||
|
||
logg.debug('ZZZZZZZZ token details {}'.format(self.token_details))
|
||
(args, args_types, positions) = self.__order_args()
|
||
|
||
enc = ABIContractEncoder()
|
||
|
||
for i in positions:
|
||
getattr(enc, args_types[i])(args[i])
|
||
|
||
code = self.token_details['code'] + enc.get()
|
||
|
||
signer_address = self.resources['token']['key_address']
|
||
nonce_oracle = None
|
||
if self.rpc != None:
|
||
nonce_oracle = RPCNonceOracle(signer_address, conn=self.rpc)
|
||
|
||
c = TxFactory(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.fee_oracle)
|
||
tx = c.template(signer_address, None, use_nonce=True)
|
||
tx = c.set_code(tx, code)
|
||
o = c.finalize(tx, self.tx_format)
|
||
|
||
r = None
|
||
if self.rpc != None:
|
||
r = self.rpc.do(o[1])
|
||
ro = receipt(r)
|
||
rr = self.rpc.do(ro)
|
||
rr = Tx.src_normalize(rr)
|
||
self.token_address = rr['contract_address']
|
||
elif self.signer != None:
|
||
r = o[1]
|
||
|
||
if r == None:
|
||
r = code
|
||
|
||
self.add_outputs('token', r)
|
||
return r
|
||
|
||
|
||
def process_token_index(self, writer=None):
|
||
if writer == None:
|
||
writer = self.outputs_writer
|
||
|
||
signer_address = self.resources['token_index']['key_address']
|
||
contract_address = self.resources['token_index']['reference']
|
||
|
||
gas_oracle = OverrideGasOracle(limit=TokenUniqueSymbolIndex.gas(), conn=self.rpc)
|
||
nonce_oracle = None
|
||
if self.rpc != None:
|
||
nonce_oracle = RPCNonceOracle(signer_address, conn=self.rpc)
|
||
c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
|
||
|
||
o = c.register(contract_address, signer_address, self.token_address, tx_format=self.tx_format)
|
||
r = None
|
||
if self.rpc != None:
|
||
r = self.rpc.do(o[1])
|
||
elif self.signer != None:
|
||
r = o[1]
|
||
else:
|
||
r = o
|
||
|
||
self.add_outputs('token_index', r)
|
||
return r
|
||
|
||
|
||
def process_address_declarator(self, writer=None):
|
||
if writer == None:
|
||
writer = self.outputs_writer
|
||
|
||
signer_address = self.resources['address_declarator']['key_address']
|
||
contract_address = self.resources['address_declarator']['reference']
|
||
|
||
gas_oracle = OverrideGasOracle(limit=AddressDeclarator.gas(), conn=self.rpc)
|
||
nonce_oracle = None
|
||
if self.rpc != None:
|
||
nonce_oracle = RPCNonceOracle(signer_address, conn=self.rpc)
|
||
c = Declarator(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
|
||
|
||
results = []
|
||
for proof in self.proof.get():
|
||
o = c.add_declaration(contract_address, signer_address, self.token_address, proof, tx_format=self.tx_format)
|
||
r = None
|
||
if self.rpc != None:
|
||
r = self.rpc.do(o[1])
|
||
elif self.signer != None:
|
||
r = o[1]
|
||
else:
|
||
r = o
|
||
self.add_outputs('address_declarator', r)
|
||
results.append(r)
|
||
|
||
return results
|
||
|
||
|
||
def process(self, writer=None):
|
||
if writer == None:
|
||
writer = self.outputs_writer
|
||
|
||
tasks = []
|
||
self.token_address = self.resources['token']['reference']
|
||
|
||
if self.token_address == None:
|
||
tasks.append('token')
|
||
|
||
for k in self.resources.keys():
|
||
if k == 'token':
|
||
continue
|
||
if self.resources[k]['reference'] != None:
|
||
tasks.append(k)
|
||
|
||
for task in tasks:
|
||
logg.debug('ciceth adapter process {}'.format(task))
|
||
r = getattr(self, 'process_' + task)(writer=writer)
|
||
|
||
return self.token_address
|
||
|
||
|
||
def new(resources, proof, signer_hint=None):
|
||
return CICEth(resources, proof, signer=None)
|