Refactor import scripts

This commit is contained in:
Louis Holbrook
2021-02-21 15:41:37 +00:00
parent 1274958493
commit 96b4ad4a72
91 changed files with 5872 additions and 1261 deletions

View File

@@ -6,10 +6,13 @@
# standard imports
import logging
# third-party imports
# external imports
import celery
from cic_registry.chain import ChainSpec
#from cic_registry.chain import ChainSpec
from cic_registry import CICRegistry
from chainlib.chain import ChainSpec
# local imports
from cic_eth.eth.factory import TxFactory
from cic_eth.db.enum import LockEnum

View File

@@ -18,7 +18,11 @@ logg = celery_app.log.get_default_logger()
def redis(self, result, destination, status_code):
(host, port, db, channel) = destination.split(':')
r = redis_interface.Redis(host=host, port=port, db=db)
s = json.dumps(result)
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel))
r.publish(channel, s)
r.publish(channel, json.dumps(data))
r.close()

View File

@@ -114,6 +114,7 @@ class SessionBase(Model):
@staticmethod
def release_session(session=None):
session.flush()
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))

View File

@@ -24,9 +24,10 @@ class AccountRole(SessionBase):
tag = Column(Text)
address_hex = Column(String(42))
# TODO:
@staticmethod
def get_address(tag):
def get_address(tag, session):
"""Get Ethereum address matching the given tag
:param tag: Tag
@@ -34,14 +35,24 @@ class AccountRole(SessionBase):
:returns: Ethereum address, or zero-address if tag does not exist
:rtype: str, 0x-hex
"""
role = AccountRole.get_role(tag)
if role == None:
return zero_address
return role.address_hex
if session == None:
raise ValueError('nested bind session calls will not succeed as the first call to release_session in the stack will leave the db object detached further down the stack. We will need additional reference count.')
session = SessionBase.bind_session(session)
role = AccountRole.get_role(tag, session)
r = zero_address
if role != None:
r = role.address_hex
SessionBase.release_session(session)
return r
@staticmethod
def get_role(tag):
def get_role(tag, session=None):
"""Get AccountRole model object matching the given tag
:param tag: Tag
@@ -49,20 +60,26 @@ class AccountRole(SessionBase):
:returns: Role object, if found
:rtype: cic_eth.db.models.role.AccountRole
"""
session = AccountRole.create_session()
role = AccountRole.__get_role(session, tag)
session.close()
#return role.address_hex
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
SessionBase.release_session(session)
return role
@staticmethod
def __get_role(session, tag):
return session.query(AccountRole).filter(AccountRole.tag==tag).first()
def __get_role(tag, session):
q = session.query(AccountRole)
q = q.filter(AccountRole.tag==tag)
r = q.first()
session.flush()
return r
@staticmethod
def set(tag, address_hex):
def set(tag, address_hex, session=None):
"""Persist a tag to Ethereum address association.
This will silently overwrite the existing value.
@@ -74,16 +91,16 @@ class AccountRole(SessionBase):
:returns: Role object
:rtype: cic_eth.db.models.role.AccountRole
"""
#session = AccountRole.create_session()
#role = AccountRole.__get(session, tag)
role = AccountRole.get_role(tag) #session, tag)
session = SessionBase.bind_session(session)
role = AccountRole.get_role(tag, session)
if role == None:
role = AccountRole(tag)
role.address_hex = address_hex
#session.add(role)
#session.commit()
#session.close()
return role #address_hex
SessionBase.release_session(session)
return role
@staticmethod
@@ -95,20 +112,17 @@ class AccountRole(SessionBase):
:returns: Role tag, or None if no match
:rtype: str or None
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(AccountRole)
q = session.query(AccountRole)
q = q.filter(AccountRole.address_hex==address)
role = q.first()
tag = None
if role != None:
tag = role.tag
if session == None:
localsession.close()
SessionBase.release_session(session)
return tag

View File

@@ -52,7 +52,10 @@ class GasOracle():
:returns: Etheerum account address
:rtype: str, 0x-hex
"""
return AccountRole.get_address('GAS_GIFTER')
session = SessionBase.create_session()
a = AccountRole.get_address('GAS_GIFTER', session)
session.close()
return a
def gas_price(self, category='safe'):

View File

@@ -399,7 +399,7 @@ def refill_gas(self, recipient_address, chain_str):
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!='0x00')
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
session.close()

View File

@@ -76,8 +76,9 @@ def main():
t = api.create_account(register=register)
ps.get_message()
m = ps.get_message(timeout=args.timeout)
print(json.loads(m['data']))
o = ps.get_message(timeout=args.timeout)
m = json.loads(o['data'])
print(m['result'])
if __name__ == '__main__':

View File

@@ -91,6 +91,8 @@ run = True
class DispatchSyncer:
yield_delay = 0.005
def __init__(self, chain_spec):
self.chain_spec = chain_spec
self.chain_id = chain_spec.chain_id()
@@ -138,7 +140,10 @@ class DispatchSyncer:
txs[k] = utxs[k]
self.process(w3, txs)
time.sleep(interval)
if len(utxs) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
def main():

View File

@@ -5,9 +5,10 @@ import logging
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db.enum import StatusBits
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db import Otx
from cic_eth.db.models.otx import Otx
from cic_eth.queue.tx import get_paused_txs
from cic_eth.eth.task import create_check_gas_and_send_task
from .base import SyncFilter
@@ -39,7 +40,7 @@ class GasFilter(SyncFilter):
return
chain_spec = ChainSpec.from_chain_str(chain_str)
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session)
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id(), session=session)
SessionBase.release_session(session)

View File

@@ -15,6 +15,10 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f
class RegistrationFilter(SyncFilter):
def __init__(self, queue):
self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying registration filter')
registered_address = None
@@ -30,6 +34,6 @@ class RegistrationFilter(SyncFilter):
address,
str(chain_spec),
],
queue=queue,
queue=self.queue,
)
s.apply_async()

View File

@@ -178,7 +178,7 @@ def main():
tx_filter = TxFilter(queue)
registration_filter = RegistrationFilter()
registration_filter = RegistrationFilter(queue)
gas_filter = GasFilter(c.gas_provider(), queue)

View File

@@ -32,6 +32,7 @@ from cic_eth.admin import ctrl
from cic_eth.eth.rpc import RpcClient
from cic_eth.eth.rpc import GasOracle
from cic_eth.queue import tx
from cic_eth.queue import balance
from cic_eth.callbacks import Callback
from cic_eth.callbacks import http
from cic_eth.callbacks import tcp
@@ -49,6 +50,7 @@ argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='web3 provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
@@ -68,6 +70,7 @@ config.process()
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'ETH_PROVIDER': getattr(args, 'p'),
'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'),
}
@@ -228,7 +231,7 @@ def main():
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
chain_registry.add_oracle('naive_erc20_oracle', oracle)
chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
#chain_spec = CICRegistry.default_chain_spec

View File

@@ -21,7 +21,7 @@ class HistorySyncer(MinedSyncer):
:param mx: Maximum number of blocks to return in one call
:type mx: int
"""
def __init__(self, bc_cache, mx=20):
def __init__(self, bc_cache, mx=500):
super(HistorySyncer, self).__init__(bc_cache)
self.max = mx

View File

@@ -23,6 +23,8 @@ class MinedSyncer(Syncer):
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
"""
yield_delay = 0.005
def __init__(self, bc_cache):
super(MinedSyncer, self).__init__(bc_cache)
self.block_offset = 0
@@ -100,5 +102,8 @@ class MinedSyncer(Syncer):
block_number = self.process(c.w3, block.hex())
logg.info('processed block {} {}'.format(block_number, block.hex()))
self.bc_cache.disconnect()
time.sleep(interval)
if len(e) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
logg.info("Syncer no longer set to run, gracefully exiting")

View File

@@ -10,7 +10,7 @@ version = (
0,
10,
0,
'alpha.27',
'alpha.30',
)
version_object = semver.VersionInfo(

View File

@@ -2,4 +2,4 @@
registry_address =
chain_spec =
tx_retry_delay =
trust_address =
trust_address =

View File

@@ -0,0 +1,2 @@
[bancor]
dir = /usr/local/share/cic/bancor

View File

@@ -0,0 +1,3 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379

View File

@@ -0,0 +1,4 @@
[cic]
registry_address =
chain_spec = evm:bloxberg:8996
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -0,0 +1,2 @@
[custody]
account_index_address =

View File

@@ -0,0 +1,9 @@
[database]
NAME=cic_eth
USER=postgres
PASSWORD=tralala
HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=1

View File

@@ -0,0 +1,2 @@
[dispatcher]
loop_interval = 0.9

View File

@@ -0,0 +1,8 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:63545
gas_provider_address =
#chain_id =
abi_dir = /home/lash/src/ext/cic/grassrootseconomics/cic-contracts/abis
account_accounts_index_writer =

View File

@@ -0,0 +1,4 @@
[redis]
host = localhost
port = 63379
db = 0

View File

@@ -0,0 +1,5 @@
[signer]
socket_path = /tmp/crypto-dev-signer/jsonrpc.ipc
secret = deedbeef
database_name = signer_test
dev_keys_path =

View File

@@ -0,0 +1,6 @@
[SSL]
enable_client = false
cert_file =
key_file =
password =
ca_file =

View File

@@ -0,0 +1,2 @@
[SYNCER]
loop_interval = 1

View File

@@ -0,0 +1,3 @@
[tasks]
transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop
trace_queue_status = 1

View File

@@ -2,13 +2,13 @@ web3==5.12.2
celery==4.4.7
crypto-dev-signer~=0.4.13rc2
confini~=0.3.6b1
cic-registry~=0.5.3a18
cic-registry~=0.5.3a20
cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
websockets==8.1
requests~=2.24.0
eth_accounts_index~=0.0.10a7
eth_accounts_index~=0.0.10a10
erc20-approval-escrow~=0.3.0a5
erc20-single-shot-faucet~=0.2.0a6
rlp==2.0.1
@@ -18,5 +18,5 @@ eth-gas-proxy==0.0.1a4
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.0a8
chainlib~=0.0.1a12
chainlib~=0.0.1a16
hexathon~=0.0.1a3