Compare commits

..

40 Commits

Author SHA1 Message Date
nolash
6429e18952 Add local docker cluster config sample file 2021-02-21 16:36:32 +01:00
nolash
36e7d53173 Plug remaining postgres leaks 2021-02-21 16:29:34 +01:00
nolash
42c441c82d Add docstrings 2021-02-21 13:52:20 +01:00
nolash
1727f75be8 Factor out sub args from traffic script 2021-02-21 11:00:24 +01:00
nolash
2b535e2f31 Factor out repeated code for most cli apps, along with classes specific to traffic script 2021-02-21 10:52:48 +01:00
nolash
93ae16b578 Short-circuit balance cache 2021-02-20 22:43:30 +01:00
nolash
5108d84635 Rehabilitate import scripts after adjustments for traffic generator 2021-02-20 21:12:46 +01:00
nolash
fdb16130a2 Do a better job of updating the create script 2021-02-20 20:28:14 +01:00
nolash
d8adcd47e1 WIP update cic-eth create script to new redis callback struct 2021-02-20 19:49:21 +01:00
nolash
8dd8db497c WIP add erc20 transfer traffic item 2021-02-20 19:17:08 +01:00
nolash
f2a0ef99ec WIP Add callback reception in traffic handler, change redis callback to same dict as http 2021-02-20 11:29:11 +01:00
nolash
e7958aaf9e Add canonical account create callback 2021-02-20 10:47:01 +01:00
nolash
7119d2e7ec Introduce traffic tasker, handling chain state retrieval 2021-02-20 09:39:28 +01:00
nolash
47b107c776 Add traffic router, redis subscription, dynamic traffic item module loading 2021-02-19 23:11:25 +01:00
nolash
2a9c74080f WIP hacking together traffic generator 2021-02-19 21:12:33 +01:00
nolash
bac4fb78ce More dependency yak shaving to change to new chainspec format 2021-02-19 15:36:49 +01:00
nolash
2c7c5ac0c4 Update registry dependency 2021-02-19 14:40:51 +01:00
nolash
4cee277922 Updates related to chainspec changes 2021-02-19 14:01:55 +01:00
nolash
a92037e8f5 Bump registry, eth versions 2021-02-19 13:08:22 +01:00
nolash
3ea3ae6f2a WIP Fix more typos in daemons 2021-02-19 08:59:01 +01:00
nolash
8e65322462 Merge remote-tracking branch 'origin/master' into lash/import-scripts-refactor 2021-02-19 08:16:39 +01:00
nolash
4762856653 Move old cic-cache 2021-02-19 08:11:48 +01:00
nolash
8a43d67c72 Fix requirements file, add debug database flag to docker compose 2021-02-19 07:58:27 +01:00
nolash
107ae0b88e Revert session changes 2021-02-18 20:20:56 +01:00
nolash
9c687db9c0 Correct cic-types version 2021-02-18 19:59:06 +01:00
nolash
8449ff8b58 Rehabilitate import scripts to handle sempo exports after changes 2021-02-18 16:38:09 +01:00
nolash
3431991565 Update README 2021-02-18 15:27:13 +01:00
nolash
c415a6b180 Add date of birth generation 2021-02-18 14:30:42 +01:00
nolash
3725130f82 Calculate correct content length in meta uploader, add create script 2021-02-18 14:10:59 +01:00
nolash
d3ace4cd65 WIP user generation revamp 2021-02-18 10:11:46 +01:00
nolash
00643f4cea Make import balance script run independently of import users commence 2021-02-18 00:20:32 +01:00
nolash
b19e8f2133 Update syncer engine 2021-02-17 12:49:13 +01:00
nolash
c99c26def4 Bump version 2021-02-17 12:01:04 +01:00
nolash
9dea78571d Merge branch 'master' into lash/import-scripts-refactor 2021-02-17 11:30:58 +01:00
nolash
8819a5d976 Add missing import scripts 2021-02-13 22:18:40 +01:00
nolash
32e578e96f Poke at 10-ish tx throttle slowing down migrations 2021-02-12 21:03:38 +01:00
nolash
08e8c58c30 Add import meta script 2021-02-12 20:40:18 +01:00
nolash
a9880c05a0 Remove dead code 2021-02-12 18:42:16 +01:00
nolash
6105703ca5 Add improved import scripts 2021-02-12 17:45:41 +01:00
nolash
1e148d28c0 Remove redundant dev folder 2021-02-12 17:40:51 +01:00
71 changed files with 969 additions and 896 deletions

View File

@@ -8,7 +8,6 @@ from cic_registry import zero_address
# local imports
from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.error import LockedError
@@ -117,10 +116,9 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
@celery_app.task()
def check_lock(chained_input, chain_str, lock_flags, address=None):
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
r = Lock.check(chain_str, lock_flags, address=zero_address)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
r |= Lock.check(chain_str, lock_flags, address=address)
if r > 0:
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
raise LockedError(r)

View File

@@ -457,7 +457,6 @@ class AdminApi:
tx_unpacked = unpack_signed_raw_tx(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
tx['gas_price'] = tx_unpacked['gasPrice']
tx['gas_limit'] = tx_unpacked['gas']
tx['data'] = tx_unpacked['data']
s = celery.signature(
'cic_eth.queue.tx.get_state_log',

View File

@@ -20,10 +20,5 @@ def tcp(self, result, destination, status_code):
(host, port) = destination.split(':')
logg.debug('tcp callback to {} {}'.format(host, port))
s.connect((host, int(port)))
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
s.send(json.dumps(data).encode('utf-8'))
s.send(json.dumps(result).encode('utf-8'))
s.close()

View File

@@ -1,28 +0,0 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -1,28 +0,0 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -114,6 +114,7 @@ class SessionBase(Model):
@staticmethod
def release_session(session=None):
session.flush()
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))

View File

@@ -55,9 +55,11 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Lock)
q = localsession.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -69,8 +71,7 @@ class Lock(SessionBase):
lock.address = address
lock.blockchain = chain_str
if tx_hash != None:
session.flush()
q = session.query(Otx)
q = localsession.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
if otx != None:
@@ -79,11 +80,12 @@ class Lock(SessionBase):
lock.flags |= flags
r = lock.flags
session.add(lock)
session.commit()
SessionBase.release_session(session)
localsession.add(lock)
localsession.commit()
if session == None:
localsession.close()
return r
@@ -108,9 +110,11 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Lock)
q = localsession.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -120,13 +124,14 @@ class Lock(SessionBase):
if lock != None:
lock.flags &= ~flags
if lock.flags == 0:
session.delete(lock)
localsession.delete(lock)
else:
session.add(lock)
localsession.add(lock)
r = lock.flags
session.commit()
localsession.commit()
SessionBase.release_session(session)
if session == None:
localsession.close()
return r
@@ -151,20 +156,22 @@ class Lock(SessionBase):
:rtype: number
"""
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Lock)
q = localsession.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
q = q.filter(Lock.flags.op('&')(flags)==flags)
lock = q.first()
if session == None:
localsession.close()
r = 0
if lock != None:
r = lock.flags & flags
SessionBase.release_session(session)
return r

View File

@@ -21,9 +21,12 @@ class Nonce(SessionBase):
@staticmethod
def get(address, session=None):
session = SessionBase.bind_session(session)
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Nonce)
q = localsession.query(Nonce)
q = q.filter(Nonce.address_hex==address)
nonce = q.first()
@@ -31,29 +34,28 @@ class Nonce(SessionBase):
if nonce != None:
nonce_value = nonce.nonce;
SessionBase.release_session(session)
if session == None:
localsession.close()
return nonce_value
@staticmethod
def __get(session, address):
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
def __get(conn, address):
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
nonce = r.fetchone()
session.flush()
if nonce == None:
return None
return nonce[0]
@staticmethod
def __set(session, address, nonce):
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
session.flush()
def __set(conn, address, nonce):
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
@staticmethod
def next(address, initial_if_not_exists=0, session=None):
def next(address, initial_if_not_exists=0):
"""Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@@ -65,32 +67,20 @@ class Nonce(SessionBase):
:returns: Nonce
:rtype: number
"""
session = SessionBase.bind_session(session)
SessionBase.release_session(session)
session.begin_nested()
#conn = Nonce.engine.connect()
conn = Nonce.engine.connect()
if Nonce.transactional:
#session.execute('BEGIN')
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
session.flush()
nonce = Nonce.__get(session, address)
conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
nonce = Nonce.__get(conn, address)
logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None:
nonce = initial_if_not_exists
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
session.flush()
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__set(session, address, nonce+1)
#if Nonce.transactional:
#session.execute('COMMIT')
#session.execute('UNLOCK TABLE nonce')
#conn.close()
session.commit()
session.commit()
SessionBase.release_session(session)
Nonce.__set(conn, address, nonce+1)
if Nonce.transactional:
conn.execute('COMMIT')
conn.close()
return nonce

View File

@@ -79,13 +79,6 @@ class Otx(SessionBase):
return r
def __status_not_set(self, status):
r = not(self.status & status)
if r:
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
"""Set block number transaction was mined in.
@@ -327,32 +320,6 @@ class Otx(SessionBase):
SessionBase.release_session(session)
def dequeue(self, session=None):
"""Marks that a process to execute send attempt is underway
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_not_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__reset_status(StatusBits.QUEUED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
"""Marks that transaction was mined but code execution did not succeed.
@@ -406,6 +373,18 @@ class Otx(SessionBase):
else:
self.__set_status(StatusEnum.OBSOLETED, session)
# if confirmed:
# if self.status != StatusEnum.OBSOLETED:
# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.CANCELLED, session)
# elif self.status != StatusEnum.OBSOLETED:
# if self.status > StatusEnum.SENT:
# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)

View File

@@ -40,14 +40,12 @@ class AccountRole(SessionBase):
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
role = AccountRole.get_role(tag, session)
r = zero_address
if role != None:
r = role.address_hex
session.flush()
SessionBase.release_session(session)
return r
@@ -65,8 +63,6 @@ class AccountRole(SessionBase):
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
session.flush()
SessionBase.release_session(session)
@@ -78,6 +74,7 @@ class AccountRole(SessionBase):
q = session.query(AccountRole)
q = q.filter(AccountRole.tag==tag)
r = q.first()
session.flush()
return r
@@ -96,12 +93,10 @@ class AccountRole(SessionBase):
"""
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
role = AccountRole.get_role(tag, session)
if role == None:
role = AccountRole(tag)
role.address_hex = address_hex
session.flush()
SessionBase.release_session(session)

View File

@@ -85,27 +85,26 @@ class TxCache(SessionBase):
:param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex
"""
session = SessionBase.bind_session(session)
localsession = SessionBase.bind_session(session)
q = session.query(TxCache)
q = localsession.query(TxCache)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_original)
txc = q.first()
if txc == None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
session.flush()
q = session.query(Otx)
q = localsession.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_new)
otx = q.first()
if otx == None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache(
@@ -116,21 +115,18 @@ class TxCache(SessionBase):
txc.destination_token_address,
int(txc.from_value),
int(txc.to_value),
session=session,
)
session.add(txc_new)
session.commit()
localsession.add(txc_new)
localsession.commit()
SessionBase.release_session(session)
SessionBase.release_session(localsession)
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
localsession = SessionBase.bind_session(session)
tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if tx == None:
SessionBase.release_session(session)
SessionBase.release_session(localsession)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
@@ -147,5 +143,5 @@ class TxCache(SessionBase):
self.date_updated = self.date_created
self.date_checked = self.date_created
SessionBase.release_session(session)
SessionBase.release_session(localsession)

View File

@@ -8,7 +8,6 @@ from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from erc20_single_shot_faucet import Faucet
from cic_registry import zero_address
from hexathon import strip_0x
# local import
from cic_eth.eth import RpcClient
@@ -22,7 +21,6 @@ from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError
from cic_eth.task import CriticalSQLAlchemyTask
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -36,7 +34,6 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
session=None,
):
"""Register an Ethereum account address with the on-chain account registry
@@ -59,7 +56,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(),
'value': 0,
})
return tx_add
@@ -69,7 +66,6 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
session=None,
):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
@@ -90,7 +86,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(),
'value': 0,
})
return tx_add
@@ -105,12 +101,11 @@ def unpack_register(data):
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
f = data[2:10]
if f != '0a3b0a4f':
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[8:]
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
@@ -125,19 +120,17 @@ def unpack_gift(data):
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
f = data[2:10]
if f != '63e4bff4':
raise ValueError('Invalid gift data ({})'.format(f))
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[8:]
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
# TODO: Separate out nonce initialization task
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def create(password, chain_str):
"""Creates and stores a new ethereum account in the keystore.
@@ -156,13 +149,9 @@ def create(password, chain_str):
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
# TODO: this can safely be set to zero, since we are randomly creating account
n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session()
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==a)
o = q.first()
session.flush()
o = session.query(Nonce).filter(Nonce.address_hex==a).first()
if o == None:
o = Nonce()
o.address_hex = a
@@ -173,7 +162,7 @@ def create(password, chain_str):
return a
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, throws=(RoleMissingError,))
def register(self, account_address, chain_str, writer_address=None):
"""Creates a transaction to add the given address to the accounts index.
@@ -189,22 +178,20 @@ def register(self, account_address, chain_str, writer_address=None):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session()
if writer_address == None:
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER')
if writer_address == zero_address:
session.close()
raise RoleMissingError(account_address)
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c)
tx_add = txf.add(account_address, chain_spec, session=session)
session.close()
tx_add = txf.add(account_address, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
gas_budget = tx_add['gas'] * tx_add['gasPrice']
@@ -222,7 +209,7 @@ def register(self, account_address, chain_str, writer_address=None):
return account_address
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True)
def gift(self, account_address, chain_str):
"""Creates a transaction to invoke the faucet contract for the given address.
@@ -337,7 +324,7 @@ def cache_gift_data(
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def cache_account_data(
tx_hash_hex,
tx_signed_raw_hex,

View File

@@ -32,10 +32,10 @@ class TxFactory:
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self, session=None):
def next_nonce(self):
"""Returns the current cached nonce value, and increments it for next transaction.
:returns: Nonce
:rtype: number
"""
return self.nonce_oracle.next(session=session)
return self.nonce_oracle.next()

View File

@@ -14,10 +14,10 @@ class NonceOracle():
self.default_nonce = default_nonce
def next(self, session=None):
def next(self):
"""Get next unique nonce.
:returns: Nonce
:rtype: number
"""
return Nonce.next(self.address, self.default_nonce, session=session)
return Nonce.next(self.address, self.default_nonce)

View File

@@ -33,7 +33,7 @@ def sign_tx(tx, chain_str):
return (tx_hash_hex, tx_transfer_signed['raw'],)
def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data
@@ -44,7 +44,6 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
:type queue: str
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
:type cache_task: str
:raises: sqlalchemy.exc.DatabaseError
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
@@ -52,13 +51,25 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
logg.debug('adding queue tx {}'.format(tx_hash_hex))
# s = celery.signature(
# 'cic_eth.queue.tx.create',
# [
# tx['nonce'],
# tx['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# chain_str,
# ],
# queue=queue,
# )
# TODO: consider returning this as a signature that consequtive tasks can be linked to
queue_create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
session=session,
)
if cache_task != None:

View File

@@ -8,8 +8,6 @@ import web3
from cic_registry import CICRegistry
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
from hexathon import strip_0x
from chainlib.status import Status as TxStatus
# platform imports
from cic_eth.db.models.tx import TxCache
@@ -21,10 +19,6 @@ from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -126,12 +120,11 @@ def unpack_transfer(data):
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
f = data[2:10]
if f != contract_function_signatures['transfer']:
raise ValueError('Invalid transfer data ({})'.format(f))
d = data[8:]
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
@@ -147,12 +140,11 @@ def unpack_transferfrom(data):
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
f = data[2:10]
if f != contract_function_signatures['transferfrom']:
raise ValueError('Invalid transferFrom data ({})'.format(f))
d = data[8:]
d = data[10:]
return {
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
@@ -169,19 +161,18 @@ def unpack_approve(data):
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
f = data[2:10]
if f != contract_function_signatures['approve']:
raise ValueError('Invalid approval data ({})'.format(f))
d = data[8:]
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
@celery_app.task(base=CriticalWeb3Task)
@celery_app.task()
def balance(tokens, holder_address, chain_str):
"""Return token balances for a list of tokens for given address
@@ -208,7 +199,7 @@ def balance(tokens, holder_address, chain_str):
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses
@@ -262,7 +253,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True)
def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address
@@ -316,7 +307,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
return tx_hash_hex
@celery_app.task(base=CriticalWeb3Task)
@celery_app.task()
def resolve_tokens_by_symbol(token_symbols, chain_str):
"""Returns contract addresses of an array of ERC20 token symbols
@@ -339,7 +330,7 @@ def resolve_tokens_by_symbol(token_symbols, chain_str):
return tokens
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def otx_cache_transfer(
tx_hash_hex,
tx_signed_raw_hex,
@@ -363,7 +354,7 @@ def otx_cache_transfer(
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def cache_transfer_data(
tx_hash_hex,
tx,
@@ -399,7 +390,7 @@ def cache_transfer_data(
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def otx_cache_approve(
tx_hash_hex,
tx_signed_raw_hex,
@@ -423,7 +414,7 @@ def otx_cache_approve(
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def cache_approve_data(
tx_hash_hex,
tx,
@@ -479,8 +470,6 @@ class ExtendedTx:
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
self.status = TxStatus.PENDING.name
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
@@ -508,18 +497,10 @@ class ExtendedTx:
self.destination_token_value = destination_value
def set_status(self, n):
if n:
self.status = TxStatus.ERROR.name
else:
self.status = TxStatus.SUCCESS.name
self.status_code = n
def to_dict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'to_dict']:
continue
o[attr] = getattr(self, attr)
return o

View File

@@ -32,10 +32,6 @@ from cic_eth.eth.nonce import NonceOracle
from cic_eth.error import AlreadyFillingGasError
from cic_eth.eth.util import tx_hex_string
from cic_eth.admin.ctrl import lock_send
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -44,7 +40,7 @@ MAX_NONCE_ATTEMPTS = 3
# TODO this function is too long
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, throws=(OutOfGasError))
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
@@ -135,7 +131,7 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True)
def hashes_to_txs(self, tx_hashes):
"""Return a list of raw signed transactions from the local transaction queue corresponding to a list of transaction hashes.
@@ -317,8 +313,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
@celery_app.task(bind=True, base=CriticalWeb3Task)
@celery_app.task(bind=True)
def send(self, txs, chain_str):
"""Send transactions to the network.
@@ -356,6 +351,13 @@ def send(self, txs, chain_str):
c = RpcClient(chain_spec)
r = None
try:
r = c.w3.eth.send_raw_transaction(tx_hex)
except Exception as e:
raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m)
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
@@ -364,14 +366,6 @@ def send(self, txs, chain_str):
],
queue=queue,
)
try:
r = c.w3.eth.send_raw_transaction(tx_hex)
except requests.exceptions.ConnectionError as e:
raise(e)
except Exception as e:
raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m)
s_set_sent.apply_async()
tx_tail = txs[1:]
@@ -386,8 +380,7 @@ def send(self, txs, chain_str):
return r.hex()
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
@celery_app.task(bind=True, throws=(AlreadyFillingGasError))
def refill_gas(self, recipient_address, chain_str):
"""Executes a native token transaction to fund the recipient's gas expenditures.
@@ -544,7 +537,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
return tx_hash_hex
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,))
def sync_tx(self, tx_hash_hex, chain_str):
queue = self.request.delivery_info['routing_key']
@@ -628,7 +621,7 @@ def resume_tx(self, txpending_hash_hex, chain_str):
return txpending_hash_hex
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def otx_cache_parse_tx(
tx_hash_hex,
tx_signed_raw_hex,
@@ -655,7 +648,7 @@ def otx_cache_parse_tx(
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def cache_gas_refill_data(
tx_hash_hex,
tx,

View File

@@ -149,9 +149,6 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
txs_by_block = {}
chain_spec = ChainSpec.from_chain_str(chain_str)
if isinstance(tx_batches, dict):
tx_batches = [tx_batches]
for b in tx_batches:
for v in b.values():
tx = None

View File

@@ -14,7 +14,6 @@ from cic_eth.db.enum import (
StatusBits,
dead,
)
from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app
@@ -36,7 +35,7 @@ def __balance_outgoing_compatible(token_address, holder_address, chain_str):
return delta
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def balance_outgoing(tokens, holder_address, chain_str):
"""Retrieve accumulated value of unprocessed transactions sent from the given address.
@@ -74,7 +73,7 @@ def __balance_incoming_compatible(token_address, receiver_address, chain_str):
return delta
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def balance_incoming(tokens, receipient_address, chain_str):
"""Retrieve accumulated value of unprocessed transactions to be received by the given address.

View File

@@ -10,7 +10,6 @@ from cic_registry.chain import ChainSpec
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx
from cic_eth.error import NotLocalTxError
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
celery_app = celery.current_app
@@ -18,7 +17,7 @@ logg = logging.getLogger()
# TODO: This method does not belong in the _queue_ module, it operates across queue and network
@celery_app.task(base=CriticalSQLAlchemyAndWeb3Task)
@celery_app.task()
def tx_times(tx_hash, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)

View File

@@ -26,7 +26,6 @@ from cic_eth.db.enum import (
is_alive,
dead,
)
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
from cic_eth.error import NotLocalTxError
from cic_eth.error import LockedError
@@ -87,7 +86,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
# TODO: Replace set_* with single task for set status
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_sent_status(tx_hash, fail=False):
"""Used to set the status after a send attempt
@@ -119,7 +118,7 @@ def set_sent_status(tx_hash, fail=False):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_final_status(tx_hash, block=None, fail=False):
"""Used to set the status of an incoming transaction result.
@@ -175,7 +174,7 @@ def set_final_status(tx_hash, block=None, fail=False):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_cancel(tx_hash, manual=False):
"""Used to set the status when a transaction is cancelled.
@@ -207,7 +206,7 @@ def set_cancel(tx_hash, manual=False):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_rejected(tx_hash):
"""Used to set the status when the node rejects sending a transaction to network
@@ -233,7 +232,7 @@ def set_rejected(tx_hash):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_fubar(tx_hash):
"""Used to set the status when an unexpected error occurs.
@@ -259,7 +258,7 @@ def set_fubar(tx_hash):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_manual(tx_hash):
"""Used to set the status when queue is manually changed
@@ -285,7 +284,7 @@ def set_manual(tx_hash):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_ready(tx_hash):
"""Used to mark a transaction as ready to be sent to network
@@ -311,25 +310,7 @@ def set_ready(tx_hash):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_dequeue(tx_hash):
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.dequeue(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def set_waitforgas(tx_hash):
"""Used to set the status when a transaction must be deferred due to gas refill
@@ -355,7 +336,7 @@ def set_waitforgas(tx_hash):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def get_state_log(tx_hash):
logs = []
@@ -374,7 +355,7 @@ def get_state_log(tx_hash):
return logs
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def get_tx_cache(tx_hash):
"""Returns an aggregate dictionary of outgoing transaction data and metadata
@@ -423,7 +404,7 @@ def get_tx_cache(tx_hash):
return tx
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def get_lock(address=None):
"""Retrieve all active locks
@@ -461,7 +442,7 @@ def get_lock(address=None):
return locks
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def get_tx(tx_hash):
"""Retrieve a transaction queue record by transaction hash
@@ -472,9 +453,7 @@ def get_tx(tx_hash):
:rtype: dict
"""
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if tx == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
@@ -490,7 +469,7 @@ def get_tx(tx_hash):
return o
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def get_nonce_tx(nonce, sender, chain_id):
"""Retrieve all transactions for address with specified nonce
@@ -673,7 +652,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
return txs
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task()
def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None):
"""Returns all local queue transactions for a given Ethereum address

View File

@@ -1,86 +0,0 @@
# standard imports
import logging
import copy
# external imports
from cic_registry import CICRegistry
from eth_token_index import TokenUniqueSymbolIndex
from eth_accounts_index import AccountRegistry
from chainlib.chain import ChainSpec
from cic_registry.chain import ChainRegistry
from cic_registry.helper.declarator import DeclaratorOracleAdapter
logg = logging.getLogger(__name__)
class TokenOracle:
    """Lazily mirrors the on-chain token symbol index into a local list.

    Entries are fetched on demand: each call to :meth:`get_tokens` tops up
    the local cache with any tokens registered since the previous call.
    """

    def __init__(self, conn, chain_spec, registry):
        self.tokens = []
        self.chain_spec = chain_spec
        self.registry = registry

        # resolve the token index contract through the global registry
        contract = CICRegistry.get_contract(chain_spec, 'TokenRegistry', 'Registry')
        self.getter = TokenUniqueSymbolIndex(conn, contract.address())

    def get_tokens(self):
        """Return all registered tokens, fetching new entries first.

        :returns: token objects as resolved by the registry
        :rtype: list
        """
        known = len(self.tokens)
        total = self.getter.count()
        if total == known:
            # cache already complete; hand back the internal list as-is
            return self.tokens
        for idx in range(known, total):
            addr = self.getter.get_index(idx)
            tok = self.registry.get_address(self.chain_spec, addr)
            sym = tok.symbol()
            self.tokens.append(tok)
            logg.debug('adding token idx {} symbol {} address {}'.format(idx, sym, addr))
        return copy.copy(self.tokens)
class AccountsOracle:
    """Lazily mirrors the on-chain accounts index into a local list.

    Each call to :meth:`get_accounts` tops up the local cache with any
    accounts registered since the previous call.
    """

    def __init__(self, conn, chain_spec, registry):
        self.accounts = []
        self.chain_spec = chain_spec
        self.registry = registry

        # resolve the accounts index contract through the global registry
        contract = CICRegistry.get_contract(chain_spec, 'AccountRegistry', 'Registry')
        self.getter = AccountRegistry(conn, contract.address())

    def get_accounts(self):
        """Return all registered accounts, fetching new entries first.

        :returns: account addresses
        :rtype: list
        """
        known = len(self.accounts)
        total = self.getter.count()
        if total == known:
            # cache already complete; hand back the internal list as-is
            return self.accounts
        for idx in range(known, total):
            entry = self.getter.get_index(idx)
            self.accounts.append(entry)
            logg.debug('adding account {}'.format(entry))
        return copy.copy(self.accounts)
def init_registry(config, w3):
    """Initialize the global CICRegistry from configuration and return it.

    Wires up the chain registry, the address declarator contract and a
    trusted-address oracle on top of it.

    :param config: configuration object; CIC_CHAIN_SPEC, CIC_REGISTRY_ADDRESS,
        ETH_ABI_DIR and CIC_TRUST_ADDRESS must be set
    :param w3: connected web3 instance
    :raises ValueError: if CIC_TRUST_ADDRESS is unset
    :returns: the populated CICRegistry
    """
    chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
    CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
    CICRegistry.add_path(config.get('ETH_ABI_DIR'))

    chain_registry = ChainRegistry(chain_spec)
    CICRegistry.add_chain_registry(chain_registry, True)

    declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')

    trust_src = config.get('CIC_TRUST_ADDRESS')
    if trust_src is None:
        raise ValueError('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
    trusted = trust_src.split(',')
    for entry in trusted:
        logg.info('using trusted address {}'.format(entry))

    # declarations by any trusted address are accepted as token metadata
    chain_registry.add_oracle(DeclaratorOracleAdapter(declarator.contract, trusted), 'naive_erc20_oracle')

    return CICRegistry

View File

@@ -21,21 +21,14 @@ import cic_eth
from cic_eth.eth import RpcClient
from cic_eth.db import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import StatusBits
from cic_eth.db.enum import LockEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import (
get_upcoming_tx,
set_dequeue,
)
from cic_eth.queue.tx import get_upcoming_tx
from cic_eth.admin.ctrl import lock_send
from cic_eth.sync.error import LoopDone
from cic_eth.eth.tx import send as task_tx_send
from cic_eth.error import (
PermanentTxError,
TemporaryTxError,
NotLocalTxError,
)
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.eth.util import unpack_signed_raw_tx_hex
logging.basicConfig(level=logging.WARNING)
@@ -116,11 +109,6 @@ class DispatchSyncer:
for k in txs.keys():
tx_raw = txs[k]
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
try:
set_dequeue(tx['hash'])
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
@@ -141,13 +129,12 @@ class DispatchSyncer:
)
s_check.link(s_send)
t = s_check.apply_async()
logg.info('processed {}'.format(k))
def loop(self, w3, interval):
while run:
txs = {}
typ = StatusBits.QUEUED
typ = StatusEnum.READYSEND
utxs = get_upcoming_tx(typ, chain_id=self.chain_id)
for k in utxs.keys():
txs[k] = utxs[k]

View File

@@ -5,46 +5,37 @@ import logging
import web3
import celery
from cic_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
from chainlib.eth.address import to_checksum
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
# local imports
from .base import SyncFilter
from cic_eth.eth.token import (
unpack_transfer,
unpack_transferfrom,
)
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.token import ExtendedTx
from .base import SyncFilter
logg = logging.getLogger(__name__)
logg = logging.getLogger()
transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '63e4bff4' # keccak256(giveTo(address))
transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address))
class CallbackFilter(SyncFilter):
trusted_addresses = []
def __init__(self, chain_spec, method, queue):
def __init__(self, method, queue):
self.queue = queue
self.method = method
self.chain_spec = chain_spec
def call_back(self, transfer_type, result):
logg.debug('result {}'.format(result))
s = celery.signature(
self.method,
[
result,
transfer_type,
int(result['status_code'] == 0),
int(rcpt.status == 0),
],
queue=self.queue,
)
@@ -59,85 +50,58 @@ class CallbackFilter(SyncFilter):
# )
# s_translate.link(s)
# s_translate.apply_async()
t = s.apply_async()
return s
s.apply_async()
def parse_data(self, tx):
transfer_type = None
def parse_data(self, tx, rcpt):
transfer_type = 'transfer'
transfer_data = None
logg.debug('have payload {}'.format(tx.payload))
method_signature = tx.payload[:8]
method_signature = tx.input[:10]
logg.debug('tx status {}'.format(tx.status))
if method_signature == transfer_method_signature:
transfer_data = unpack_transfer(tx.payload)
transfer_data = unpack_transfer(tx.input)
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
elif method_signature == transferfrom_method_signature:
transfer_type = 'transferfrom'
transfer_data = unpack_transferfrom(tx.payload)
transfer_data = unpack_transferfrom(tx.input)
transfer_data['token_address'] = tx['to']
# TODO: do not rely on logs here
elif method_signature == giveto_method_signature:
transfer_type = 'tokengift'
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum(token_address)
logg.debug('resolved method {}'.format(transfer_type))
if transfer_data != None:
transfer_data['status'] = tx.status
transfer_data = unpack_gift(tx.input)
for l in rcpt.logs:
if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=l.data)
token_address_bytes = l.topics[2][32-20:]
transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex())
transfer_data['from'] = rcpt.to
return (transfer_type, transfer_data)
def filter(self, conn, block, tx, db_session=None):
chain_str = str(self.chain_spec)
def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
chain_str = str(chain_spec)
transfer_data = self.parse_data(tx, rcpt)
transfer_data = None
transfer_type = None
try:
(transfer_type, transfer_data) = self.parse_data(tx)
except TypeError:
logg.debug('invalid method data length for tx {}'.format(tx.hash))
if len(tx.input) < 10:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash']))
return
if len(tx.payload) < 8:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash))
return
logg.debug('checking callbacks filter input {}'.format(tx.payload[:8]))
logg.debug('checking callbacks filter input {}'.format(tx.input[:10]))
if transfer_data != None:
logg.debug('wtfoo {}'.format(transfer_data))
token_symbol = None
result = None
try:
tokentx = ExtendedTx(tx.hash, self.chain_spec)
tokentx = ExtendedTx(self.chain_spec)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
if transfer_data['status'] == 0:
tokentx.set_status(1)
else:
tokentx.set_status(0)
t = self.call_back(transfer_type, tokentx.to_dict())
logg.info('callback success task id {} tx {}'.format(t, tx.hash))
self.call_back(tokentx.to_dict())
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash))
def __str__(self):
return 'cic-eth callbacks'
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash.hex()))

View File

@@ -3,7 +3,6 @@ import logging
# third-party imports
from cic_registry.chain import ChainSpec
from hexathon import add_0x
# local imports
from cic_eth.db.enum import StatusBits
@@ -14,19 +13,20 @@ from cic_eth.queue.tx import get_paused_txs
from cic_eth.eth.task import create_check_gas_and_send_task
from .base import SyncFilter
logg = logging.getLogger(__name__)
logg = logging.getLogger()
class GasFilter(SyncFilter):
def __init__(self, chain_spec, queue=None):
def __init__(self, gas_provider, queue=None):
self.queue = queue
self.chain_spec = chain_spec
self.gas_provider = gas_provider
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
tx_hash_hex = add_0x(tx.hash)
if tx.value > 0:
def filter(self, w3, tx, rcpt, chain_str, session=None):
logg.debug('applying gas filter')
tx_hash_hex = tx.hash.hex()
if tx['value'] > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient)
@@ -35,26 +35,23 @@ class GasFilter(SyncFilter):
r = q.first()
if r == None:
logg.debug('unsolicited gas refill tx {}'.format(tx_hash_hex))
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
SessionBase.release_session(session)
return
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session)
chain_spec = ChainSpec.from_chain_str(chain_str)
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id(), session=session)
SessionBase.release_session(session)
logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
if len(txs) > 0:
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
s = create_check_gas_and_send_task(
list(txs.values()),
str(self.chain_spec),
str(chain_str),
r[0],
0,
tx_hashes_hex=list(txs.keys()),
queue=self.queue,
)
s.apply_async()
def __str__(self):
return 'eic-eth gasfilter'

View File

@@ -4,48 +4,36 @@ import logging
# third-party imports
import celery
from chainlib.eth.address import to_checksum
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from .base import SyncFilter
logg = logging.getLogger(__name__)
logg = logging.getLogger()
account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256))
class RegistrationFilter(SyncFilter):
def __init__(self, chain_spec, queue):
self.chain_spec = chain_spec
def __init__(self, queue):
self.queue = queue
def filter(self, conn, block, tx, db_session=None):
def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying registration filter')
registered_address = None
logg.debug('register filter checking log {}'.format(tx.logs))
for l in tx.logs:
event_topic_hex = l['topics'][0]
for l in rcpt['logs']:
event_topic_hex = l['topics'][0].hex()
if event_topic_hex == account_registry_add_log_hash:
# TODO: use abi conversion method instead
address_hex = strip_0x(l['topics'][1])[64-40:]
address = to_checksum(add_0x(address_hex))
address_bytes = l.topics[1][32-20:]
address = to_checksum(address_bytes.hex())
logg.debug('request token gift to {}'.format(address))
s = celery.signature(
'cic_eth.eth.account.gift',
[
address,
str(self.chain_spec),
str(chain_spec),
],
queue=self.queue,
)
s.apply_async()
def __str__(self):
return 'cic-eth account registration'

View File

@@ -3,47 +3,39 @@ import logging
# third-party imports
import celery
from hexathon import (
add_0x,
)
# local imports
from cic_eth.db.models.otx import Otx
from chainsyncer.db.models.base import SessionBase
from chainlib.status import Status
from cic_eth.db.models.base import SessionBase
from .base import SyncFilter
logg = logging.getLogger(__name__)
logg = logging.getLogger()
class TxFilter(SyncFilter):
def __init__(self, chain_spec, queue):
def __init__(self, queue):
self.queue = queue
self.chain_spec = chain_spec
def filter(self, conn, block, tx, db_session=None):
db_session = SessionBase.bind_session(db_session)
tx_hash_hex = tx.hash
otx = Otx.load(add_0x(tx_hash_hex), session=db_session)
def filter(self, w3, tx, rcpt, chain_spec, session=None):
session = SessionBase.bind_session(session)
logg.debug('applying tx filter')
tx_hash_hex = tx.hash.hex()
otx = Otx.load(tx_hash_hex, session=session)
SessionBase.release_session(session)
if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.info('local tx match {}'.format(otx.tx_hash))
SessionBase.release_session(db_session)
logg.info('otx found {}'.format(otx.tx_hash))
s = celery.signature(
'cic_eth.queue.tx.set_final_status',
[
add_0x(tx_hash_hex),
tx.block.number,
tx.status == Status.ERROR,
tx_hash_hex,
rcpt.blockNumber,
rcpt.status == 0,
],
queue=self.queue,
)
t = s.apply_async()
return t
def __str__(self):
return 'cic-eth erc20 transfer filter'

View File

@@ -0,0 +1,207 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
import celery
import rlp
import web3
from web3 import HTTPProvider, WebsocketProvider
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from cic_registry import zero_address
from cic_registry.chain import ChainRegistry
from cic_registry.error import UnknownContractError
from cic_bancor.bancor import BancorRegistryClient
# local imports
import cic_eth
from cic_eth.eth import RpcClient
from cic_eth.db import SessionBase
from cic_eth.db import Otx
from cic_eth.db import TxConvertTransfer
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_paused_txs
from cic_eth.sync import Syncer
from cic_eth.sync.error import LoopDone
from cic_eth.db.error import UnknownConvertError
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.sync.backend import SyncerBackend
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.account import unpack_gift
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
GasFilter,
TxFilter,
RegistrationFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
# Default config root; overridable with -c.
config_dir = os.path.join('/usr/local/etc/cic-eth')

# Command line interface for the block-monitoring sync daemon.
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head')
args = argparser.parse_args(sys.argv[1:])

# -v raises to INFO, -vv to DEBUG; default is WARNING (set above).
if args.v == True:
    logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
    logging.getLogger().setLevel(logging.DEBUG)

# Load configuration from the config root, then let CLI flags win.
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
    'ETH_ABI_DIR': getattr(args, 'abi_dir'),
    'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli flag')
# keep credentials out of the debug dump below
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))

app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))

queue = args.q

chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))

# Select a web3 provider implementation from the URL scheme.
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
    blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
    blockchain_provider = HTTPProvider(blockchain_provider)
else:
    raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
    """Provider factory handed to RpcClient.

    :returns: tuple of the module-level provider and a fresh Web3 instance
        bound to it, so clients can reconnect using the same provider
    :rtype: tuple
    """
    w3 = web3.Web3(blockchain_provider)
    return (blockchain_provider, w3)
# Install the provider factory and open an RPC client for this chain.
RpcClient.set_constructor(web3_constructor)
c = RpcClient(chain_spec)

# Bootstrap the global contract registry from the on-chain CIC registry.
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry, True)
declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')

# Database connection; pool_size=1 since this daemon is single-threaded.
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main():
    """Entry point: build head and/or history syncers, attach filters, run.

    Reads the module-level config/args set up at import time. Blocks in the
    syncer loop until done, then exits the process.
    """
    global chain_spec, c, queue

    # Optionally grant the configured key the account-index writer role.
    if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None:
        CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True)

    syncers = []
    block_offset = c.w3.eth.blockNumber
    chain = str(chain_spec)

    # First run against this chain: seed an initial history sync session
    # starting from the current head.
    if SyncerBackend.first(chain):
        from cic_eth.sync.history import HistorySyncer
        backend = SyncerBackend.initial(chain, block_offset)
        syncer = HistorySyncer(backend)
        syncers.append(syncer)

    if args.mode == 'head':
        # Follow new blocks from the block after the current head.
        from cic_eth.sync.head import HeadSyncer
        block_sync = SyncerBackend.live(chain, block_offset+1)
        syncers.append(HeadSyncer(block_sync))
    elif args.mode == 'history':
        # Resume any interrupted history sync sessions.
        from cic_eth.sync.history import HistorySyncer
        backends = SyncerBackend.resume(chain, block_offset+1)
        for backend in backends:
            syncers.append(HistorySyncer(backend))
        if len(syncers) == 0:
            logg.info('found no unsynced history. terminating')
            sys.exit(0)
    else:
        sys.stderr.write("unknown mode '{}'\n".format(args.mode))
        sys.exit(1)

    # bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry')
    # bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec)
    # bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR'))
    # bancor_registry.load()

    # Trusted addresses whose declarations the callback filter will accept.
    trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
    if trusted_addresses_src == None:
        logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
        sys.exit(1)
    trusted_addresses = trusted_addresses_src.split(',')
    for address in trusted_addresses:
        logg.info('using trusted address {}'.format(address))
    CallbackFilter.trusted_addresses = trusted_addresses

    # One callback filter per "queue:task" entry in TASKS_TRANSFER_CALLBACKS;
    # a bare task name falls back to this daemon's default queue.
    callback_filters = []
    for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
        task_split = cb.split(':')
        task_queue = queue
        if len(task_split) > 1:
            task_queue = task_split[0]
        callback_filter = CallbackFilter(task_split[1], task_queue)
        callback_filters.append(callback_filter)

    tx_filter = TxFilter(queue)
    registration_filter = RegistrationFilter(queue)
    gas_filter = GasFilter(c.gas_provider(), queue)

    # Attach the filter callables to each syncer and run them in sequence.
    i = 0
    for syncer in syncers:
        logg.debug('running syncer index {}'.format(i))
        syncer.filter.append(gas_filter.filter)
        syncer.filter.append(registration_filter.filter)
        # TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
        syncer.filter.append(tx_filter.filter)
        #syncer.filter.append(convert_filter)
        for cf in callback_filters:
            syncer.filter.append(cf.filter)
        try:
            syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')))
        except LoopDone as e:
            # LoopDone signals a completed (history) sync, not an error.
            sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e))
        i += 1

    sys.exit(0)


if __name__ == '__main__':
    main()

View File

@@ -1,168 +0,0 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
import celery
import rlp
import web3
from web3 import HTTPProvider, WebsocketProvider
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_registry import CICRegistry
from chainlib.chain import ChainSpec
from cic_registry import zero_address
from cic_registry.chain import ChainRegistry
from cic_registry.error import UnknownContractError
from chainlib.eth.connection import HTTPConnection
from chainlib.eth.block import (
block_latest,
)
from hexathon import (
strip_0x,
)
from chainsyncer.backend import SyncerBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_eth.registry import init_registry
from cic_eth.eth import RpcClient
from cic_eth.db import Otx
from cic_eth.db import TxConvertTransfer
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_paused_txs
#from cic_eth.sync import Syncer
#from cic_eth.sync.error import LoopDone
from cic_eth.db.error import UnknownConvertError
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.account import unpack_gift
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
GasFilter,
TxFilter,
RegistrationFilter,
)
# Script bootstrap: logging, argument parsing and config via cic_base helpers.
script_dir = os.path.realpath(os.path.dirname(__file__))

logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)

# Stash CLI-only values under private config keys for later retrieval.
config.add(args.y, '_KEYSTORE_FILE', True)
config.add(args.q, '_CELERY_QUEUE', True)

cic_base.config.log(config)

# Database connection; pool_size=1 since this daemon is single-threaded.
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main():
    """Entry point: resume or start chainsyncer sessions and run the filters.

    Uses the module-level config set up at import time; blocks in the syncer
    loop and exits the process when all syncers have finished.
    """
    # parse chain spec object
    chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))

    # connect to celery
    celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))

    # set up registry
    w3 = cic_base.rpc.create(config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored
    registry = init_registry(config, w3)

    # Connect to blockchain with chainlib
    conn = HTTPConnection(config.get('ETH_PROVIDER'))

    # Start syncing from the block after the current head.
    o = block_latest()
    r = conn.do(o)
    block_offset = int(strip_0x(r), 16) + 1

    logg.debug('starting at block {}'.format(block_offset))

    syncers = []

    #if SyncerBackend.first(chain_spec):
    #    backend = SyncerBackend.initial(chain_spec, block_offset)

    # Resume any persisted sync sessions; otherwise seed a fresh one.
    syncer_backends = SyncerBackend.resume(chain_spec, block_offset)

    if len(syncer_backends) == 0:
        logg.info('found no backends to resume')
        syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
    else:
        for syncer_backend in syncer_backends:
            logg.info('resuming sync session {}'.format(syncer_backend))

    # Always add a live (head-following) session on top of any history ones.
    syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))

    for syncer_backend in syncer_backends:
        try:
            # NOTE(review): backend type is probed by attribute access —
            # history backends construct cleanly, live ones raise AttributeError.
            syncers.append(HistorySyncer(syncer_backend))
            logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend))
        except AttributeError:
            logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
            syncers.append(HeadSyncer(syncer_backend))

    # Trusted addresses whose declarations the callback filter will accept.
    trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
    if trusted_addresses_src == None:
        logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
        sys.exit(1)
    trusted_addresses = trusted_addresses_src.split(',')
    for address in trusted_addresses:
        logg.info('using trusted address {}'.format(address))
    CallbackFilter.trusted_addresses = trusted_addresses

    # One callback filter per "queue:task" entry in TASKS_TRANSFER_CALLBACKS;
    # a bare task name falls back to the default celery queue.
    callback_filters = []
    for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
        task_split = cb.split(':')
        task_queue = config.get('_CELERY_QUEUE')
        if len(task_split) > 1:
            task_queue = task_split[0]
        callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
        callback_filters.append(callback_filter)

    tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))

    registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))

    gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))

    # Register the filter objects with each syncer and run them in sequence.
    i = 0
    for syncer in syncers:
        logg.debug('running syncer index {}'.format(i))
        syncer.add_filter(gas_filter)
        syncer.add_filter(registration_filter)
        # TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
        syncer.add_filter(tx_filter)
        for cf in callback_filters:
            syncer.add_filter(cf)

        r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn)
        sys.stderr.write("sync {} done at block {}\n".format(syncer, r))

        i += 1

    sys.exit(0)


if __name__ == '__main__':
    main()

View File

@@ -18,7 +18,6 @@ import web3
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from cic_registry.chain import ChainRegistry
from hexathon import add_0x
# local imports
from cic_eth.api import AdminApi
@@ -37,8 +36,8 @@ default_abi_dir = '/usr/share/local/cic/solidity/abi'
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', type=str, help='CIC registry address')
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
@@ -62,16 +61,12 @@ config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
}
# override args
config.dict_override(args_override, 'cli args')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(add_0x(args.query), '_QUERY', True)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
@@ -153,20 +148,21 @@ def render_lock(o, **kwargs):
# TODO: move each command to submodule
def main():
logg.debug('len {}'.format(len(args.query)))
txs = []
renderer = render_tx
if len(config.get('_QUERY')) > 66:
txs = [admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'))]
elif len(config.get('_QUERY')) > 42:
txs = [admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'))]
elif len(config.get('_QUERY')) == 42:
txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False)
if len(args.query) > 66:
txs = [admin_api.tx(chain_spec, tx_raw=args.query)]
elif len(args.query) > 42:
txs = [admin_api.tx(chain_spec, tx_hash=args.query)]
elif len(args.query) == 42:
txs = admin_api.account(chain_spec, args.query, include_recipient=False)
renderer = render_account
elif len(config.get('_QUERY')) >= 4 and config.get('_QUERY')[:4] == 'lock':
elif len(args.query) >= 4 and args.query[:4] == 'lock':
txs = admin_api.get_lock()
renderer = render_lock
else:
raise ValueError('cannot parse argument {}'.format(config.get('_QUERY')))
raise ValueError('cannot parse argument {}'.format(args.query))
if len(txs) == 0:
logg.info('no matches found')

View File

@@ -100,7 +100,7 @@ class MinedSyncer(Syncer):
logg.debug('got blocks {}'.format(e))
for block in e:
block_number = self.process(c.w3, block.hex())
logg.debug('processed block {} {}'.format(block_number, block.hex()))
logg.info('processed block {} {}'.format(block_number, block.hex()))
self.bc_cache.disconnect()
if len(e) > 0:
time.sleep(self.yield_delay)

View File

@@ -1,33 +0,0 @@
# import
import requests
# external imports
import celery
import sqlalchemy
class CriticalTask(celery.Task):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
)
class CriticalWeb3Task(CriticalTask):
autoretry_for = (
requests.exceptions.ConnectionError,
)
class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError,
)

View File

@@ -10,7 +10,7 @@ version = (
0,
10,
0,
'alpha.36',
'alpha.30',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
DEBUG=1

View File

@@ -16,7 +16,7 @@ ARG root_requirement_file='requirements.txt'
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
@@ -42,6 +42,7 @@ COPY cic-eth/config/ /usr/local/etc/cic-eth/
COPY cic-eth/cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
COPY cic-eth/crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
RUN apt-get install -y git && \
git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi

View File

@@ -0,0 +1,5 @@
#!/bin/bash
. ./db.sh
/usr/local/bin/cic-eth-managerd $@

View File

@@ -1,5 +0,0 @@
#!/bin/bash
. ./db.sh
/usr/local/bin/cic-eth-trackerd $@

View File

@@ -1,8 +1,8 @@
web3==5.12.2
celery==4.4.7
crypto-dev-signer~=0.4.13rc3
confini~=0.3.6rc3
cic-registry~=0.5.3a22
crypto-dev-signer~=0.4.13rc2
confini~=0.3.6b1
cic-registry~=0.5.3a20
cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
@@ -18,7 +18,5 @@ eth-gas-proxy==0.0.1a4
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.0a8
chainlib~=0.0.1a19
chainlib~=0.0.1a16
hexathon~=0.0.1a3
chainsyncer~=0.0.1a19
cic-base==0.1.1a10

View File

@@ -45,7 +45,7 @@ scripts =
console_scripts =
# daemons
cic-eth-taskerd = cic_eth.runnable.daemons.tasker:main
cic-eth-trackerd = cic_eth.runnable.daemons.tracker:main
cic-eth-managerd = cic_eth.runnable.daemons.manager:main
cic-eth-dispatcherd = cic_eth.runnable.daemons.dispatcher:main
cic-eth-retrierd = cic_eth.runnable.daemons.retry:main
# tools

View File

@@ -37,7 +37,7 @@ def test_refill_gas(
eth_empty_accounts,
):
provider_address = AccountRole.get_address('GAS_GIFTER', init_database)
provider_address = AccountRole.get_address('GAS_GIFTER')
receiver_address = eth_empty_accounts[0]
c = init_rpc
@@ -93,7 +93,7 @@ def test_refill_deduplication(
eth_empty_accounts,
):
provider_address = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS', init_database)
provider_address = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
receiver_address = eth_empty_accounts[0]
c = init_rpc

View File

@@ -27,14 +27,14 @@ def test_states_initial(
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 13,
'nonce': 42,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': 42,
'chainId': 666,
'data': '',
}
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'foo:bar:42', None)
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'Foo:666', None)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
assert otx.status == StatusEnum.PENDING.value
@@ -43,7 +43,7 @@ def test_states_initial(
'cic_eth.eth.tx.check_gas',
[
[tx_hash_hex],
'foo:bar:42',
'Foo:666',
[tx_raw_signed_hex],
init_w3.eth.accounts[0],
8000000,
@@ -67,7 +67,7 @@ def test_states_initial(
'cic_eth.eth.tx.check_gas',
[
[tx_hash_hex],
'foo:bar:42',
'Foo:666',
[tx_raw_signed_hex],
init_w3.eth.accounts[0],
8000000,
@@ -94,14 +94,14 @@ def test_states_failed(
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 13,
'nonce': 42,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': 42,
'chainId': 666,
'data': '',
}
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'foo:bar:42', None)
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'Foo:666', None)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
otx.sendfail(session=init_database)
@@ -112,7 +112,7 @@ def test_states_failed(
'cic_eth.eth.tx.check_gas',
[
[tx_hash_hex],
'foo:bar:42',
'Foo:666',
[tx_raw_signed_hex],
init_w3.eth.accounts[0],
8000000,

View File

@@ -67,7 +67,7 @@ def test_callback_tcp(
logg.debug('recived {} '.format(data))
o = json.loads(echo)
try:
assert o['result'] == data
assert o == data
except Exception as e:
self.exception = e
@@ -130,7 +130,7 @@ def test_callback_redis(
o = json.loads(echo['data'])
logg.debug('recived {} '.format(o))
try:
assert o['result'] == data
assert o == data
except Exception as e:
self.exception = e

View File

@@ -9,18 +9,18 @@ def test_db_role(
foo = AccountRole.set('foo', eth_empty_accounts[0])
init_database.add(foo)
init_database.commit()
assert AccountRole.get_address('foo', init_database) == eth_empty_accounts[0]
assert AccountRole.get_address('foo') == eth_empty_accounts[0]
bar = AccountRole.set('bar', eth_empty_accounts[1])
init_database.add(bar)
init_database.commit()
assert AccountRole.get_address('bar', init_database) == eth_empty_accounts[1]
assert AccountRole.get_address('bar') == eth_empty_accounts[1]
foo = AccountRole.set('foo', eth_empty_accounts[2])
init_database.add(foo)
init_database.commit()
assert AccountRole.get_address('foo', init_database) == eth_empty_accounts[2]
assert AccountRole.get_address('bar', init_database) == eth_empty_accounts[1]
assert AccountRole.get_address('foo') == eth_empty_accounts[2]
assert AccountRole.get_address('bar') == eth_empty_accounts[1]
tag = AccountRole.role_for(eth_empty_accounts[2])
assert tag == 'foo'

View File

@@ -26,7 +26,7 @@ def test_set(
'data': '',
'chainId': 1,
}
(tx_hash, tx_signed) = sign_tx(tx_def, 'foo:bar:1')
(tx_hash, tx_signed) = sign_tx(tx_def, 'Foo:1')
otx = Otx(
tx_def['nonce'],
tx_def['from'],
@@ -82,7 +82,7 @@ def test_clone(
'data': '',
'chainId': 1,
}
(tx_hash, tx_signed) = sign_tx(tx_def, 'foo:bar:1')
(tx_hash, tx_signed) = sign_tx(tx_def, 'Foo:1')
otx = Otx(
tx_def['nonce'],
tx_def['from'],

View File

@@ -14,11 +14,11 @@ def test_unpack(
'gas': 21000,
'gasPrice': 200000000,
'data': '0x',
'chainId': 42,
'chainId': 8995,
}
(tx_hash, tx_raw) = sign_tx(tx, 'foo:bar:42')
(tx_hash, tx_raw) = sign_tx(tx, 'Foo:8995')
tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), 42)
tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), 8995)
assert tx_hash == tx_recovered['hash']

View File

@@ -0,0 +1,43 @@
# standard imports
import logging
# local imports
from cic_eth.sync.head import HeadSyncer
from cic_eth.sync.backend import SyncerBackend
logg = logging.getLogger()
def test_head(
init_rpc,
init_database,
init_eth_tester,
mocker,
eth_empty_accounts,
):
#backend = SyncBackend(eth_empty_accounts[0], 'foo')
block_number = init_rpc.w3.eth.blockNumber
backend = SyncerBackend.live('foo:666', block_number)
syncer = HeadSyncer(backend)
#init_eth_tester.mine_block()
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[0],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_one = init_rpc.w3.eth.sendTransaction(tx)
block_number = init_rpc.w3.eth.blockNumber
backend.set(block_number, 0)
b = syncer.get(init_rpc.w3)
tx = init_rpc.w3.eth.getTransactionByBlock(b[0], 0)
assert tx.hash.hex() == tx_hash_one.hex()

View File

@@ -0,0 +1,194 @@
# standard imports
import logging
# third-party imports
import pytest
from web3.exceptions import BlockNotFound
from cic_registry import CICRegistry
# local imports
from cic_eth.sync.history import HistorySyncer
from cic_eth.sync.head import HeadSyncer
#from cic_eth.sync import Syncer
from cic_eth.db.models.otx import OtxSync
from cic_eth.db.models.base import SessionBase
from cic_eth.sync.backend import SyncerBackend
logg = logging.getLogger()
class FinishedError(Exception):
pass
class DebugFilter:
def __init__(self, address):
self.txs = []
self.monitor_to_address = address
def filter(self, w3, tx, rcpt, chain_spec):
logg.debug('sync filter {}'.format(tx['hash'].hex()))
if tx['to'] == self.monitor_to_address:
self.txs.append(tx)
# hack workaround, latest block hash not found in eth_tester for some reason
if len(self.txs) == 2:
raise FinishedError('intentionally finished on tx {}'.format(tx))
def test_history(
init_rpc,
init_database,
init_eth_tester,
#celery_session_worker,
eth_empty_accounts,
):
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[0],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_one = init_rpc.w3.eth.sendTransaction(tx)
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[1],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_two = init_rpc.w3.eth.sendTransaction(tx)
init_eth_tester.mine_block()
block_number = init_rpc.w3.eth.blockNumber
live_syncer = SyncerBackend.live('foo:666', 0)
HeadSyncer(live_syncer)
history_syncers = SyncerBackend.resume('foo:666', block_number)
for history_syncer in history_syncers:
logg.info('history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target()))
backend = history_syncers[0]
syncer = HistorySyncer(backend)
fltr = DebugFilter(eth_empty_accounts[0])
syncer.filter.append(fltr.filter)
logg.debug('have txs {} {}'.format(tx_hash_one.hex(), tx_hash_two.hex()))
try:
syncer.loop(0.1)
except FinishedError:
pass
except BlockNotFound as e:
logg.error('the last block given in loop does not seem to exist :/ {}'.format(e))
check_hashes = []
for h in fltr.txs:
check_hashes.append(h['hash'].hex())
assert tx_hash_one.hex() in check_hashes
assert tx_hash_two.hex() in check_hashes
def test_history_multiple(
init_rpc,
init_database,
init_eth_tester,
#celery_session_worker,
eth_empty_accounts,
):
block_number = init_rpc.w3.eth.blockNumber
live_syncer = SyncerBackend.live('foo:666', block_number)
HeadSyncer(live_syncer)
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[0],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_one = init_rpc.w3.eth.sendTransaction(tx)
init_eth_tester.mine_block()
block_number = init_rpc.w3.eth.blockNumber
history_syncers = SyncerBackend.resume('foo:666', block_number)
for history_syncer in history_syncers:
logg.info('halfway history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target()))
live_syncer = SyncerBackend.live('foo:666', block_number)
HeadSyncer(live_syncer)
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[1],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_two = init_rpc.w3.eth.sendTransaction(tx)
init_eth_tester.mine_block()
block_number = init_rpc.w3.eth.blockNumber
history_syncers = SyncerBackend.resume('foo:666', block_number)
live_syncer = SyncerBackend.live('foo:666', block_number)
HeadSyncer(live_syncer)
for history_syncer in history_syncers:
logg.info('history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target()))
assert len(history_syncers) == 2
backend = history_syncers[0]
syncer = HistorySyncer(backend)
fltr = DebugFilter(eth_empty_accounts[0])
syncer.filter.append(fltr.filter)
try:
syncer.loop(0.1)
except FinishedError:
pass
except BlockNotFound as e:
logg.error('the last block given in loop does not seem to exist :/ {}'.format(e))
check_hashes = []
for h in fltr.txs:
check_hashes.append(h['hash'].hex())
assert tx_hash_one.hex() in check_hashes
backend = history_syncers[1]
syncer = HistorySyncer(backend)
fltr = DebugFilter(eth_empty_accounts[0])
syncer.filter.append(fltr.filter)
try:
syncer.loop(0.1)
except FinishedError:
pass
except BlockNotFound as e:
logg.error('the last block given in loop does not seem to exist :/ {}'.format(e))
check_hashes = []
for h in fltr.txs:
check_hashes.append(h['hash'].hex())
assert tx_hash_two.hex() in check_hashes
history_syncers = SyncerBackend.resume('foo:666', block_number)
assert len(history_syncers) == 0

View File

@@ -0,0 +1,79 @@
# third-party imports
import pytest
# local imports
from cic_eth.db.models.sync import BlockchainSync
from cic_eth.sync.backend import SyncerBackend
def test_scratch(
init_database,
):
with pytest.raises(ValueError):
s = SyncerBackend('Testchain:666', 13)
syncer = SyncerBackend.live('Testchain:666', 13)
s = SyncerBackend('Testchain:666', syncer.object_id)
def test_live(
init_database,
):
s = SyncerBackend.live('Testchain:666', 13)
s.connect()
assert s.db_object.target() == None
s.disconnect()
assert s.get() == (13, 0)
s.set(14, 1)
assert s.get() == (14, 1)
def test_resume(
init_database,
):
live = SyncerBackend.live('Testchain:666', 13)
live.set(13, 2)
resumes = SyncerBackend.resume('Testchain:666', 26)
assert len(resumes) == 1
resume = resumes[0]
assert resume.get() == (13, 2)
resume.set(13, 4)
assert resume.get() == (13, 4)
assert resume.start() == (13, 2)
assert resume.target() == 26
def test_unsynced(
init_database,
):
live = SyncerBackend.live('Testchain:666', 13)
live.set(13, 2)
resumes = SyncerBackend.resume('Testchain:666', 26)
live = SyncerBackend.live('Testchain:666', 26)
resumes[0].set(18, 12)
resumes = SyncerBackend.resume('Testchain:666', 42)
assert len(resumes) == 2
assert resumes[0].start() == (13, 2)
assert resumes[0].get() == (18, 12)
assert resumes[0].target() == 26
assert resumes[1].start() == (26, 0)
assert resumes[1].get() == (26, 0)
assert resumes[1].target() == 42

View File

@@ -7,22 +7,24 @@ def test_unpack(
):
tx = {
'nonce': 13,
'nonce': 42,
'from': init_w3_conn.eth.accounts[0],
'to': init_w3_conn.eth.accounts[1],
'data': '0xdeadbeef',
'value': 1024,
'gas': 23000,
'gasPrice': 1422521,
'chainId': 42,
'chainId': 1337,
}
(tx_hash, tx_signed) = sign_tx(tx, 'foo:bar:42')
(tx_hash, tx_signed) = sign_tx(tx, 'Foo:1337')
tx_unpacked = unpack_signed_raw_tx_hex(tx_signed, 42)
tx_unpacked = unpack_signed_raw_tx_hex(tx_signed, 1337)
for k in tx.keys():
assert tx[k] == tx_unpacked[k]
tx_str = tx_hex_string(tx_signed, 42)
assert tx_str == 'tx nonce 13 from 0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf to 0x2B5AD5c4795c026514f8317c7a215E218DcCD6cF hash 0x23ba3c2b400fbddcacc77d99644bfb17ac4653a69bfa46e544801fbd841b8f1e'
tx_str = tx_hex_string(tx_signed, 1337)
assert tx_str == 'tx nonce 42 from 0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf to 0x2B5AD5c4795c026514f8317c7a215E218DcCD6cF hash 0xe5aba32b1a7255d035faccb70cd8bb92c8c4a2f6bbea3f655bc5a8b802bbaa91'

View File

@@ -1,6 +1,5 @@
node_modules
dist
dist-web
dist-server
scratch
tests

View File

@@ -1,7 +1,6 @@
* 0.0.7-pending
- Add immutable content
- Add db lock on server
- Add ArgPair and KeyStore to src exports
* 0.0.6
- Add server build
* 0.0.5

View File

@@ -3,8 +3,11 @@ FROM node:15.3.0-alpine3.10
WORKDIR /tmp/src/cic-meta
COPY cic-meta/package.json \
cic-meta/package-lock.json \
./
RUN npm install
COPY cic-meta/src/ src/
COPY cic-meta/tests/ tests/
COPY cic-meta/scripts/ scripts/
@@ -12,8 +15,6 @@ COPY cic-meta/scripts/ scripts/
RUN alias tsc=node_modules/typescript/bin/tsc
RUN npm install
COPY cic-meta/.config/ /usr/local/etc/cic-meta/
# COPY cic-meta/scripts/server/initdb/server.postgres.sql /usr/local/share/cic-meta/sql/server.sql

View File

@@ -1,6 +1,6 @@
{
"name": "cic-client-meta",
"version": "0.0.7-alpha.2",
"version": "0.0.6",
"lockfileVersion": 1,
"requires": true,
"dependencies": {

View File

@@ -1,12 +1,12 @@
{
"name": "cic-client-meta",
"version": "0.0.7-alpha.3",
"version": "0.0.7-alpha.2",
"description": "Signed CRDT metadata graphs for the CIC network",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"test": "mocha -r node_modules/node-localstorage/register -r ts-node/register tests/*.ts",
"build": "node_modules/typescript/bin/tsc -d --outDir dist src/index.ts",
"build": "node_modules/typescript/bin/tsc -d --outDir dist && npm run build-server",
"build-server": "tsc -d --outDir dist-server scripts/server/*.ts",
"pack": "node_modules/typescript/bin/tsc -d --outDir dist && webpack",
"clean": "rm -rf dist",
@@ -32,12 +32,6 @@
"webpack-cli": "^4.2.0"
},
"author": "Louis Holbrook <dev@holbrook.no>",
"contributors": [
{
"name": "Spencer Ofwiti",
"email": "maxspencer56@gmail.com"
}
],
"license": "GPL-3.0-or-later",
"engines": {
"node": "~15.3.0"

View File

@@ -101,18 +101,6 @@ function parseDigest(url) {
async function processRequest(req, res) {
let digest = undefined;
const headers = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "OPTIONS, POST, GET, PUT",
"Access-Control-Max-Age": 2592000, // 30 days
"Access-Control-Allow-Headers": 'Access-Control-Allow-Origin, Content-Type, x-cic-automerge'
};
if (req.method === "OPTIONS") {
res.writeHead(200, headers);
res.end();
return;
}
if (!['PUT', 'GET', 'POST'].includes(req.method)) {
res.writeHead(405, {"Content-Type": "text/plain"});
@@ -213,7 +201,6 @@ async function processRequest(req, res) {
const responseContentLength = (new TextEncoder().encode(content)).length;
res.writeHead(200, {
"Access-Control-Allow-Origin": "*",
"Content-Type": contentType,
"Content-Length": responseContentLength,
});

View File

@@ -1,5 +1,5 @@
export { PGPSigner, PGPKeyStore, Signer, KeyStore } from './auth';
export { ArgPair,  Envelope, Syncable } from './sync';
export { PGPSigner, PGPKeyStore } from './auth';
export { Envelope, Syncable } from './sync';
export { User } from './assets/user';
export { Phone } from './assets/phone';
export { Config } from './config';

View File

@@ -93,7 +93,7 @@ function orderDict(src) {
return dst;
}
class Syncable implements JSONSerializable, Authoritative, Signable {
class Syncable implements JSONSerializable, Authoritative {
id: string
timestamp: number

View File

@@ -46,5 +46,3 @@ RUN cd cic-ussd && \
# copy config and migration files to definitive file so they can be referenced in path definitions for running scripts
COPY cic-ussd/.config/ /usr/local/etc/cic-ussd/
COPY cic-ussd/cic_ussd/db/migrations/ /usr/local/share/cic-ussd/alembic
WORKDIR /root

View File

@@ -105,14 +105,63 @@ RUN cd root && \
RUN cd cic-bancor/python && \
pip install --extra-index-url $pip_extra_index_url .
RUN echo installing common python tooling
ARG cic_python_commit=9bd1d5319aad8791ebf353707eabbb7b3e7ab5d0
ARG cic_python_url=https://gitlab.com/grassrootseconomics/cic-python.git/
RUN echo Install sum of python dependencies across all components && \
git clone --depth 1 $cic_python_url cic-python && \
cd cic-python && \
git fetch --depth 1 origin $cic_python_commit && \
git checkout $cic_python_commit && \
pip install --extra-index-url $pip_extra_index_url -r requirements.txt
RUN apt-get install -y cargo
ARG cic_base_version=0.1.1a10
RUN pip install --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version
RUN echo Install dev-only provisions
ARG cryptocurrency_cli_tools_version=0.0.4
RUN pip install --extra-index-url $pip_extra_index_url cryptocurrency-cli-tools==$cryptocurrency_cli_tools_version
ARG cic_registry_version=0.5.3a22
RUN echo Install smart contract interface implementations, least frequently changed first
ARG giftable_erc20_token_version=0.0.7b12
RUN pip install --extra-index-url $pip_extra_index_url giftable-erc20-token==$giftable_erc20_token_version
ARG eth_accounts_index_version=0.0.10a9
RUN pip install --extra-index-url $pip_extra_index_url eth-accounts-index==$eth_accounts_index_version
#ARG erc20_approval_escrow_version=0.3.0a4
#RUN pip install --extra-index-url $pip_extra_index_url erc20-approval-escrow==$erc20_approval_escrow_version
ARG erc20_transfer_authorization_version=0.3.0a9
RUN pip install --extra-index-url $pip_extra_index_url erc20-transfer-authorization==$erc20_transfer_authorization_version
#ARG erc20_single_shot_faucet_version=0.2.0a5
#RUN pip install --extra-index-url $pip_extra_index_url erc20-single-shot-faucet==$erc20_single_shot_faucet_version
ARG sarafu_faucet_version=0.0.1a11
RUN pip install --extra-index-url $pip_extra_index_url sarafu-faucet==$sarafu_faucet_version
ARG eth_address_index_version=0.1.0a10
RUN pip install --extra-index-url $pip_extra_index_url eth-address-index==$eth_address_index_version
RUN echo Install cic specific python packages
ARG cic_registry_version=0.5.3a20+build.30d0026c
RUN pip install --extra-index-url $pip_extra_index_url cic-registry==$cic_registry_version
RUN echo Install misc helpers
ARG crypto_dev_signer_version=0.4.13rc2
RUN pip install --extra-index-url $pip_extra_index_url crypto-dev-signer==$crypto_dev_signer_version
ARG eth_gas_proxy_version=0.0.1a4
RUN pip install --extra-index-url $pip_extra_index_url eth-gas-proxy==$eth_gas_proxy_version
ARG cic_contracts_version=0.0.2a2
RUN pip install --extra-index-url $pip_extra_index_url cic-contracts==$cic_contracts_version
ARG chainlib_version=0.0.1a16
RUN pip install --extra-index-url $pip_extra_index_url chainlib==$chainlib_version
ARG chainsyncer_version=0.0.1a8
RUN pip install --extra-index-url $pip_extra_index_url chainsyncer==$chainsyncer_version
WORKDIR /root
COPY contract-migration/testdata/pgp testdata/pgp
@@ -128,4 +177,6 @@ COPY contract-migration/seed_cic_eth.sh seed_cic_eth.sh
COPY contract-migration/sarafu_declaration.json sarafu_declaration.json
COPY contract-migration/keystore keystore
LABEL version="4"
ENTRYPOINT [ "/bin/bash" ]

0
apps/contract-migration/reset.sh Executable file → Normal file
View File

View File

@@ -49,7 +49,7 @@ This will monitor new mined blocks and send balances to the newly created accoun
Without any modifications to the cluster and config files:
`python import_users.py -c config --redis-host-callback redis <datadir>`
`python -c config --redis-host-callback redis <datadir>`
** A note on the The callback**: The script uses a redis callback to retrieve the newly generated custodial address. This is the redis server _from the perspective of the cic-eth component_.

View File

@@ -48,7 +48,7 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--token-symbol', default='SRF', type=str, dest='token_symbol', help='Token symbol to use for trnsactions')
@@ -101,7 +101,6 @@ else:
chain_spec = ChainSpec.from_chain_str(chain_str)
old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
@@ -147,7 +146,7 @@ class Handler:
logg.error('no import record of address {}'.format(recipient))
return
u = Person.deserialize(o)
original_address = u.identities[old_chain_spec.engine()]['{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id())][0]
original_address = u.identities['evm'][old_chain_spec_str][0]
balance = self.balances[original_address]
# TODO: store token object in handler ,get decimals from there
@@ -192,7 +191,7 @@ class BlockGetter:
def progress_callback(block_number, tx_index, s):
sys.stdout.write(str(s).ljust(200) + "\n")
sys.stdout.write(s.ljust(200) + "\n")

View File

@@ -20,7 +20,7 @@ from hexathon import (
from chainlib.eth.address import to_checksum
from cic_types.models.person import Person
from cic_eth.api.api_task import Api
from chainlib.chain import ChainSpec
from cic_registry.chain import ChainSpec
from cic_types.processor import generate_metadata_pointer
logging.basicConfig(level=logging.WARNING)
@@ -88,7 +88,7 @@ batch_delay = args.batch_delay
def register_eth(i, u):
redis_channel = str(uuid.uuid4())
ps.subscribe(redis_channel)
#ps.get_message()
ps.get_message()
api = Api(
config.get('CIC_CHAIN_SPEC'),
queue=args.q,
@@ -98,24 +98,18 @@ def register_eth(i, u):
)
t = api.create_account(register=True)
while True:
ps.get_message()
m = ps.get_message(timeout=args.timeout)
address = None
if m['type'] == 'subscribe':
logg.debug('skipping subscribe message')
continue
try:
r = json.loads(m['data'])
address = r['result']
break
except TypeError as e:
if m == None:
logg.critical('empty response from redis callback (did the service crash?)')
else:
logg.critical('unexpected response from redis callback: {}'.format(m))
sys.exit(1)
logg.debug('[{}] register eth {} {}'.format(i, u, address))
ps.get_message()
m = ps.get_message(timeout=args.timeout)
try:
r = json.loads(m['data'])
address = r['result']
except TypeError as e:
if m == None:
logg.critical('empty response from redis callback (did the service crash?)')
else:
logg.critical('unexpected response from redis callback: {}'.format(m))
sys.exit(1)
logg.debug('[{}] register eth {} {}'.format(i, u, address))
return address
@@ -148,8 +142,7 @@ if __name__ == '__main__':
new_address = register_eth(i, u)
if u.identities.get('evm') == None:
u.identities['evm'] = {}
sub_chain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [new_address]
u.identities['evm'][chain_str] = [new_address]
register_ussd(u)

View File

@@ -1,3 +1,12 @@
cic-base[full_graph]==0.1.1a10
cic-eth==0.10.0a36
cic-types==0.1.0a8
psycopg2==2.8.6
chainlib~=0.0.1a15
chainsyncer~=0.0.1a10
cic-eth==0.10.0a30+build.fdb16130
cic-registry~=0.5.3a19
confini~=0.3.6rc3
celery==4.4.7
redis==3.5.3
hexathon~=0.0.1a3
faker==4.17.1
cic-types==0.1.0a7+build.1c254367
eth-accounts-index~=0.0.10a10

View File

@@ -11,7 +11,7 @@ import csv
import json
import urllib
# external imports
# external impotts
import celery
import eth_abi
import confini
@@ -19,9 +19,9 @@ from hexathon import (
strip_0x,
add_0x,
)
from cic_registry.chain import ChainSpec
from chainsyncer.backend import MemBackend
from chainsyncer.driver import HeadSyncer
from chainlib.chain import ChainSpec
from chainlib.eth.connection import HTTPConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.block import (
@@ -53,7 +53,7 @@ config_dir = '/usr/local/etc/cic-syncer'
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
@@ -139,7 +139,6 @@ class Verifier:
o = self.erc20_tx_factory.erc20_balance(self.token_address, address)
r = self.conn.do(o)
actual_balance = int(strip_0x(r), 16)
balance = int(balance / 1000000) * 1000000
logg.debug('balance for {}: {}'.format(address, balance))
if balance != actual_balance:
raise VerifierError((actual_balance, balance), 'balance')
@@ -255,6 +254,7 @@ def main():
balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r'))
remove_zeros = 10**12
i = 0
while True:
l = f.readline()
@@ -266,7 +266,7 @@ def main():
sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r")
except ValueError:
break
balance = int(r[1].rstrip())
balance = int(int(r[1].rstrip()) / remove_zeros)
balances[address] = balance
i += 1
@@ -294,10 +294,8 @@ def main():
u = Person.deserialize(o)
logg.debug('data {}'.format(u.identities['evm']))
subchain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id())
new_address = u.identities['evm'][subchain_str][0]
subchain_str = '{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id())
old_address = u.identities['evm'][subchain_str][0]
new_address = u.identities['evm'][chain_str][0]
old_address = u.identities['evm'][old_chain_str][0]
balance = balances[old_address]
logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance))

View File

@@ -31,7 +31,7 @@ set -e
set -a
# We need to not install these here...
pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a36 chainlib==0.0.1a19 cic-contracts==0.0.2a2
pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a30+build.fdb16130 chainlib==0.0.1a16
>&2 echo "create account for gas gifter"
old_gas_provider=$DEV_ETH_ACCOUNT_GAS_PROVIDER

0
apps/contract-migration/wait-for-it.sh Executable file → Normal file
View File

View File

@@ -2,18 +2,18 @@ africastalking==1.2.3
alembic==1.4.2
bcrypt==3.2.0
celery==4.4.7
confini==0.3.6rc3
crypto-dev-signer==0.4.13rc3
confini==0.3.6a1
crypto-dev-signer==0.4.13rc2
cryptography==3.2.1
ecuth==0.4.5a1
eth-accounts-index==0.0.10a10
eth-address-index==0.1.0a10
eth-accounts-index==0.0.10a7
eth-address-index==0.1.0a7
eth-tester==0.5.0b3
erc20-transfer-authorization==0.3.0a9
erc20-approval-escrow==0.3.0a5
erc20-single-shot-faucet==0.2.0a6
faker==4.17.1
http-hoba-auth==0.2.0
moolb==0.1.1.b2
moolb==0.1.0
phonenumbers==8.12.12
psycopg2==2.8.6
py-eth==0.1.1
@@ -40,8 +40,6 @@ websockets==8.1
yaml-acl==0.0.1
rlp==2.0.1
cryptocurrency-cli-tools==0.0.4
giftable-erc20-token==0.0.7b12
giftable-erc20-token==0.0.7b7
hexathon==0.0.1a3
chainlib==0.0.1a19
chainsyncer==0.0.1a19
cic-registry==0.5.3.a22
chainlib==0.0.1a12

View File

@@ -40,12 +40,11 @@ before_script:
- echo "$CI_REGISTRY_PASSWORD" | docker login -u "$CI_REGISTRY_USER" $CI_REGISTRY --password-stdin
variables:
CI_DEBUG_TRACE: "true"
IMAGE_TAG_BASE: $CI_REGISTRY_IMAGE/$APP_NAME:$CI_COMMIT_BRANCH-$CI_COMMIT_SHORT_SHA
LATEST_TAG: $CI_REGISTRY_IMAGE/$APP_NAME:latest
IMAGE_TAG: $CI_REGISTRY_IMAGE/$APP_NAME:$CI_COMMIT_BRANCH-$CI_COMMIT_SHORT_SHA
LATEST_TAG: $CI_REGISTRY_IMAGE/$APP_NAME:$CI_COMMIT_BRANCH-latest
script:
- export IMAGE_TAG="$IMAGE_TAG_BASE-$(date +%F.%H%M%S)"
- docker build -t $IMAGE_TAG -f $DOCKERFILE_PATH .
- docker push "$IMAGE_TAG
- docker push $IMAGE_TAG
- docker tag $IMAGE_TAG $LATEST_TAG
- docker push $LATEST_TAG
rules:

View File

@@ -161,7 +161,7 @@ services:
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
/usr/local/bin/cic-cache-tracker -v
/usr/local/bin/cic-cache-tracker -vv
volumes:
- contract-config:/tmp/cic/config/:ro
@@ -240,7 +240,44 @@ services:
./start_tasker.sh -q cic-eth -vv
# command: [/bin/sh, "./start_tasker.sh", -q, cic-eth, -vv ]
cic-eth-tracker:
cic-eth-manager-head:
build:
context: apps/
dockerfile: cic-eth/docker/Dockerfile
environment:
ETH_PROVIDER: http://eth:8545
DATABASE_USER: ${DATABASE_USER:-grassroots}
DATABASE_HOST: ${DATABASE_HOST:-postgres}
DATABASE_PASSWORD: ${DATABASE_PASSWORD:-tralala}
DATABASE_NAME: ${DATABASE_NAME_CIC_CACHE:-cic_eth}
DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: 1
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS
#BANCOR_DIR: $BANCOR_DIR
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
depends_on:
- eth
- postgres
- redis
deploy:
restart_policy:
condition: on-failure
volumes:
- contract-config:/tmp/cic/config/:ro
command:
- /bin/bash
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_manager.sh head -v
# command: "/root/start_manager.sh head -vv"
cic-eth-manager-history:
build:
context: apps/
dockerfile: cic-eth/docker/Dockerfile
@@ -264,19 +301,18 @@ services:
- eth
- postgres
- redis
deploy:
restart_policy:
condition: on-failure
#deploy:
#restart_policy:
# condition: on-failure
volumes:
- contract-config:/tmp/cic/config/:ro
- contract-config:/tmp/cic/config/:ro
command:
- /bin/bash
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_tracker.sh -v -c /usr/local/etc/cic-eth
# command: "/root/start_manager.sh head -vv"
./start_manager.sh history -v
# command: "/root/start_manager.sh history -vv"
cic-eth-dispatcher:
build:
@@ -291,13 +327,14 @@ services:
DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS
#BANCOR_DIR: $BANCOR_DIR
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
DATABASE_DEBUG: ${DATABASE_DEBUG:-false}
DATABASE_DEBUG: ${DATABASE_DEBUG:-true}
depends_on:
- eth