diff --git a/apps/cic-eth/cic_eth/queue/state.py b/apps/cic-eth/cic_eth/queue/state.py index 82f222a5..d2b25b98 100644 --- a/apps/cic-eth/cic_eth/queue/state.py +++ b/apps/cic-eth/cic_eth/queue/state.py @@ -119,3 +119,14 @@ def obsolete(chain_spec_dict, tx_hash, final): r = chainqueue.sql.state.obsolete_by_cache(chain_spec, tx_hash, final, session=session) session.close() return r + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def force_set(chain_spec_dict, tx_hash, status): + tx_hash = tx_normalize.tx_hash(tx_hash) + chain_spec = ChainSpec.from_dict(chain_spec_dict) + session = SessionBase.create_session() + r = chainqueue.sql.state.force_set(chain_spec, tx_hash, status, session=session) + session.close() + return r + diff --git a/apps/cic-eth/cic_eth/runnable/recovery/rebuild.py b/apps/cic-eth/cic_eth/runnable/recovery/rebuild.py index 16b0b8e9..453bf399 100644 --- a/apps/cic-eth/cic_eth/runnable/recovery/rebuild.py +++ b/apps/cic-eth/cic_eth/runnable/recovery/rebuild.py @@ -96,7 +96,14 @@ class QueueRecoveryFilter: def filter(self, conn, block, tx, db_session=None): - pass + logg.debug('tx {}'.format(tx)) + for output in tx.outputs: + recipient_bytes = bytes.fromhex(strip_0x(output)) + if self.bloom.check(recipient_bytes): + o = self.account_registry.have(self.account_registry_address, output) + r = self.rpc.do(o) + if self.parse_have(r): + self.add_recipient_tx(tx) def main(): @@ -114,8 +121,6 @@ def main(): syncers = [] - #if SQLBackend.first(chain_spec): - # backend = SQLBackend.initial(chain_spec, block_offset) syncer_backends = SQLBackend.resume(chain_spec, block_offset) if len(syncer_backends) == 0: @@ -147,6 +152,7 @@ def main(): for syncer in syncers: syncer.add_filter(account_filter) + r = syncer.loop(int(loop_interval), conn) if __name__ == '__main__':