Compare commits

..

No commits in common. "master" and "v0.1.3" have entirely different histories.

24 changed files with 307 additions and 663 deletions

View File

@ -1,33 +1,3 @@
- 0.3.1
* Change license to AGPL3 and copyright waived to public domain
- 0.3.0
* Implement on chainlib 0.3.0
- 0.2.12
* Breaking upgrade of chainlib.
* Implement generic block and tx.
- 0.2.11
* Upgrade shep to handle exception in filestore list
- 0.2.10
* Upgrade shep to guarantee state lock atomicity
- 0.2.9
* Minimize instantiations of adapters in filter execution
- 0.2.8
* Upgrade chainsyncer
- 0.2.7
* Upgrade chainlib
- 0.2.6
* Deps upgrade
- 0.2.5
* Deps upgrade
- 0.2.4
* Allow omission of state store sync in queue store backend
- 0.2.2
* Fix missing symbol crashes related to race conditions
- 0.2.1
* Receive removed race checks from chainqueue
- 0.2.0
* primitive race condition handling between fs access of sync and queue
* re-enable throttling based on in-flight transaction count
- 0.1.2
* add settings object
- 0.1.0

141
LICENSE
View File

@ -1,5 +1,5 @@
GNU AFFERO GENERAL PUBLIC LICENSE
Version 3, 19 November 2007
GNU GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
@ -7,15 +7,17 @@
Preamble
The GNU Affero General Public License is a free, copyleft license for
software and other kinds of works, specifically designed to ensure
cooperation with the community in the case of network server software.
The GNU General Public License is a free, copyleft license for
software and other kinds of works.
The licenses for most software and other practical works are designed
to take away your freedom to share and change the works. By contrast,
our General Public Licenses are intended to guarantee your freedom to
the GNU General Public License is intended to guarantee your freedom to
share and change all versions of a program--to make sure it remains free
software for all its users.
software for all its users. We, the Free Software Foundation, use the
GNU General Public License for most of our software; it applies also to
any other work released this way by its authors. You can apply it to
your programs, too.
When we speak of free software, we are referring to freedom, not
price. Our General Public Licenses are designed to make sure that you
@ -24,34 +26,44 @@ them if you wish), that you receive source code or can get it if you
want it, that you can change the software or use pieces of it in new
free programs, and that you know you can do these things.
Developers that use our General Public Licenses protect your rights
with two steps: (1) assert copyright on the software, and (2) offer
you this License which gives you legal permission to copy, distribute
and/or modify the software.
To protect your rights, we need to prevent others from denying you
these rights or asking you to surrender the rights. Therefore, you have
certain responsibilities if you distribute copies of the software, or if
you modify it: responsibilities to respect the freedom of others.
A secondary benefit of defending all users' freedom is that
improvements made in alternate versions of the program, if they
receive widespread use, become available for other developers to
incorporate. Many developers of free software are heartened and
encouraged by the resulting cooperation. However, in the case of
software used on network servers, this result may fail to come about.
The GNU General Public License permits making a modified version and
letting the public access it on a server without ever releasing its
source code to the public.
For example, if you distribute copies of such a program, whether
gratis or for a fee, you must pass on to the recipients the same
freedoms that you received. You must make sure that they, too, receive
or can get the source code. And you must show them these terms so they
know their rights.
The GNU Affero General Public License is designed specifically to
ensure that, in such cases, the modified source code becomes available
to the community. It requires the operator of a network server to
provide the source code of the modified version running there to the
users of that server. Therefore, public use of a modified version, on
a publicly accessible server, gives the public access to the source
code of the modified version.
Developers that use the GNU GPL protect your rights with two steps:
(1) assert copyright on the software, and (2) offer you this License
giving you legal permission to copy, distribute and/or modify it.
An older license, called the Affero General Public License and
published by Affero, was designed to accomplish similar goals. This is
a different license, not a version of the Affero GPL, but Affero has
released a new version of the Affero GPL which permits relicensing under
this license.
For the developers' and authors' protection, the GPL clearly explains
that there is no warranty for this free software. For both users' and
authors' sake, the GPL requires that modified versions be marked as
changed, so that their problems will not be attributed erroneously to
authors of previous versions.
Some devices are designed to deny users access to install or run
modified versions of the software inside them, although the manufacturer
can do so. This is fundamentally incompatible with the aim of
protecting users' freedom to change the software. The systematic
pattern of such abuse occurs in the area of products for individuals to
use, which is precisely where it is most unacceptable. Therefore, we
have designed this version of the GPL to prohibit the practice for those
products. If such problems arise substantially in other domains, we
stand ready to extend this provision to those domains in future versions
of the GPL, as needed to protect the freedom of users.
Finally, every program is threatened constantly by software patents.
States should not allow patents to restrict development and use of
software on general-purpose computers, but in those that do, we wish to
avoid the special danger that patents applied to a free program could
make it effectively proprietary. To prevent this, the GPL assures that
patents cannot be used to render the program non-free.
The precise terms and conditions for copying, distribution and
modification follow.
@ -60,7 +72,7 @@ modification follow.
0. Definitions.
"This License" refers to version 3 of the GNU Affero General Public License.
"This License" refers to version 3 of the GNU General Public License.
"Copyright" also means copyright-like laws that apply to other kinds of
works, such as semiconductor masks.
@ -537,45 +549,35 @@ to collect a royalty for further conveying from those to whom you convey
the Program, the only way you could satisfy both those terms and this
License would be to refrain entirely from conveying the Program.
13. Remote Network Interaction; Use with the GNU General Public License.
Notwithstanding any other provision of this License, if you modify the
Program, your modified version must prominently offer all users
interacting with it remotely through a computer network (if your version
supports such interaction) an opportunity to receive the Corresponding
Source of your version by providing access to the Corresponding Source
from a network server at no charge, through some standard or customary
means of facilitating copying of software. This Corresponding Source
shall include the Corresponding Source for any work covered by version 3
of the GNU General Public License that is incorporated pursuant to the
following paragraph.
13. Use with the GNU Affero General Public License.
Notwithstanding any other provision of this License, you have
permission to link or combine any covered work with a work licensed
under version 3 of the GNU General Public License into a single
under version 3 of the GNU Affero General Public License into a single
combined work, and to convey the resulting work. The terms of this
License will continue to apply to the part which is the covered work,
but the work with which it is combined will remain governed by version
3 of the GNU General Public License.
but the special requirements of the GNU Affero General Public License,
section 13, concerning interaction through a network will apply to the
combination as such.
14. Revised Versions of this License.
The Free Software Foundation may publish revised and/or new versions of
the GNU Affero General Public License from time to time. Such new versions
will be similar in spirit to the present version, but may differ in detail to
the GNU General Public License from time to time. Such new versions will
be similar in spirit to the present version, but may differ in detail to
address new problems or concerns.
Each version is given a distinguishing version number. If the
Program specifies that a certain numbered version of the GNU Affero General
Program specifies that a certain numbered version of the GNU General
Public License "or any later version" applies to it, you have the
option of following the terms and conditions either of that numbered
version or of any later version published by the Free Software
Foundation. If the Program does not specify a version number of the
GNU Affero General Public License, you may choose any version ever published
GNU General Public License, you may choose any version ever published
by the Free Software Foundation.
If the Program specifies that a proxy can decide which future
versions of the GNU Affero General Public License can be used, that proxy's
versions of the GNU General Public License can be used, that proxy's
public statement of acceptance of a version permanently authorizes you
to choose that version for the Program.
@ -633,29 +635,40 @@ the "copyright" line and a pointer to where the full notice is found.
Copyright (C) <year> <name of author>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
GNU General Public License for more details.
You should have received a copy of the GNU Affero General Public License
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Also add information on how to contact you by electronic and paper mail.
If your software can interact with users remotely through a computer
network, you should also make sure that it provides a way for users to
get its source. For example, if your program is a web application, its
interface could display a "Source" link that leads users to an archive
of the code. There are many ways you could offer source, and different
solutions will be better for different programs; see section 13 for the
specific requirements.
If the program does terminal interaction, make it output a short
notice like this when it starts in an interactive mode:
<program> Copyright (C) <year> <name of author>
This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
This is free software, and you are welcome to redistribute it
under certain conditions; type `show c' for details.
The hypothetical commands `show w' and `show c' should show the appropriate
parts of the General Public License. Of course, your program's commands
might be different; for a GUI interface, you would use an "about box".
You should also get your employer (if you work as a programmer) or school,
if any, to sign a "copyright disclaimer" for the program, if necessary.
For more information on this, and how to apply and follow the GNU AGPL, see
For more information on this, and how to apply and follow the GNU GPL, see
<https://www.gnu.org/licenses/>.
The GNU General Public License does not permit incorporating your program
into proprietary programs. If your program is a subroutine library, you
may consider it more useful to permit linking proprietary applications with
the library. If this is what you want to do, use the GNU Lesser General
Public License instead of this License. But first, please read
<https://www.gnu.org/licenses/why-not-lgpl.html>.

View File

@ -1 +1 @@
include *requirements.txt LICENSE WAIVER WAIVER.asc CHANGELOG chaind/data/config/* chaind/data/config/syncer/*
include *requirements.txt chaind/data/config/* chaind/data/config/syncer/*

17
WAIVER
View File

@ -1,17 +0,0 @@
# Copyright waiver for the python package "chaind"
I dedicate any and all copyright interest in this software to the
public domain. I make this dedication for the benefit of the public at
large and to the detriment of my heirs and successors. I intend this
dedication to be an overt act of relinquishment in perpetuity of all
present and future rights to this software under copyright law.
To the best of my knowledge and belief, my contributions are either
originally authored by me or are derived from prior works which I have
verified are also in the public domain and are not subject to claims
of copyright by other parties.
To the best of my knowledge and belief, no individual, business,
organization, government, or other entity has any copyright interest
in my contributions, and I affirm that I will not make contributions
that are otherwise encumbered.

View File

@ -1,28 +0,0 @@
-----BEGIN PGP MESSAGE-----
owGVU2tQVFUcX0BkvY6jTQFjBJ4FA2tWgl6YyZLD8BIFiYeQOXT37lnuifviPnbd
AkIweRnhCAbpEDBWyHNsoJkm0QYICpTZScIv4IAPHlEqZTU0gp27C2r1qQ87s/ee
///3PLdynZtG66L7Yfrrkp7Po13a3Djj6n0749KiXqdybl8KAJG8YBNRFi0DK4ks
UARmXgQyDYFgk2meAwJJZZNZEPhTNIk4kz9BxAETNCGKlCEgORv+mQDJMIB6AIQ4
GYpQUv9gJCQBiTfLVlKEQOZVaEJQjAyigIlnMWQwiAMsmQ2do8vQCDOvCDFCDpqR
DHizU5dzmZQJhhSzoIPfiYuXZRGxkHPMsjZAQyRKjgFJoSgoSbwoqXSqQHULExKP
EGIUo4oHeJyDDEjKgSNCBnE5CpJoBzL2JEBRgLKCZJt6jr0TArarHqpUZkVWsFVH
FJJT2aMRKJwJh/wwLIa0BhNECr9sVVrRns3xVgaalg0asQho1qsHFM9hl0ZFlYzN
YUiI8K5I8BgQcVgO7kTB3YkQ79kACwEOUp3DxLhhrFDkWSCICL+28mK2BKw0omic
C01aIIG9IzPCY+oKyUi8s8YHwTtbc9aOJzhexuka34Y4LWyWYkjESgT28NAiFsGr
CvFdEmUEpf/hl1PZTciCTArJ6IFRkRCHe9Rjs1kkh95xFKcHWWpjnNqPXjXrZMNP
akU0KTnu6X/vJ4Ft/DtQvYM+DpBmMxJZLJKU8ZMV4QuuOnVc1H8sEI4RNQkHqxVJ
uBCOUlgj5jAFE6WuL6zSuGg1vl66VdrAXxqErX7F/deKk1a+SHdX9TvUEGs2rLz5
8sT6++kJ1+OPLe74jGlMjj5M+Obp7L+3921dXaWZ+Nk7sN2jubh1Ls/65kD5YCIb
USRMCEc2kImvFBYacgOT6wa1jQHD9ry5xYqhvu+fGR5dd4jtqk+3HCBTH6ub3p17
cMvuK4XdMaGV8wl0WpR7fG3DpwtTZ5VWu2z5KPPdn5gztye3HzxRWlrt1x4WuQf2
pUy2E6H+sSdv2OHxXV0ex9tCaWlpku5cmz9U00nQixUdcWlzpfaRJ+WWWo/W87MX
N22M6j2WEX0+vSCiXvx4gyE/dn/NtpHMsjtrqmPGerq2B53y/OTH2VsGw+jpirVF
C6476QmXHZqae16VS2dto/xTTQfSzK9e0Dz3wRSXOfINsfelsj/aky2DxTfOlX8X
YprouzCli6mN+O3Zy8arh2D9E/d3hRf4Td1L6Viafc0eftJt/nSnT0vWW3HMuSt3
iyuo8Nw/xzx9P2wzzle5N5fHdGi/8Aj71hIemxLi4v3r0eTJJK/h1CbLzaKBW2Tr
V4szM1pD0+XenIik2fUNOqH5TC/dFTXSX7kxhKpyObLp5Yp9wYk+qfnTGd3XDFzH
4Za/xsf33DW835bSe9Q7drNWDHtxf+1A/RsLngWFmrSJgchTnmU943XRNwu6rzam
PG5IzEhYuFMyu6Vy296gGh82ngvo181cD4p+enNDbmGJUG26NDT8fMbF98b+Bg==
=vZcN
-----END PGP MESSAGE-----

View File

@ -1,27 +1,10 @@
# standard imports
import logging
import time
# external imports
from chainqueue import Store as QueueStore
# local imports
from chaind.lock import StoreLock
logg = logging.getLogger(__name__)
class ChaindAdapter:
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, store_sync=True):
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
self.cache_adapter = cache_adapter
self.dispatcher = dispatcher
store_lock = StoreLock()
while True:
try:
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache, sync=store_sync)
break
except FileNotFoundError as e:
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e))
store_lock.again()
continue
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)

View File

@ -1,7 +1,6 @@
# standard imports
import logging
import os
import time
# external imports
from chainlib.error import RPCException
@ -12,27 +11,23 @@ from chainqueue.store.fs import (
CounterStore,
)
from shep.store.file import SimpleFileStoreFactory
from shep.error import (
StateInvalid,
StateLockedKey,
)
from shep.error import StateInvalid
# local imports
from .base import ChaindAdapter
from chaind.lock import StoreLock
logg = logging.getLogger(__name__)
class ChaindFsAdapter(ChaindAdapter):
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None, store_sync=True):
factory = SimpleFileStoreFactory(path, use_lock=True).add
state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32):
factory = SimpleFileStoreFactory(path).add
state_store = Status(factory)
index_path = os.path.join(path, 'tx')
index_store = IndexStore(index_path, digest_bytes=digest_bytes)
counter_store = CounterStore(path)
super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold, store_sync=store_sync)
super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold)
def put(self, signed_tx):
@ -42,37 +37,16 @@ class ChaindFsAdapter(ChaindAdapter):
def get(self, tx_hash):
v = None
store_lock = StoreLock()
while True:
try:
v = self.store.get(tx_hash)
break
except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None
except FileNotFoundError as e:
logg.debug('queuestore get (file missing) {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('queuestore get (statelock) {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
try:
v = self.store.get(tx_hash)
except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None
return v[1]
def upcoming(self, limit=0):
real_limit = 0
in_flight = []
if limit > 0:
in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
real_limit = limit - len(in_flight)
if real_limit <= 0:
return []
r = self.store.upcoming(limit=real_limit)
logg.info('upcoming returning {} upcoming from limit {} less {} active in-flight txs'.format(len(r), limit, len(in_flight)))
return r
def upcoming(self):
return self.store.upcoming()
def pending(self):
@ -83,30 +57,15 @@ class ChaindFsAdapter(ChaindAdapter):
return self.store.deferred()
def failed(self):
return self.store.failed()
def succeed(self, block, tx):
if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
r = self.store.final(tx.hash, block, tx, error=False)
(k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
return self.store.final(tx.hash, block, tx, error=False)
def fail(self, block, tx):
if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
r = self.store.final(tx.hash, block, tx, error=True)
(k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
def sendfail(self):
return self.store.fail(tx.hash)
return self.store.final(tx.hash, block, tx, error=True)
def enqueue(self, tx_hash):
@ -114,44 +73,15 @@ class ChaindFsAdapter(ChaindAdapter):
def dispatch(self, tx_hash):
entry = None
store_lock = StoreLock()
while True:
try:
entry = self.store.send_start(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
entry = self.store.send_start(tx_hash)
tx_wire = entry.serialize()
r = None
try:
r = self.dispatcher.send(tx_wire)
except RPCException as e:
logg.error('dispatch send failed for {}: {}'.format(tx_hash, e))
except RPCException:
self.store.fail(tx_hash)
return False
store_lock = StoreLock()
while True:
try:
self.store.send_end(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
self.store.send_end(tx_hash)
return True

12
chaind/cli/__init__.py Normal file
View File

@ -0,0 +1,12 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

View File

@ -1,19 +1,17 @@
def apply_flag(flag):
flag.add('session')
flag.add('dispatch')
flag.add('socket')
flag.add('socket_client')
flag.add('token')
# local imports
from .base import ChaindFlag
flag.alias('chaind_base', 'session')
flag.alias('chaind_socket_client', 'session', 'socket', 'socket_client')
def process_flags(argparser, flags):
if flags & ChaindFlag.SESSION > 0:
argparser.add_argument('--session-id', dest='session_id', type=str, help='Session to store state and data under')
argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, help='Directory to store volatile data')
argparser.add_argument('--data-dir', dest='data_dir', type=str, help='Directory to store persistent data')
return flag
if flags & ChaindFlag.SOCKET > 0:
argparser.add_argument('--socket-path', dest='socket', type=str, help='UNIX socket path')
if flags & ChaindFlag.SOCKET_CLIENT > 0:
argparser.add_argument('--send-socket', dest='socket_send', action='store_true', help='Send to UNIX socket')
def apply_arg(arg):
arg.add_long('session-id', 'session', help='Session to store state and data under')
arg.add_long('socket-path', 'socket', help='UNIX socket path')
arg.add_long('send-socket', 'socket_client', typ=bool, help='Send to UNIX socket')
arg.add_long('token-module', 'token', help='Python module path to resolve tokens from identifiers')
return arg
if flags & ChaindFlag.TOKEN > 0:
argparser.add_argument('--token-module', dest='token_module', type=str, help='Python module path to resolve tokens from identifiers')

View File

@ -1,19 +1,23 @@
def process_config(config, arg, args, flags):
args_override = {}
if arg.match('session', flags):
args_override['SESSION_ID'] = getattr(args, 'session_id')
args_override['SESSION_RUNTIME_DIR'] = getattr(args, 'runtime_path')
args_override['SESSION_DATA_DIR'] = getattr(args, 'state_path')
# external imports
from chaind.cli import ChaindFlag
if arg.match('socket', flags):
def process_config(config, args, flags):
args_override = {}
if flags & ChaindFlag.SESSION:
args_override['SESSION_ID'] = getattr(args, 'session_id')
args_override['SESSION_RUNTIME_DIR'] = getattr(args, 'runtime_dir')
args_override['SESSION_DATA_DIR'] = getattr(args, 'data_dir')
if flags & ChaindFlag.SOCKET:
args_override['SESSION_SOCKET_PATH'] = getattr(args, 'socket')
if arg.match('token', flags):
if flags & ChaindFlag.TOKEN:
args_override['TOKEN_MODULE'] = getattr(args, 'token_module')
config.dict_override(args_override, 'local cli args')
if arg.match('socket_client', flags):
config.add(getattr(args, 'send_socket'), '_SOCKET_SEND', False)
if flags & ChaindFlag.SOCKET_CLIENT:
config.add(getattr(args, 'socket_send'), '_SOCKET_SEND', False)
return config

View File

@ -1,6 +0,0 @@
# standard imports
import os
data_dir = os.path.realpath(os.path.dirname(__file__))
config_dir = os.path.join(data_dir, 'config')

View File

@ -1,5 +1,5 @@
[session]
socket_path =
runtime_path =
runtime_dir =
id =
data_path =
data_dir =

View File

@ -1,2 +0,0 @@
[token]
module =

View File

@ -1,33 +0,0 @@
# standard imports
import logging
# local ipmorts
from chaind.adapters.fs import ChaindFsAdapter
from chaind.eth.cache import EthCacheTx
logg = logging.getLogger(__name__)
class DispatchProcessor:
def __init__(self, chain_spec, queue_dir, dispatcher):
self.dispatcher = dispatcher
self.chain_spec = chain_spec,
self.queue_dir = queue_dir
def process(self, rpc, limit=50):
adapter = ChaindFsAdapter(
self.chain_spec,
self.queue_dir,
EthCacheTx,
self.dispatcher,
)
upcoming = adapter.upcoming(limit=limit)
logg.info('processor has {} candidates for {}, processing with limit {}'.format(len(upcoming), self.chain_spec, limit))
i = 0
for tx_hash in upcoming:
if adapter.dispatch(tx_hash):
i += 1
return i

View File

@ -20,7 +20,3 @@ class ClientInputError(ValueError):
class QueueLockError(Exception):
pass
class BackendError(Exception):
pass

View File

@ -6,116 +6,42 @@ import time
from chainlib.status import Status as TxStatus
from chainsyncer.filter import SyncFilter
from chainqueue.error import NotLocalTxError
from chaind.adapters.fs import ChaindFsAdapter
from shep.error import StateLockedKey
# local imports
from .error import (
QueueLockError,
BackendError,
)
from chaind.lock import StoreLock
from .error import QueueLockError
logg = logging.getLogger(__name__)
class StateFilter(SyncFilter):
def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
self.chain_spec = chain_spec
self.adapter_path = adapter_path
self.tx_adapter = tx_adapter
delay_limit = 3.0
def __init__(self, adapter, throttler=None):
self.adapter = adapter
self.throttler = throttler
self.last_block_height = 0
self.adapter = None
self.store_lock = None
def __get_adapter(self, block, force_reload=False):
if self.store_lock == None:
self.store_lock = StoreLock()
reload = False
if block.number != self.last_block_height:
reload = True
elif self.adapter == None:
reload = True
elif force_reload:
reload = True
self.last_block_height = block.number
if reload:
while True:
logg.info('reloading adapter')
try:
self.adapter = ChaindFsAdapter(
self.chain_spec,
self.adapter_path,
self.tx_adapter,
None,
)
break
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
self.store_lock.again()
continue
return self.adapter
def filter(self, conn, block, tx, session=None):
cache_tx = None
queue_adapter = self.__get_adapter(block)
self.store_lock.reset()
try:
cache_tx = self.adapter.get(tx.hash)
except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash))
return False
delay = 0.01
while True:
try:
cache_tx = queue_adapter.get(tx.hash)
break
except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash))
return False
except BackendError as e:
logg.error('adapter get failed: {}, one more try'.format(e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
if cache_tx == None:
raise NotLocalTxError(tx.hash)
self.store_lock.reset()
queue_lock = StoreLock(error=QueueLockError)
while True:
if delay > self.delay_limit:
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
try:
if tx.status == TxStatus.SUCCESS:
queue_adapter.succeed(block, tx)
self.adapter.succeed(block, tx)
else:
queue_adapter.fail(block, tx)
self.adapter.fail(block, tx)
break
except QueueLockError as e:
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
queue_lock.again()
except FileNotFoundError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
except NotLocalTxError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
logg.info('filter registered {} for {} in {}'.format(tx.status_name, tx.hash, block))
time.sleep(delay)
delay *= 2
if self.throttler != None:
self.throttler.dec(tx.hash)

View File

@ -1,34 +0,0 @@
# standard imports
import time
# local imports
from .error import BackendError
BASE_DELAY = 0.01
BASE_DELAY_LIMIT = 10.0
class StoreLock:
def __init__(self, delay=BASE_DELAY, delay_limit=BASE_DELAY_LIMIT, error=BackendError, description=None):
self.base_delay = delay
self.delay = delay
self.delay_limit = delay_limit
self.error = error
self.description = description
def again(self, e=None):
if self.delay > self.delay_limit:
err = None
if e != None:
err = str(e)
else:
err = self.description
raise self.error(err)
time.sleep(self.delay)
self.delay *= 2
def reset(self):
self.delay = self.base_delay

View File

@ -8,21 +8,19 @@ import stat
from hexathon import strip_0x
# local imports
from .error import (
from chaind.error import (
NothingToDoError,
ClientGoneError,
ClientBlockError,
ClientInputError,
)
from .lock import StoreLock
from .error import BackendError
logg = logging.getLogger(__name__)
class SessionController:
def __init__(self, config, processor):
def __init__(self, config, adapter, processor):
self.dead = False
os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True)
try:
@ -37,6 +35,7 @@ class SessionController:
self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY')))
self.processor = processor
self.chain_spec = config.get('CHAIN_SPEC')
self.adapter = adapter
def shutdown(self, signo, frame):
@ -60,16 +59,7 @@ class SessionController:
def process(self, conn):
state_lock = StoreLock()
r = None
while True:
try:
r = self.processor(conn)
break
except BackendError as e:
state_lock.again(e)
continue
r = self.processor(self.chain_spec, self.adapter, conn)
if r > 0:
self.srv.settimeout(0.1)
else:
@ -114,6 +104,7 @@ class SessionController:
logg.error('invalid input "{}"'.format(data_in_str))
raise ClientInputError()
logg.info('recv {} bytes'.format(len(data)))
return (srvs, data,)

View File

@ -4,123 +4,122 @@ import os
import uuid
# external imports
from chainlib.settings import ChainSettings
from chainqueue.settings import *
from chainsyncer.settings import ChainsyncerSettings
from chainqueue.settings import ChainqueueSettings
logg = logging.getLogger(__name__)
class ChaindSettings(ChainSettings):
class ChaindSettings(ChainsyncerSettings, ChainqueueSettings):
def __init__(settings, include_sync=False, include_queue=False):
super(ChaindSettings, settings).__init__()
settings.include_sync = include_sync
settings.include_queue = include_queue
def __init__(self, include_sync=False, include_queue=False):
super(ChaindSettings, self).__init__()
self.include_sync = include_sync
self.include_queue = include_queue
def process_session(self, config):
session_id = config.get('SESSION_ID')
base_dir = os.getcwd()
data_dir = config.get('SESSION_DATA_DIR')
if data_dir == None:
data_dir = os.path.join(base_dir, '.chaind', 'chaind', self.o.get('CHAIND_BACKEND'))
data_engine_dir = os.path.join(data_dir, config.get('CHAIND_ENGINE'))
os.makedirs(data_engine_dir, exist_ok=True)
# check if existing session
if session_id == None:
fp = os.path.join(data_engine_dir, 'default')
try:
os.stat(fp)
fp = os.path.realpath(fp)
except FileNotFoundError:
fp = None
if fp != None:
session_id = os.path.basename(fp)
make_default = False
if session_id == None:
session_id = str(uuid.uuid4())
make_default = True
# create the session persistent dir
session_dir = os.path.join(data_engine_dir, session_id)
if make_default:
fp = os.path.join(data_engine_dir, 'default')
os.symlink(session_dir, fp)
#data_dir = os.path.join(session_dir, config.get('CHAIND_COMPONENT'))
data_dir = session_dir
os.makedirs(data_dir, exist_ok=True)
# create volatile dir
uid = os.getuid()
runtime_dir = config.get('SESSION_RUNTIME_DIR')
if runtime_dir == None:
runtime_dir = os.path.join('/run', 'user', str(uid), 'chaind', self.o.get('CHAIND_BACKEND'))
#runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id, config.get('CHAIND_COMPONENT'))
runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id)
os.makedirs(runtime_dir, exist_ok=True)
self.o['SESSION_RUNTIME_DIR'] = runtime_dir
self.o['SESSION_DIR'] = session_dir
self.o['SESSION_DATA_DIR'] = data_dir
self.o['SESSION_ID'] = session_id
def process_sync_interface(self, config):
raise NotImplementedError('no sync interface implementation defined')
def process_sync(self, config):
self.process_sync_interface(config)
self.process_sync_range(config)
def process_socket(self, config):
socket_path = config.get('SESSION_SOCKET_PATH')
if socket_path == None:
socket_path = os.path.join(self.o['SESSION_RUNTIME_DIR'], 'chaind.sock')
self.o['SESSION_SOCKET_PATH'] = socket_path
def process_dispatch(self, config):
self.o['SESSION_DISPATCH_DELAY'] = 0.01
def process_token(self, config):
self.o['TOKEN_MODULE'] = config.get('TOKEN_MODULE')
def process_backend(self, config):
if self.include_sync and self.include_queue:
if self.o['QUEUE_BACKEND'] != self.o['SYNCER_BACKEND']:
raise ValueError('queue and syncer backends must match. queue "{}" != syncer "{}"'.format(self.o['QUEUE_BACKEND'], self.o['SYNCER_BACKEND']))
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
elif self.include_sync:
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
elif self.include_queue:
self.o['CHAIND_BACKEND'] = self.o['QUEUE_BACKEND']
else:
raise ValueError('at least one backend must be set')
def process(self, config):
super(ChaindSettings, self).process(config)
if self.include_sync:
self.process_sync(config)
self.process_sync_backend(config)
if self.include_queue:
self.process_queue_backend(config)
self.process_dispatch(config)
self.process_token(config)
self.process_backend(config)
self.process_session(config)
self.process_socket(config)
def dir_for(self, k):
return os.path.join(self.o['SESSION_PATH'], k)
def process_session(settings, config):
session_id = config.get('SESSION_ID')
base_dir = os.getcwd()
data_dir = config.get('SESSION_DATA_PATH')
if data_dir == None:
data_dir = os.path.join(base_dir, '.chaind', 'chaind', settings.get('CHAIND_BACKEND'))
data_engine_dir = os.path.join(data_dir, config.get('CHAIND_ENGINE'))
os.makedirs(data_engine_dir, exist_ok=True)
# check if existing session
if session_id == None:
fp = os.path.join(data_engine_dir, 'default')
try:
os.stat(fp)
fp = os.path.realpath(fp)
except FileNotFoundError:
fp = None
if fp != None:
session_id = os.path.basename(fp)
make_default = False
if session_id == None:
session_id = str(uuid.uuid4())
make_default = True
chain_spec = settings.get('CHAIN_SPEC')
network_id_str = str(chain_spec.network_id())
# create the session persistent dir
session_path = os.path.join(
data_engine_dir,
chain_spec.arch(),
chain_spec.fork(),
network_id_str,
session_id,
)
if make_default:
fp = os.path.join(data_engine_dir, 'default')
os.symlink(session_path, fp)
data_path = session_path
os.makedirs(data_path, exist_ok=True)
# create volatile dir
uid = os.getuid()
runtime_path = config.get('SESSION_RUNTIME_PATH')
if runtime_path == None:
runtime_path = os.path.join('/run', 'user', str(uid), 'chaind', settings.get('CHAIND_BACKEND'))
runtime_path = os.path.join(
runtime_path,
config.get('CHAIND_ENGINE'),
chain_spec.arch(),
chain_spec.fork(),
str(chain_spec.network_id()),
session_id,
)
os.makedirs(runtime_path, exist_ok=True)
settings.set('SESSION_RUNTIME_PATH', runtime_path)
settings.set('SESSION_PATH', session_path)
settings.set('SESSION_DATA_PATH', data_path)
settings.set('SESSION_ID', session_id)
return settings
def process_socket(settings, config):
socket_path = config.get('SESSION_SOCKET_PATH')
if socket_path == None:
socket_path = os.path.join(settings.get('SESSION_RUNTIME_PATH'), 'chaind.sock')
settings.set('SESSION_SOCKET_PATH', socket_path)
return settings
def process_dispatch(settings, config):
settings.set('SESSION_DISPATCH_DELAY', 0.01)
return settings
def process_token(settings, config):
settings.set('TOKEN_MODULE', config.get('TOKEN_MODULE'))
return settings
def process_backend(settings, config):
settings.set('CHAIND_BACKEND', config.get('STATE_BACKEND')) #backend)
return settings
def process_queue(settings, config):
if config.get('STATE_PATH') == None:
queue_state_dir = settings.dir_for('queue')
config.add(queue_state_dir, 'STATE_PATH', False)
logg.debug('setting queue state path {}'.format(queue_state_dir))
settings = process_queue_tx(settings, config)
settings = process_queue_paths(settings, config)
if config.get('STATE_BACKEND') == 'fs':
settings = process_queue_backend_fs(settings, config)
settings = process_queue_store(settings, config)
return settings
return os.path.join(self.o['SESSION_DIR'], k)

View File

@ -1,4 +1,5 @@
# standard imports
import unittest
import hashlib
import tempfile
@ -7,11 +8,9 @@ from chainqueue.cache import CacheTokenTx
from chainlib.status import Status as TxStatus
from chainlib.chain import ChainSpec
from chainlib.error import RPCException
from chainlib.tx import (
Tx,
TxResult,
)
from chainlib.block import Block
# local imports
from chaind.adapters.fs import ChaindFsAdapter
class MockCacheAdapter(CacheTokenTx):
@ -34,22 +33,22 @@ class MockDispatcher:
def send(self, v):
if v in self.fails:
if v not in self.fails:
raise RPCException('{} is in fails'.format(v))
pass
class MockTx(Tx):
class MockTx:
def __init__(self, tx_hash, status=TxStatus.SUCCESS):
result = TxResult()
result.status = status
super(MockTx, self).__init__(result=result)
self.set_hash(tx_hash)
self.hash = tx_hash
self.status = status
class MockBlock(Block):
class TestChaindFsBase(unittest.TestCase):
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.path = tempfile.mkdtemp()
self.adapter = ChaindFsAdapter(self.chain_spec, self.path, self.cache_adapter, self.dispatcher)
def __init__(self, number):
super(MockBlock, self).__init__()
self.number = number

View File

@ -1,31 +0,0 @@
# standard imports
import unittest
import tempfile
import logging
# external imports
from chainlib.chain import ChainSpec
# local imports
from chaind.adapters.fs import ChaindFsAdapter
logging.STATETRACE = 5
logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
class TestChaindFsBase(unittest.TestCase):
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.path = tempfile.mkdtemp()
self.adapter = ChaindFsAdapter(self.chain_spec, self.path, self.cache_adapter, self.dispatcher, event_callback=self.log_state)
def log_state(self, k, from_state, to_state):
logg.log(logging.STATETRACE, 'state change {}: {} -> {}'.format(
k,
from_state,
to_state,
)
)

View File

@ -1,6 +1,6 @@
chainlib~=0.3.0
chainqueue~=0.2.0
chainsyncer~=0.5.0
confini~=0.6.1
chainlib~=0.1.1
chainqueue~=0.1.5
chainsyncer~=0.4.2
confini~=0.6.0
funga~=0.5.2
pyxdg~=0.26

View File

@ -1,24 +1,24 @@
[metadata]
name = chaind
version = 0.3.2
version = 0.1.3
description = Base package for chain queue service
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://git.defalsify.org/chaind.eth
url = https://gitlab.com/chaintool/chaind
keywords =
blockchain
cryptocurrency
dlt
p2p
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Environment :: Console
Intended Audience :: Developers
License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
# Topic :: Blockchain :: EVM
license = OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)
license = GPL3
licence_files =
LICENSE
@ -30,7 +30,6 @@ packages =
# chaind.runnable
chaind.adapters
chaind.unittest
chaind.data
chaind.cli
#[options.entry_points]

View File

@ -14,16 +14,16 @@ from chaind.filter import StateFilter
# test imports
from chaind.unittest.common import (
MockTx,
MockBlock,
MockCacheAdapter,
MockDispatcher,
TestChaindFsBase,
)
from chaind.unittest.fs import TestChaindFsBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestChaindFs(TestChaindFsBase):
def setUp(self):
@ -43,15 +43,12 @@ class TestChaindFs(TestChaindFsBase):
self.assertEqual(data, v)
def test_fs_fail(self):
def test_fs_defer(self):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
self.dispatcher.add_fail(data)
r = self.adapter.dispatch(hsh)
self.assertFalse(r)
txs = self.adapter.failed()
self.dispatcher.add_fail(hsh)
self.adapter.dispatch(hsh)
txs = self.adapter.deferred()
self.assertEqual(len(txs), 1)
@ -75,10 +72,9 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
fltr = StateFilter(self.adapter)
tx = MockTx(hsh)
block = MockBlock(42)
fltr.filter(None, block, tx)
fltr.filter(None, None, tx)
def test_fs_filter_fail(self):
@ -87,30 +83,9 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
fltr = StateFilter(self.adapter)
tx = MockTx(hsh, TxStatus.ERROR)
block = MockBlock(42)
fltr.filter(None, block, tx)
def test_upcoming(self):
drv = QueueDriver(self.adapter)
txs = []
for i in range(10):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
txs.append(hsh)
self.adapter.enqueue(hsh)
r = self.adapter.upcoming(limit=5)
self.assertEqual(len(r), 5)
r = self.adapter.dispatch(txs[0])
self.assertTrue(r)
r = self.adapter.upcoming(limit=5)
self.assertEqual(len(r), 4)
fltr.filter(None, None, tx)
if __name__ == '__main__':