Add tx sender processing filter
This commit is contained in:
parent
bbf38a5fe5
commit
0a3ab1b7a2
@ -119,3 +119,14 @@ def obsolete(chain_spec_dict, tx_hash, final):
|
||||
r = chainqueue.sql.state.obsolete_by_cache(chain_spec, tx_hash, final, session=session)
|
||||
session.close()
|
||||
return r
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def force_set(chain_spec_dict, tx_hash, status):
|
||||
tx_hash = tx_normalize.tx_hash(tx_hash)
|
||||
chain_spec = ChainSpec.from_dict(chain_spec_dict)
|
||||
session = SessionBase.create_session()
|
||||
r = chainqueue.sql.state.force_set(chain_spec, tx_hash, status, session=session)
|
||||
session.close()
|
||||
return r
|
||||
|
||||
|
@ -96,7 +96,14 @@ class QueueRecoveryFilter:
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
pass
|
||||
logg.debug('tx {}'.format(tx))
|
||||
for output in tx.outputs:
|
||||
recipient_bytes = bytes.fromhex(strip_0x(output))
|
||||
if self.bloom.check(recipient_bytes):
|
||||
o = self.account_registry.have(self.account_registry_address, output)
|
||||
r = self.rpc.do(o)
|
||||
if self.parse_have(r):
|
||||
self.add_recipient_tx(tx)
|
||||
|
||||
|
||||
def main():
|
||||
@ -114,8 +121,6 @@ def main():
|
||||
|
||||
syncers = []
|
||||
|
||||
#if SQLBackend.first(chain_spec):
|
||||
# backend = SQLBackend.initial(chain_spec, block_offset)
|
||||
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
|
||||
|
||||
if len(syncer_backends) == 0:
|
||||
@ -147,6 +152,7 @@ def main():
|
||||
for syncer in syncers:
|
||||
syncer.add_filter(account_filter)
|
||||
|
||||
r = syncer.loop(int(loop_interval), conn)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Loading…
Reference in New Issue
Block a user