diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py index e92a2c6..968a587 100644 --- a/chainqueue/runnable/server.py +++ b/chainqueue/runnable/server.py @@ -76,7 +76,7 @@ class SessionController: self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) self.srv.bind(config.get('SESSION_SOCKET_PATH')) self.srv.listen(2) - self.srv.settimeout(1.0) + self.srv.settimeout(4.0) def shutdown(self, signo, frame): if self.dead: @@ -111,11 +111,14 @@ adapter = EthAdapter(backend) rpc = EthHTTPConnection(url=config.get('RPC_ENDPOINT'), chain_spec=chain_spec) if __name__ == '__main__': + havesends = 0 while True: srvs = None try: + logg.debug('getting connection') (srvs, srvs_addr) = ctrl.get_connection() except OSError as e: + havesends = 0 try: fi = os.stat(config.get('SESSION_SOCKET_PATH')) except FileNotFoundError: @@ -125,16 +128,38 @@ if __name__ == '__main__': logg.error('entity on socket path is not a socket') break if srvs == None: + logg.debug('timeout (remote socket is none)') txs = adapter.upcoming(chain_spec) for k in txs.keys(): + havesends += 1 logg.debug('txs {} {}'.format(k, txs[k])) adapter.dispatch(chain_spec, rpc, k, txs[k]) + if havesends > 0: + ctrl.srv.settimeout(0.1) + else: + ctrl.srv.settimeout(4.0) continue - srvs.setblocking(False) - data_in = srvs.recv(1024) - data_in_str = data_in.decode('utf-8') - data = bytes.fromhex(strip_0x(data_in_str)) - r = adapter.add(chain_spec, data) - srvs.send(r.to_bytes(4, byteorder='big')) + ctrl.srv.settimeout(0.1) + srvs.settimeout(0.1) + data_in = None + try: + data_in = srvs.recv(1024) + except BlockingIOError as e: + logg.debug('block io error: {}'.format(e)) + continue - ctrl.shutdown(None, None) + data = None + try: + data_in_str = data_in.decode('utf-8') + data = bytes.fromhex(strip_0x(data_in_str)) + except ValueError: + logg.error('invalid input "{}"'.format(data_in_str)) + continue + + logg.debug('recv {} bytes'.format(len(data))) + r = adapter.add(chain_spec, data) + r = srvs.send(r.to_bytes(4, byteorder='big')) + logg.debug('{} bytes sent'.format(r)) + srvs.close() + +ctrl.shutdown(None, None)