diff --git a/chaind_eth/runnable/send.py b/chaind_eth/runnable/send.py index a88d556..af9ea2e 100644 --- a/chaind_eth/runnable/send.py +++ b/chaind_eth/runnable/send.py @@ -102,18 +102,22 @@ class Outputter: def do(self, hx): - self.out(hx) + return self.out(hx) def do_standard_output(self, hx): - sys.stdout.write(hx + '\n') + #sys.stdout.write(hx + '\n') + return hx def do_unix_socket(self, hx): s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.connect(config.get('_SOCKET')) s.send(hx.encode('utf-8')) + r = s.recv(64+4) + logg.debug('r {}'.format(r)) s.close() + return r[4:].decode('utf-8') def main(): @@ -144,7 +148,7 @@ def main(): except StopIteration: break tx_hex = tx_bytes.hex() - out.do(tx_hex) + print(out.do(tx_hex)) if __name__ == '__main__': diff --git a/chaind_eth/runnable/server.py b/chaind_eth/runnable/server.py index 3c904b2..ef02173 100644 --- a/chaind_eth/runnable/server.py +++ b/chaind_eth/runnable/server.py @@ -127,8 +127,8 @@ session_index_backend = SessionIndex(config.get('SESSION_ID')) adapter = EthAdapter(backend, session_index_backend=session_index_backend) -def process_outgoing(chain_spec, adapter, rpc): - dispatcher = Dispatcher(chain_spec, adapter) +def process_outgoing(chain_spec, adapter, rpc, limit=100): + dispatcher = Dispatcher(chain_spec, adapter, limit=limit) session = adapter.create_session() r = dispatcher.process(rpc, session) session.close() @@ -163,7 +163,7 @@ def main(): srvs.settimeout(0.1) data_in = None try: - data_in = srvs.recv(1024) + data_in = srvs.recv(1048576) except BlockingIOError as e: logg.debug('block io error: {}'.format(e)) continue @@ -180,20 +180,34 @@ def main(): logg.debug('recv {} bytes'.format(len(data))) session = backend.create_session() tx_hash = None + signed_tx = None try: - tx_hash = adapter.add(data, chain_spec, session=session) + tx_hash = adapter.add(data_hex, chain_spec, session=session) except ValueError as e: - logg.error('invalid input: {}'.format(e)) + try: + signed_tx = adapter.get(data_hex, chain_spec, session=session) + except ValueError as e: + logg.error('invalid input: {}'.format(e)) - r = 1 if tx_hash != None: session.commit() - r = 0 - try: - r = srvs.send(r.to_bytes(4, byteorder='big')) - logg.debug('{} bytes sent'.format(r)) - except BrokenPipeError: - logg.debug('they just hung up. how rude.') + try: + r = int(0).to_bytes(4, byteorder='big') + r += strip_0x(tx_hash).encode('utf-8') + srvs.send(r) + logg.debug('{} bytes sent'.format(r)) + except BrokenPipeError: + logg.debug('they just hung up. how rude.') + elif signed_tx != None: + r = int(0).to_bytes(4, byteorder='big') + r += strip_0x(signed_tx).encode('utf-8') + try: + r = srvs.send(r) + except BrokenPipeError: + logg.debug('they just hung up. how useless.') + else: + r = srvs.send(int(1).to_bytes(4, byteorder='big')) + session.close() srvs.close() diff --git a/setup.cfg b/setup.cfg index adb3846..6987ffa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,4 +38,3 @@ console_scripts = chaind-eth-server = chaind_eth.runnable.server:main chaind-eth-syncer = chaind_eth.runnable.syncer:main chaind-eth-send = chaind_eth.runnable.send:main - chaind-eth-retry = chaind_eth.runnable.retry:main