Compare commits

...

5 Commits

Author SHA1 Message Date
nolash
cd5305b332 Add send cli 2021-04-06 21:29:12 +02:00
nolash
2101a23ccb Remove logline 2021-04-06 21:18:27 +02:00
nolash
af97b7799f Enhance task, queue discovery 2021-04-06 20:06:19 +02:00
Louis Holbrook
7728f38f14 Merge branch 'lash/4k' into 'master'
Make 40k import pass

See merge request grassrootseconomics/cic-internal-integration!86
2021-04-06 15:14:04 +00:00
Louis Holbrook
a305aafc86 Make 40k import pass 2021-04-06 15:14:04 +00:00
22 changed files with 149 additions and 51 deletions

View File

@@ -61,8 +61,8 @@ def balance(tokens, holder_address, chain_spec_dict):
for t in tokens:
address = t['address']
token = ERC20Token(rpc, address)
c = ERC20()
token = ERC20Token(chain_spec, rpc, address)
c = ERC20(chain_spec)
o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o)
t['balance_network'] = c.parse_balance(r)

View File

@@ -171,6 +171,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
except NotFoundEthException as e:
pass
# TODO: apply receipt in tx object to validate and normalize input
if rcpt != None:
success = rcpt['status'] == 1
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
@@ -180,6 +181,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
[
tx_hash_hex,
rcpt['blockNumber'],
rcpt['transactionIndex'],
not success,
],
queue=queue,

View File

@@ -23,7 +23,7 @@ def translate_address(address, trusted_addresses, chain_spec, sender_address=ZER
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
c = AddressDeclarator()
c = AddressDeclarator(chain_spec)
for trusted_address in trusted_addresses:
o = c.declaration(declarator_address, trusted_address, address, sender_address=sender_address)

View File

@@ -20,10 +20,10 @@ def set_sent(chain_spec_dict, tx_hash, fail=False):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_final(chain_spec_dict, tx_hash, block=None, fail=False):
def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.state.set_final(chain_spec, tx_hash, block, fail, session=session)
r = chainqueue.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
session.close()
return r

View File

@@ -39,6 +39,7 @@ class TxFilter(SyncFilter):
self.chain_spec.asdict(),
add_0x(tx_hash_hex),
tx.block.number,
tx.index,
tx.status == Status.ERROR,
],
queue=self.queue,

View File

@@ -80,7 +80,7 @@ straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
# logg.debug('submitting tx {} for retry'.format(tx_hash))
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
[
# [
# tx_hash,
# chain_str,
# LockEnum.QUEUE,

View File

@@ -66,7 +66,6 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def main():
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
@@ -76,7 +75,10 @@ def main():
o = block_latest()
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
block_current = int(r, 16)
block_offset = block_current + 1
stat = init_chain_stat(rpc, block_current)
logg.debug('starting at block {}'.format(block_offset))

View File

@@ -10,7 +10,7 @@ version = (
0,
11,
0,
'alpha.4',
'beta.1',
)
version_object = semver.VersionInfo(

View File

@@ -29,7 +29,7 @@ RUN /usr/local/bin/python -m pip install --upgrade pip
# python merge_requirements.py | tee merged_requirements.txt
#RUN cd cic-base && \
# pip install $pip_extra_index_url_flag -r ./merged_requirements.txt
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a60
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a61
COPY cic-eth/scripts/ scripts/
COPY cic-eth/setup.cfg cic-eth/setup.py ./

View File

@@ -2,7 +2,7 @@ cic-base~=0.1.2a60
celery==4.4.7
crypto-dev-signer~=0.4.14a17
confini~=0.3.6rc3
cic-eth-registry~=0.5.4a11
cic-eth-registry~=0.5.4a12
#cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
@@ -19,7 +19,7 @@ eth-address-index~=0.1.1a7
chainlib~=0.0.2a4
hexathon~=0.0.1a7
chainsyncer~=0.0.1a21
chainqueue~=0.0.1a5
chainqueue~=0.0.1a7
pysha3==1.0.2
coincurve==15.0.0
sarafu-faucet~=0.0.2a16
sarafu-faucet~=0.0.2a19

View File

@@ -37,7 +37,7 @@ def test_tx(
eth_rpc,
eth_signer,
agent_roles,
celery_worker,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')

View File

@@ -37,7 +37,7 @@ def test_tx(
eth_rpc,
eth_signer,
agent_roles,
celery_worker,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')

View File

@@ -38,11 +38,12 @@ def test_transfer_api(
agent_roles,
cic_registry,
register_tokens,
register_lookups,
celery_session_worker,
):
#token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
foo_token_cache = ERC20Token(eth_rpc, foo_token)
foo_token_cache = ERC20Token(default_chain_spec, eth_rpc, foo_token)
api = Api(str(default_chain_spec), callback_param='transfer', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.transfer(custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024, foo_token_cache.symbol)

View File

@@ -116,7 +116,7 @@ def test_register_account(
init_eth_tester.mine_block()
c = AccountRegistry()
c = AccountRegistry(default_chain_spec)
o = c.have(account_registry, eth_empty_accounts[0], sender_address=call_sender)
r = eth_rpc.do(o)
assert int(strip_0x(r), 16) == 1

View File

@@ -43,7 +43,7 @@ def test_filter_process(
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
init_eth_tester.mine_blocks(13)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 1024)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
@@ -56,7 +56,7 @@ def test_filter_process(
# external tx
init_eth_tester.mine_blocks(28)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 512)
eth_rpc.do(o)
o = receipt(tx_hash_hex)

View File

@@ -3,6 +3,7 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@@ -15,6 +16,29 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@@ -22,17 +46,9 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
registered_tasks = app.tasks
self.sms_tasks = []
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@@ -44,12 +60,17 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for task in self.sms_tasks:
signature = celery.signature(task)
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result
t = celery.group(signatures)()
return t

View File

@@ -0,0 +1,76 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46
cic_base[full_graph]~=0.1.2a61

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= 0.4.0a2
version= 0.4.0a3
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -45,3 +45,4 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main

View File

@@ -57,9 +57,9 @@ WORKDIR /home/grassroots
USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.2a60
ARG cic_eth_version=0.11.0a4
ARG sarafu_faucet_version=0.0.2a16
ARG cic_base_version=0.1.2a61
ARG cic_eth_version=0.11.0b1
ARG sarafu_faucet_version=0.0.2a19
ARG cic_contracts_version=0.0.2a2
RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \
cic-eth==$cic_eth_version \

View File

@@ -124,10 +124,6 @@ def register_eth(i, u):
return address
def register_ussd(u):
pass
if __name__ == '__main__':
#fi = open(os.path.join(user_out_dir, 'addresses.csv'), 'a')
@@ -155,8 +151,6 @@ if __name__ == '__main__':
sub_chain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [new_address]
register_ussd(u)
new_address_clean = strip_0x(new_address)
filepath = os.path.join(
user_new_dir,

View File

@@ -1,5 +1,5 @@
cic-base[full_graph]==0.1.2a60
sarafu-faucet==0.0.2a16
cic-eth==0.11.0a4
cic-base[full_graph]==0.1.2a61
sarafu-faucet==0.0.2a17
cic-eth==0.11.0b1
cic-types==0.1.0a10
crypto-dev-signer==0.4.14a17