add transfer to custodial accounts

This commit is contained in:
Carlosokumu 2024-12-11 17:46:38 +03:00
parent 2f0b59e579
commit 7b9bff6541
Signed by: carlos
GPG Key ID: 7BD6BC8160A5C953

611
parse.py
View File

@ -14,6 +14,8 @@ import select
from multiprocessing import Process
from multiprocessing import Pipe
import subprocess
import requests
import requests.adapters
from web3 import Web3
from dotenv import load_dotenv
@ -21,54 +23,55 @@ logg = logging.getLogger()
logg.setLevel(logging.DEBUG)
tokens = (
'ID',
'NOID',
'HEX',
'LABEL',
'VALUE',
'END',
"ID",
"NOID",
"HEX",
"LABEL",
"VALUE",
"END",
)
t_ID = r'[a-zA-Z0-9]+(-[a-zA-Z0-9]+)+'
t_NOID = r'\-'
t_LABEL = r'[a-zA-Z_]+'
t_VALUE = r'\d+'
t_END = r'\n'
t_HEX = r'0x([a-fA-F0-9])+'
t_ID = r"[a-zA-Z0-9]+(-[a-zA-Z0-9]+)+"
t_NOID = r"\-"
t_LABEL = r"[a-zA-Z_]+"
t_VALUE = r"\d+"
t_END = r"\n"
t_HEX = r"0x([a-fA-F0-9])+"
#def t_VALUE(t):
# def t_VALUE(t):
# r'\d+'
# t.value = int(t.value)
#def t_HEX(t):
# def t_HEX(t):
# r'0x([a-fA-F0-9])+'
#print(t)
#try:
# t.value = bytes.fromhex(t.value)
#except:
# return False
#return t.value
#t.value = t.value
# print(t)
# try:
# t.value = bytes.fromhex(t.value)
# except:
# return False
# return t.value
# t.value = t.value
# pass
t_ignore = ' \t'
t_ignore = " \t"
def t_error(t):
print("problem: ", t.value[0])
t.lexer.skip(1)
lexer = lex.lex()
lexer = lex.lex()
load_dotenv()
#Chain Params
# Chain Params
chainId = os.getenv("CHAIN_ID")
rpc = os.getenv("RPC")
gasCap = os.getenv("GAS_FEE_CAP")
bearer_token = os.getenv("BEARER_TOKEN")
gas_cap = os.getenv("GAS_FEE_CAP")
master_private_key = os.getenv("MASTER_PRIVATE_KEY")
@ -78,26 +81,35 @@ gas_topup = os.getenv("GAS_TOPUP")
w3 = Web3(Web3.HTTPProvider(rpc))
#
#data = '''
#FOOBAR uf-2etg 0xa3bdefa momo 123
# data = '''
# FOOBAR uf-2etg 0xa3bdefa momo 123
#
#BARBAR
#BAZ
# BARBAR
# BAZ
#
#BASFB foo-bar-baz
# BASFB foo-bar-baz
#'''
#
#lexer.input(data)
# lexer.input(data)
#
#while True:
# while True:
# tok = lexer.token()
# if not tok:
# break
# print(tok)
class TokenTransfer:
def __init__(
self, to_address=None, from_address=None, amount=None, token_address=None
):
self.to_address = to_address
self.from_address = from_address
self.amount = amount
self.token_address = token_address
class CmdId(enum.IntEnum):
KEY_CREATE = 0x1
VOUCHER_CREATE = 0x10
@ -110,14 +122,15 @@ class CmdId(enum.IntEnum):
class Agent:
def __str__(self):
return self.__class__.__name__ + ':' #{}'.format(self.__class__.__name__, self.v)
return (
self.__class__.__name__ + ":"
) # {}'.format(self.__class__.__name__, self.v)
class AddressAgent(Agent):
def __init__(self, v):
self.v = bytes.fromhex(v[2:])
def __str__(self):
return Agent.__str__(self) + self.v.hex()
@ -129,8 +142,8 @@ class NameAgent(Agent):
def __str__(self):
return Agent.__str__(self) + self.v
class Cmd:
class Cmd:
def __init__(self, cmd):
self.c = CmdId[cmd]
self.i = None
@ -144,9 +157,21 @@ class Cmd:
self.s = None
self.n = None
def __str__(self):
return "[{:02d}]{} i={} v={} a={} f={} t={} k={} d={} p={} s={} n={}".format(self.c, self.c.name, self.i, self.v, self.a, self.f, self.t, self.k, self.d, self.p, self.s, self.n)
return "[{:02d}]{} i={} v={} a={} f={} t={} k={} d={} p={} s={} n={}".format(
self.c,
self.c.name,
self.i,
self.v,
self.a,
self.f,
self.t,
self.k,
self.d,
self.p,
self.s,
self.n,
)
def to_store(v):
@ -163,19 +188,19 @@ def to_agent(v):
def p_cmd(p):
'''cmd : key_create
| voucher_mint
| voucher_transfer
| voucher_create
| pair
'''
"""cmd : key_create
| voucher_mint
| voucher_transfer
| voucher_create
| pair
"""
p[0] = p[1]
def p_key_create(p):
'''key_create : change_label LABEL LABEL
| change_label LABEL LABEL HEX
'''
"""key_create : change_label LABEL LABEL
| change_label LABEL LABEL HEX
"""
o = p[1]
o.f = NameAgent(p[2])
o.t = to_store(p[3])
@ -184,11 +209,10 @@ def p_key_create(p):
p[0] = o
def p_voucher_mint(p):
'''voucher_mint : change_label hv
| change_label nv
'''
"""voucher_mint : change_label hv
| change_label nv
"""
o = p[1]
if o.c.value & 0x10 == 0:
raise ValueError("not a voucher command")
@ -198,9 +222,9 @@ def p_voucher_mint(p):
def p_voucher_create(p):
'''voucher_create : change_label LABEL LABEL VALUE
| change_label LABEL LABEL VALUE VALUE VALUE
'''
"""voucher_create : change_label LABEL LABEL VALUE
| change_label LABEL LABEL VALUE VALUE VALUE
"""
o = p[1]
if o.c.value & 0x10 == 0:
raise ValueError("not a voucher command")
@ -214,11 +238,11 @@ def p_voucher_create(p):
def p_voucher_mint_recipient(p):
'''voucher_mint : change_label hv HEX
| change_label nv HEX
| change_label hv LABEL
| change_label nv LABEL
'''
"""voucher_mint : change_label hv HEX
| change_label nv HEX
| change_label hv LABEL
| change_label nv LABEL
"""
o = p[1]
if o.c.value & 0x10 == 0:
raise ValueError("not a voucher command")
@ -229,15 +253,15 @@ def p_voucher_mint_recipient(p):
def p_voucher_transfer(p):
'''voucher_transfer : change_label hv HEX HEX
| change_label nv HEX HEX
| change_label hv LABEL HEX
| change_label nv LABEL HEX
| change_label hv LABEL LABEL
| change_label nv LABEL LABEL
| change_label hv HEX LABEL
| change_label nv HEX LABEL
'''
"""voucher_transfer : change_label hv HEX HEX
| change_label nv HEX HEX
| change_label hv LABEL HEX
| change_label nv LABEL HEX
| change_label hv LABEL LABEL
| change_label nv LABEL LABEL
| change_label hv HEX LABEL
| change_label nv HEX LABEL
"""
o = p[1]
if o.c.value & 0x10 == 0:
raise ValueError("not a voucher command")
@ -249,74 +273,87 @@ def p_voucher_transfer(p):
def p_nv(p):
'nv : LABEL VALUE'
p[0] = (p[1], p[2],)
"nv : LABEL VALUE"
p[0] = (
p[1],
p[2],
)
def p_hv(p):
'hv : HEX VALUE'
p[0] = (p[1], p[2],)
"hv : HEX VALUE"
p[0] = (
p[1],
p[2],
)
def p_change_label(p):
'''change_label : pair
| pairnoid
'''
"""change_label : pair
| pairnoid
"""
p[0] = p[1]
def p_pair(p):
'pair : LABEL ID'
"pair : LABEL ID"
o = Cmd(p[1])
o.i = p[2]
p[0] = o
def p_pairnoid(p):
'pairnoid : LABEL NOID'
"pairnoid : LABEL NOID"
o = Cmd(p[1])
o.i = str(uuid.uuid4())
p[0] = o
class Router:
def __init__(self):
self.__routes = {}
self.__r = {}
def register(self, cmd_id, fn):
self.__routes[cmd_id] = fn
def sync(self, uid):
o = self.__r[uid]
if o[1] == None:
return None
r = o[1][0].recv()
o = (o[0], None,)
o = (
o[0],
None,
)
o[0].join()
return r
def __wrap(self, uid, fn, cmd):
r = fn(cmd)
self.__r[uid][1][1].send(r)
def exec(self, cmd):
logg.debug("router exec {}".format(cmd))
if cmd.c & 0xa0 > 0:
if cmd.c & 0xA0 > 0:
return self.sync(cmd.i)
fn = self.__routes[cmd.c]
pi = Pipe(False)
po = Process(target=self.__wrap, args=(cmd.i, fn, cmd,))
self.__r[cmd.i] = (po, pi,)
po = Process(
target=self.__wrap,
args=(
cmd.i,
fn,
cmd,
),
)
self.__r[cmd.i] = (
po,
pi,
)
po.start()
po.join()
def finish(self):
print("syncing")
for k, v in self.__r.items():
@ -325,7 +362,6 @@ class Router:
logg.debug("synced " + k + ": " + r)
print("synced " + k + ": " + r)
def __del__(self):
self.finish()
@ -341,91 +377,169 @@ def printMessage(message):
print("+" + "-" * (box_width - 2) + "+")
def find_custodial_address(key_name):
directory = "custodialstore"
filename = f"{key_name}.json"
file_path = os.path.join(directory, filename)
# Check if the file exists
if os.path.isfile(file_path):
with open(file_path, "r") as f:
custodial_account = json.load(f)
return custodial_account["address"]
else:
return None
def remove_ansi_escape_codes(text):
return re.sub(r'\u001b\[.*?m', '', text)
return re.sub(r"\u001b\[.*?m", "", text)
def generate_private_key():
"""Generate a new private key."""
web3 = Web3()
account = web3.eth.account.create()
return account.address,w3.to_hex(account.key)
return account.address, w3.to_hex(account.key)
def store_key_in_keystore(private_key, key_name, address):
keystore_dir = "custodial_store"
def store_key_in_keystore(keystore_dir, private_key, key_name, address):
# Create the directory if it doesn't exist
if not os.path.exists(keystore_dir):
os.makedirs(keystore_dir)
os.makedirs(keystore_dir)
keystore = {
'key_name': key_name,
'private_key': private_key,
'address': address,
"key_name": key_name,
"private_key": private_key,
"address": address,
}
store_path = os.path.join(keystore_dir, f"{key_name}.json")
# Save to JSON file (simulated keystore)
with open(store_path, 'w') as f:
with open(store_path, "w") as f:
json.dump(keystore, f)
return store_path
def load_gas(address,nonce):
def load_gas(address):
command = (
f'cast send {address} '
f'--value {gas_topup} '
f'--private-key {master_private_key} '
f'--rpc-url {rpc} '
f' --json '
)
f"cast send {address} "
f"--value {gas_topup} "
f"--private-key {master_private_key} "
f"--rpc-url {rpc} "
f" --json "
)
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr)
raise subprocess.CalledProcessError(
result.returncode, command, output=result.stdout, stderr=result.stderr
)
message = f"Added {gas_topup} to {address}"
printMessage(message)
def create_custodialaccount():
# create custodial account endpoint
url = "http://localhost:5003/api/v2/account/create"
def store_voucher(voucher_creator,voucher_symbol,voucher_address):
headers = {
"Authorization": f"Bearer {bearer_token}",
"Content-Type": "application/json",
}
try:
response = requests.post(url, headers=headers)
# Check if the request was successful (status code 200)
if response.status_code == 200:
account = response.json()
public_key = account["result"]["publicKey"]
return public_key
else:
return None
except requests.exceptions.RequestException as e:
print("Error:", e)
return None
def do_custodial_token_transfer(transfer):
url = "http://localhost:5003/api/v2/token/transfer"
headers = {
"Authorization": f"Bearer {bearer_token}",
"Content-Type": "application/json",
}
data = {
"from": transfer.from_address,
"to": transfer.to_address,
"amount": str(transfer.amount),
"tokenAddress": transfer.token_address,
}
json_data = json.dumps(data)
try:
response = requests.post(url=url, headers=headers, data=json_data)
# Check if the request was successful (status code 200)
if response.status_code == 200:
transfer = response.json()
return transfer["result"]["trackingId"]
else:
print("Error:", response.json)
return None
except requests.exceptions.RequestException as e:
print("Error:", e)
return None
def store_voucher(voucher_creator, voucher_symbol, voucher_address):
vouchers = []
voucher = {
'voucher_address': voucher_address,
'owner': voucher_creator,
'symbol': voucher_symbol
"voucher_address": voucher_address,
"owner": voucher_creator,
"symbol": voucher_symbol,
}
vouchers.append(voucher)
try:
with open("vouchers.json", 'r') as f:
with open("vouchers.json", "r") as f:
existing_vouchers = json.load(f)
except (FileNotFoundError):
existing_vouchers = []
except FileNotFoundError:
existing_vouchers = []
for entry in existing_vouchers:
vouchers.append(entry)
with open("vouchers.json", 'w') as f:
with open("vouchers.json", "w") as f:
json.dump(vouchers, f)
def key_create_handler(cmd):
key_name = str(cmd.f).split(":")[1]
master_address = w3.eth.account.from_key(master_private_key)
nonce = w3.eth.get_transaction_count(master_address.address,'pending')
store_name = cmd.t
keystore_dir = "user_store"
if cmd.k is None:
address,private_key = generate_private_key()
address, private_key = generate_private_key()
else:
if cmd.k.startswith("0x"):
private_key = cmd.k[2:]
else:
private_key = cmd.k
address = w3.eth.account.from_key(private_key).address
address = w3.eth.account.from_key(private_key).address
load_gas(address,nonce)
store_key_in_keystore(private_key, key_name, address)
if store_name == "custodialstore":
address = create_custodialaccount()
if address is None:
raise ValueError("account address cannot be None")
private_key = None
keystore_dir = "custodialstore"
load_gas(address)
store_key_in_keystore(keystore_dir, private_key, key_name, address)
return address
@ -433,14 +547,14 @@ def voucher_create_handler(cmd):
name = cmd.n
symbol = cmd.s
random_ascii = ''.join(random.choices(string.ascii_letters, k=2)).upper()
random_ascii = "".join(random.choices(string.ascii_letters, k=2)).upper()
try:
with open("vouchers.json", 'r') as f:
with open("vouchers.json", "r") as f:
existing_vouchers = json.load(f)
except (FileNotFoundError):
existing_vouchers = []
except FileNotFoundError:
existing_vouchers = []
voucher_symbols = list(map(lambda symbol: symbol['symbol'], existing_vouchers))
voucher_symbols = list(map(lambda symbol: symbol["symbol"], existing_vouchers))
if symbol in voucher_symbols:
symbol = symbol + random_ascii
@ -450,113 +564,155 @@ def voucher_create_handler(cmd):
else:
private_key = master_private_key
#Command to create a voucher
publish_token = f'ge-publish --private-key {private_key} --json ' \
f'--rpc {rpc} --gas-fee-cap {gasCap} --chainid {chainId} ' \
f'p erc20 --name "{name}" --symbol "{symbol}"'
# Command to create a voucher
publish_token = (
f"ge-publish --private-key {private_key} --json "
f"--rpc {rpc} --gas-fee-cap {gas_cap} --chainid {chainId} "
f'p erc20 --name "{name}" --symbol "{symbol}"'
)
result = subprocess.run(publish_token, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, publish_token, output=result.stdout, stderr=result.stderr)
raise subprocess.CalledProcessError(
result.returncode, publish_token, output=result.stdout, stderr=result.stderr
)
output_lines = result.stderr.strip().split("\n")
deployment_result = output_lines[1]
try:
data = json.loads(deployment_result)
contract_address = data.get('contract_address', None)
data = json.loads(deployment_result)
contract_address = data.get("contract_address", None)
except json.JSONDecodeError as e:
print("Error parsing JSON:", e)
print("Error parsing JSON:", e)
store_voucher(private_key,symbol,contract_address)
store_voucher(private_key, symbol, contract_address)
#Command to add the token to the token index
# Command to add the token to the token index
add_token_to_index = (
f'cast send --private-key {master_private_key} '
f'--rpc-url {rpc} '
f'{token_index} '
f'"add(address)" {contract_address} '
f' --json '
)
f"cast send --private-key {master_private_key} "
f"--rpc-url {rpc} "
f"{token_index} "
f'"add(address)" {contract_address} '
f" --json "
)
#sleep for 5 second to allow chain to sync
# sleep for 5 second to allow chain to sync
time.sleep(5)
result2 = subprocess.run(add_token_to_index, shell=True, capture_output=True, text=True)
result2 = subprocess.run(
add_token_to_index, shell=True, capture_output=True, text=True
)
if result2.returncode != 0:
raise subprocess.CalledProcessError(result2.returncode, add_token_to_index, output=result2.stdout, stderr=result2.stderr)
raise subprocess.CalledProcessError(
result2.returncode,
add_token_to_index,
output=result2.stdout,
stderr=result2.stderr,
)
message = f"Voucher {name} created with address {contract_address} and symbol {symbol} and added to token index: {token_index}"
printMessage(message)
return contract_address
def voucher_transfer_handler(cmd):
#Amount to transfer
value = cmd.v # Amount to transfer
token_transfer = TokenTransfer()
#Token symbol to transfer
if str(cmd.a).startswith("NameAgent"):
voucher_name = str(cmd.a).split(":")[1]
# Amount to transfer
value = cmd.v # Amount to transfer
is_custodial_address = False
token_transfer.amount = value
# Token symbol to transfer
if str(cmd.a).startswith("NameAgent"):
voucher_name = str(cmd.a).split(":")[1]
with open("vouchers.json", "r") as file:
data = json.load(file)
data = json.load(file)
voucher_symbols = list(map(lambda symbol: symbol['symbol'], data))
voucher_address = list(map(lambda address: address['voucher_address'], data))
voucher_symbols = list(map(lambda symbol: symbol["symbol"], data))
voucher_address = list(map(lambda address: address["voucher_address"], data))
if voucher_name in voucher_symbols:
index = voucher_symbols.index(voucher_name)
s = voucher_address[index]
index = voucher_symbols.index(voucher_name)
token_transfer.token_address = voucher_address[index]
elif str(cmd.a).startswith("AddressAgent"):
s = "0x" + str(cmd.a).split(":")[1]
else:
raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.")
elif str(cmd.a).startswith("AddressAgent"):
token_transfer.token_address = "0x" + str(cmd.a).split(":")[1]
else:
raise ValueError(
f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'."
)
if str(cmd.t).startswith("NameAgent"):
if str(cmd.t).startswith("NameAgent"):
key_name = str(cmd.t).split(":")[1]
store_path = os.path.join("custodial_store", f"{key_name}.json")
with open(store_path, "r") as file:
data = json.load(file)
acct = w3.eth.account.from_key(data["private_key"])
to = acct.address
elif str(cmd.t).startswith("AddressAgent"):
custodial_address = find_custodial_address(key_name)
if custodial_address is not None:
to = custodial_address
token_transfer.to_address = to
else:
store_path = os.path.join("custodial_store", f"{key_name}.json")
with open(store_path, "r") as file:
data = json.load(file)
acct = w3.eth.account.from_key(data["private_key"])
to = acct.address
token_transfer.to_address = to
elif str(cmd.t).startswith("AddressAgent"):
to = "0x" + str(cmd.t).split(":")[1]
else:
raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.")
token_transfer.to_address = to
else:
raise ValueError(
f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'."
)
if str(cmd.f).startswith("NameAgent"):
if str(cmd.f).startswith("NameAgent"):
key_name = str(cmd.f).split(":")[1]
store_path = os.path.join("custodial_store", f"{key_name}.json")
with open(store_path, "r") as file:
data = json.load(file)
from_private_key = data["private_key"]
elif str(cmd.f).startswith("AddressAgent"):
custodial_address = find_custodial_address(key_name)
if custodial_address is not None:
is_custodial_address = True
token_transfer.from_address = custodial_address
else:
store_path = os.path.join("custodial_store", f"{key_name}.json")
with open(store_path, "r") as file:
data = json.load(file)
from_private_key = data["private_key"]
elif str(cmd.f).startswith("AddressAgent"):
from_private_key = "0x" + str(cmd.f).split(":")[1]
else:
raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.")
command = (
f'cast send --private-key {from_private_key} '
f'--rpc-url {rpc} '
f'{s} '
f'"transfer(address,uint256)" {to} {value}'
f' --json '
)
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr)
if result.stderr:
raise ValueError(f"Command failed with error: {result.stderr}")
amount_transfered = value / 10**6
data = json.loads(result.stdout)
message = f"Transfered {amount_transfered} {voucher_name} to {to} "
printMessage(message)
return data["transactionHash"]
else:
raise ValueError(
f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'."
)
amount_transfered = value / 10**6
if is_custodial_address:
tracking_id = do_custodial_token_transfer(token_transfer)
if tracking_id is not None:
message = f"Transfered {amount_transfered} {voucher_name} to {to} "
printMessage(message)
return tracking_id
else:
command = (
f"cast send --private-key {from_private_key} "
f"--rpc-url {rpc} "
f"{token_transfer.token_address} "
f'"transfer(address,uint256)" {to} {value}'
f" --json "
)
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise subprocess.CalledProcessError(
result.returncode, command, output=result.stdout, stderr=result.stderr
)
if result.stderr:
raise ValueError(f"Command failed with error: {result.stderr}")
data = json.loads(result.stdout)
message = f"Transfered {amount_transfered} {voucher_name} to {to} "
printMessage(message)
return data["transactionHash"]
def voucher_mint_handler(cmd):
@ -565,45 +721,51 @@ def voucher_mint_handler(cmd):
key_name = str(cmd.t).split(":")[1]
store_path = os.path.join("custodial_store", f"{key_name}.json")
with open(store_path, "r") as file:
data = json.load(file)
data = json.load(file)
acct = w3.eth.account.from_key(data["private_key"])
to = acct.address
elif str(cmd.t).startswith("AddressAgent"):
to = "0x" + str(cmd.t).split(":")[1]
else:
raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.")
raise ValueError(
f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'."
)
if str(cmd.a).startswith("NameAgent"):
voucher_symbol = str(cmd.a).split(":")[1]
voucher_symbol = str(cmd.a).split(":")[1]
with open("vouchers.json", "r") as file:
data = json.load(file)
data = json.load(file)
voucher_symbols = list(map(lambda symbol: symbol['symbol'], data))
voucher_address = list(map(lambda address: address['voucher_address'], data))
voucher_owners = list(map(lambda address: address['owner'], data))
voucher_symbols = list(map(lambda symbol: symbol["symbol"], data))
voucher_address = list(map(lambda address: address["voucher_address"], data))
voucher_owners = list(map(lambda address: address["owner"], data))
if voucher_symbol in voucher_symbols:
index = voucher_symbols.index(voucher_symbol)
s = voucher_address[index]
privatekey = voucher_owners[index]
index = voucher_symbols.index(voucher_symbol)
s = voucher_address[index]
privatekey = voucher_owners[index]
else:
raise ValueError(f"Voucher with symbol {voucher_symbol} was not found")
elif str(cmd.a).startswith("AddressAgent"):
s = "0x" + str(cmd.a).split(":")[1]
else:
raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.")
raise ValueError(
f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'."
)
command = (
f'cast send --private-key {privatekey} '
f'--rpc-url {rpc} '
f' {s} '
f"cast send --private-key {privatekey} "
f"--rpc-url {rpc} "
f" {s} "
f'"mintTo(address,uint256)" {to} {value}'
f' --json '
f" --json "
)
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr)
raise subprocess.CalledProcessError(
result.returncode, command, output=result.stdout, stderr=result.stderr
)
if result.stderr:
raise ValueError(f"Command failed with error: {result.stderr}")
data = json.loads(result.stdout)
@ -615,9 +777,6 @@ def voucher_mint_handler(cmd):
return data["transactionHash"]
parser = yacc.yacc()
running = True
@ -627,14 +786,14 @@ class FileGet:
def __init__(self, o, fp):
self.__running = True
self.__o = o
self.__f = open(fp, 'r')
self.__f = open(fp, "r")
def run(self):
while self.__running:
v = ifc.get()
if v == None:
break
v = v.strip('\n')
v = v.strip("\n")
if len(v) == 0:
break
r = parser.parse(v)
@ -657,15 +816,17 @@ class WaitGet:
v = r[0][0].recv()
if v == None:
break
v = v.strip('\n')
v = v.strip("\n")
if len(v) == 0:
break
r = parser.parse(v)
self.__o.exec(r)
def run(self):
(fo, fi,) = Pipe()
(
fo,
fi,
) = Pipe()
p = Process(target=self.__process, args=(fo,))
p.start()
while self.__running:
@ -675,16 +836,16 @@ class WaitGet:
break
fi.send(v)
p.join()
logg.debug('waitget run end')
logg.debug("waitget run end")
if __name__ == '__main__':
if __name__ == "__main__":
ifc = None
o = Router()
o.register(CmdId.KEY_CREATE,key_create_handler)
o.register(CmdId.VOUCHER_CREATE,voucher_create_handler)
o.register(CmdId.KEY_CREATE, key_create_handler)
o.register(CmdId.VOUCHER_CREATE, voucher_create_handler)
o.register(CmdId.VOUCHER_MINT, voucher_mint_handler)
o.register(CmdId.VOUCHER_TRANSFER,voucher_transfer_handler)
o.register(CmdId.VOUCHER_TRANSFER, voucher_transfer_handler)
if len(sys.argv) > 1:
ifc = FileGet(o, sys.argv[1])