From 7b9bff6541bbd2321e1eca65f8824f0b8276ce8f Mon Sep 17 00:00:00 2001 From: Carlosokumu Date: Wed, 11 Dec 2024 17:46:38 +0300 Subject: [PATCH] add transfer to custodial accounts --- parse.py | 693 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 427 insertions(+), 266 deletions(-) diff --git a/parse.py b/parse.py index c0a7b0f..3a9d3bb 100644 --- a/parse.py +++ b/parse.py @@ -14,6 +14,8 @@ import select from multiprocessing import Process from multiprocessing import Pipe import subprocess +import requests +import requests.adapters from web3 import Web3 from dotenv import load_dotenv @@ -21,54 +23,55 @@ logg = logging.getLogger() logg.setLevel(logging.DEBUG) - tokens = ( - 'ID', - 'NOID', - 'HEX', - 'LABEL', - 'VALUE', - 'END', + "ID", + "NOID", + "HEX", + "LABEL", + "VALUE", + "END", ) -t_ID = r'[a-zA-Z0-9]+(-[a-zA-Z0-9]+)+' -t_NOID = r'\-' -t_LABEL = r'[a-zA-Z_]+' -t_VALUE = r'\d+' -t_END = r'\n' -t_HEX = r'0x([a-fA-F0-9])+' +t_ID = r"[a-zA-Z0-9]+(-[a-zA-Z0-9]+)+" +t_NOID = r"\-" +t_LABEL = r"[a-zA-Z_]+" +t_VALUE = r"\d+" +t_END = r"\n" +t_HEX = r"0x([a-fA-F0-9])+" -#def t_VALUE(t): +# def t_VALUE(t): # r'\d+' # t.value = int(t.value) -#def t_HEX(t): +# def t_HEX(t): # r'0x([a-fA-F0-9])+' - #print(t) - #try: - # t.value = bytes.fromhex(t.value) - #except: - # return False - #return t.value - #t.value = t.value +# print(t) +# try: +# t.value = bytes.fromhex(t.value) +# except: +# return False +# return t.value +# t.value = t.value # pass -t_ignore = ' \t' +t_ignore = " \t" + def t_error(t): print("problem: ", t.value[0]) t.lexer.skip(1) -lexer = lex.lex() +lexer = lex.lex() load_dotenv() -#Chain Params +# Chain Params chainId = os.getenv("CHAIN_ID") rpc = os.getenv("RPC") -gasCap = os.getenv("GAS_FEE_CAP") +bearer_token = os.getenv("BEARER_TOKEN") +gas_cap = os.getenv("GAS_FEE_CAP") master_private_key = os.getenv("MASTER_PRIVATE_KEY") @@ -78,26 +81,35 @@ gas_topup = os.getenv("GAS_TOPUP") w3 = Web3(Web3.HTTPProvider(rpc)) - # -#data = ''' -#FOOBAR uf-2etg 0xa3bdefa momo 123 +# data = ''' +# FOOBAR uf-2etg 0xa3bdefa momo 123 # -#BARBAR -#BAZ +# BARBAR +# BAZ # -#BASFB foo-bar-baz +# BASFB foo-bar-baz #''' # -#lexer.input(data) +# lexer.input(data) # -#while True: +# while True: # tok = lexer.token() # if not tok: # break # print(tok) +class TokenTransfer: + def __init__( + self, to_address=None, from_address=None, amount=None, token_address=None + ): + self.to_address = to_address + self.from_address = from_address + self.amount = amount + self.token_address = token_address + + class CmdId(enum.IntEnum): KEY_CREATE = 0x1 VOUCHER_CREATE = 0x10 @@ -110,14 +122,15 @@ class CmdId(enum.IntEnum): class Agent: def __str__(self): - return self.__class__.__name__ + ':' #{}'.format(self.__class__.__name__, self.v) + return ( + self.__class__.__name__ + ":" + ) # {}'.format(self.__class__.__name__, self.v) class AddressAgent(Agent): def __init__(self, v): self.v = bytes.fromhex(v[2:]) - def __str__(self): return Agent.__str__(self) + self.v.hex() @@ -129,8 +142,8 @@ class NameAgent(Agent): def __str__(self): return Agent.__str__(self) + self.v -class Cmd: +class Cmd: def __init__(self, cmd): self.c = CmdId[cmd] self.i = None @@ -144,9 +157,21 @@ class Cmd: self.s = None self.n = None - def __str__(self): - return "[{:02d}]{} i={} v={} a={} f={} t={} k={} d={} p={} s={} n={}".format(self.c, self.c.name, self.i, self.v, self.a, self.f, self.t, self.k, self.d, self.p, self.s, self.n) + return "[{:02d}]{} i={} v={} a={} f={} t={} k={} d={} p={} s={} n={}".format( + self.c, + self.c.name, + self.i, + self.v, + self.a, + self.f, + self.t, + self.k, + self.d, + self.p, + self.s, + self.n, + ) def to_store(v): @@ -163,44 +188,43 @@ def to_agent(v): def p_cmd(p): - '''cmd : key_create - | voucher_mint - | voucher_transfer - | voucher_create - | pair - ''' + """cmd : key_create + | voucher_mint + | voucher_transfer + | voucher_create + | pair + """ p[0] = p[1] def p_key_create(p): - '''key_create : change_label LABEL LABEL - | change_label LABEL LABEL HEX - ''' + """key_create : change_label LABEL LABEL + | change_label LABEL LABEL HEX + """ o = p[1] o.f = NameAgent(p[2]) o.t = to_store(p[3]) if len(p) > 4: o.k = p[4] p[0] = o - def p_voucher_mint(p): - '''voucher_mint : change_label hv - | change_label nv - ''' + """voucher_mint : change_label hv + | change_label nv + """ o = p[1] if o.c.value & 0x10 == 0: raise ValueError("not a voucher command") o.v = int(p[2][1]) o.a = to_agent(p[2][0]) - p[0] = o + p[0] = o def p_voucher_create(p): - '''voucher_create : change_label LABEL LABEL VALUE - | change_label LABEL LABEL VALUE VALUE VALUE - ''' + """voucher_create : change_label LABEL LABEL VALUE + | change_label LABEL LABEL VALUE VALUE VALUE + """ o = p[1] if o.c.value & 0x10 == 0: raise ValueError("not a voucher command") @@ -210,34 +234,34 @@ def p_voucher_create(p): if len(p) > 5: o.d = p[5] o.p = p[6] - p[0] = o + p[0] = o def p_voucher_mint_recipient(p): - '''voucher_mint : change_label hv HEX - | change_label nv HEX - | change_label hv LABEL - | change_label nv LABEL - ''' + """voucher_mint : change_label hv HEX + | change_label nv HEX + | change_label hv LABEL + | change_label nv LABEL + """ o = p[1] if o.c.value & 0x10 == 0: raise ValueError("not a voucher command") o.v = int(p[2][1]) o.a = to_agent(p[2][0]) o.t = to_agent(p[3]) - p[0] = o + p[0] = o def p_voucher_transfer(p): - '''voucher_transfer : change_label hv HEX HEX - | change_label nv HEX HEX - | change_label hv LABEL HEX - | change_label nv LABEL HEX - | change_label hv LABEL LABEL - | change_label nv LABEL LABEL - | change_label hv HEX LABEL - | change_label nv HEX LABEL - ''' + """voucher_transfer : change_label hv HEX HEX + | change_label nv HEX HEX + | change_label hv LABEL HEX + | change_label nv LABEL HEX + | change_label hv LABEL LABEL + | change_label nv LABEL LABEL + | change_label hv HEX LABEL + | change_label nv HEX LABEL + """ o = p[1] if o.c.value & 0x10 == 0: raise ValueError("not a voucher command") @@ -245,78 +269,91 @@ def p_voucher_transfer(p): o.a = to_agent(p[2][0]) o.t = to_agent(p[3]) o.f = to_agent(p[4]) - p[0] = o + p[0] = o def p_nv(p): - 'nv : LABEL VALUE' - p[0] = (p[1], p[2],) + "nv : LABEL VALUE" + p[0] = ( + p[1], + p[2], + ) def p_hv(p): - 'hv : HEX VALUE' - p[0] = (p[1], p[2],) + "hv : HEX VALUE" + p[0] = ( + p[1], + p[2], + ) def p_change_label(p): - '''change_label : pair - | pairnoid - ''' + """change_label : pair + | pairnoid + """ p[0] = p[1] def p_pair(p): - 'pair : LABEL ID' + "pair : LABEL ID" o = Cmd(p[1]) o.i = p[2] p[0] = o def p_pairnoid(p): - 'pairnoid : LABEL NOID' + "pairnoid : LABEL NOID" o = Cmd(p[1]) o.i = str(uuid.uuid4()) p[0] = o class Router: - def __init__(self): self.__routes = {} self.__r = {} - def register(self, cmd_id, fn): self.__routes[cmd_id] = fn - def sync(self, uid): o = self.__r[uid] if o[1] == None: return None r = o[1][0].recv() - o = (o[0], None,) + o = ( + o[0], + None, + ) o[0].join() return r - def __wrap(self, uid, fn, cmd): r = fn(cmd) self.__r[uid][1][1].send(r) - def exec(self, cmd): logg.debug("router exec {}".format(cmd)) - if cmd.c & 0xa0 > 0: + if cmd.c & 0xA0 > 0: return self.sync(cmd.i) fn = self.__routes[cmd.c] pi = Pipe(False) - po = Process(target=self.__wrap, args=(cmd.i, fn, cmd,)) - self.__r[cmd.i] = (po, pi,) + po = Process( + target=self.__wrap, + args=( + cmd.i, + fn, + cmd, + ), + ) + self.__r[cmd.i] = ( + po, + pi, + ) po.start() po.join() - def finish(self): print("syncing") for k, v in self.__r.items(): @@ -325,7 +362,6 @@ class Router: logg.debug("synced " + k + ": " + r) print("synced " + k + ": " + r) - def __del__(self): self.finish() @@ -335,112 +371,190 @@ def noop_handler(cmd): def printMessage(message): - box_width = len(message) + 4 - print("+" + "-" * (box_width - 2) + "+") - print("| " + message + " |") - print("+" + "-" * (box_width - 2) + "+") + box_width = len(message) + 4 + print("+" + "-" * (box_width - 2) + "+") + print("| " + message + " |") + print("+" + "-" * (box_width - 2) + "+") +def find_custodial_address(key_name): + directory = "custodialstore" + filename = f"{key_name}.json" + file_path = os.path.join(directory, filename) + + # Check if the file exists + if os.path.isfile(file_path): + with open(file_path, "r") as f: + custodial_account = json.load(f) + return custodial_account["address"] + else: + return None + def remove_ansi_escape_codes(text): - return re.sub(r'\u001b\[.*?m', '', text) + return re.sub(r"\u001b\[.*?m", "", text) + def generate_private_key(): """Generate a new private key.""" web3 = Web3() - account = web3.eth.account.create() - return account.address,w3.to_hex(account.key) + account = web3.eth.account.create() + return account.address, w3.to_hex(account.key) -def store_key_in_keystore(private_key, key_name, address): - - keystore_dir = "custodial_store" + +def store_key_in_keystore(keystore_dir, private_key, key_name, address): # Create the directory if it doesn't exist if not os.path.exists(keystore_dir): - os.makedirs(keystore_dir) + os.makedirs(keystore_dir) keystore = { - 'key_name': key_name, - 'private_key': private_key, - 'address': address, + "key_name": key_name, + "private_key": private_key, + "address": address, } store_path = os.path.join(keystore_dir, f"{key_name}.json") - - # Save to JSON file (simulated keystore) - with open(store_path, 'w') as f: - json.dump(keystore, f) - - return store_path -def load_gas(address,nonce): + # Save to JSON file (simulated keystore) + with open(store_path, "w") as f: + json.dump(keystore, f) + + return store_path + + +def load_gas(address): command = ( - f'cast send {address} ' - f'--value {gas_topup} ' - f'--private-key {master_private_key} ' - f'--rpc-url {rpc} ' - f' --json ' - ) + f"cast send {address} " + f"--value {gas_topup} " + f"--private-key {master_private_key} " + f"--rpc-url {rpc} " + f" --json " + ) result = subprocess.run(command, shell=True, capture_output=True, text=True) if result.returncode != 0: - raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr) - + raise subprocess.CalledProcessError( + result.returncode, command, output=result.stdout, stderr=result.stderr + ) + message = f"Added {gas_topup} to {address}" printMessage(message) - - -def store_voucher(voucher_creator,voucher_symbol,voucher_address): + +def create_custodialaccount(): + # create custodial account endpoint + url = "http://localhost:5003/api/v2/account/create" + + headers = { + "Authorization": f"Bearer {bearer_token}", + "Content-Type": "application/json", + } + + try: + response = requests.post(url, headers=headers) + + # Check if the request was successful (status code 200) + if response.status_code == 200: + account = response.json() + public_key = account["result"]["publicKey"] + return public_key + else: + return None + except requests.exceptions.RequestException as e: + print("Error:", e) + return None + + +def do_custodial_token_transfer(transfer): + url = "http://localhost:5003/api/v2/token/transfer" + + headers = { + "Authorization": f"Bearer {bearer_token}", + "Content-Type": "application/json", + } + + data = { + "from": transfer.from_address, + "to": transfer.to_address, + "amount": str(transfer.amount), + "tokenAddress": transfer.token_address, + } + json_data = json.dumps(data) + + try: + response = requests.post(url=url, headers=headers, data=json_data) + # Check if the request was successful (status code 200) + if response.status_code == 200: + transfer = response.json() + return transfer["result"]["trackingId"] + else: + print("Error:", response.json) + return None + except requests.exceptions.RequestException as e: + print("Error:", e) + return None + + +def store_voucher(voucher_creator, voucher_symbol, voucher_address): vouchers = [] voucher = { - 'voucher_address': voucher_address, - 'owner': voucher_creator, - 'symbol': voucher_symbol + "voucher_address": voucher_address, + "owner": voucher_creator, + "symbol": voucher_symbol, } vouchers.append(voucher) try: - with open("vouchers.json", 'r') as f: + with open("vouchers.json", "r") as f: existing_vouchers = json.load(f) - except (FileNotFoundError): - existing_vouchers = [] - + except FileNotFoundError: + existing_vouchers = [] + for entry in existing_vouchers: vouchers.append(entry) - - with open("vouchers.json", 'w') as f: + + with open("vouchers.json", "w") as f: json.dump(vouchers, f) def key_create_handler(cmd): - key_name = str(cmd.f).split(":")[1] - master_address = w3.eth.account.from_key(master_private_key) - nonce = w3.eth.get_transaction_count(master_address.address,'pending') + key_name = str(cmd.f).split(":")[1] + store_name = cmd.t + + keystore_dir = "user_store" if cmd.k is None: - address,private_key = generate_private_key() + address, private_key = generate_private_key() else: if cmd.k.startswith("0x"): private_key = cmd.k[2:] else: private_key = cmd.k - address = w3.eth.account.from_key(private_key).address + address = w3.eth.account.from_key(private_key).address - load_gas(address,nonce) - store_key_in_keystore(private_key, key_name, address) + if store_name == "custodialstore": + address = create_custodialaccount() + if address is None: + raise ValueError("account address cannot be None") + private_key = None + keystore_dir = "custodialstore" + + load_gas(address) + + store_key_in_keystore(keystore_dir, private_key, key_name, address) return address - -def voucher_create_handler(cmd): + +def voucher_create_handler(cmd): name = cmd.n symbol = cmd.s - random_ascii = ''.join(random.choices(string.ascii_letters, k=2)).upper() + random_ascii = "".join(random.choices(string.ascii_letters, k=2)).upper() try: - with open("vouchers.json", 'r') as f: + with open("vouchers.json", "r") as f: existing_vouchers = json.load(f) - except (FileNotFoundError): - existing_vouchers = [] + except FileNotFoundError: + existing_vouchers = [] - voucher_symbols = list(map(lambda symbol: symbol['symbol'], existing_vouchers)) + voucher_symbols = list(map(lambda symbol: symbol["symbol"], existing_vouchers)) if symbol in voucher_symbols: symbol = symbol + random_ascii @@ -450,160 +564,208 @@ def voucher_create_handler(cmd): else: private_key = master_private_key - #Command to create a voucher - publish_token = f'ge-publish --private-key {private_key} --json ' \ - f'--rpc {rpc} --gas-fee-cap {gasCap} --chainid {chainId} ' \ - f'p erc20 --name "{name}" --symbol "{symbol}"' - + # Command to create a voucher + publish_token = ( + f"ge-publish --private-key {private_key} --json " + f"--rpc {rpc} --gas-fee-cap {gas_cap} --chainid {chainId} " + f'p erc20 --name "{name}" --symbol "{symbol}"' + ) + result = subprocess.run(publish_token, shell=True, capture_output=True, text=True) if result.returncode != 0: - raise subprocess.CalledProcessError(result.returncode, publish_token, output=result.stdout, stderr=result.stderr) - + raise subprocess.CalledProcessError( + result.returncode, publish_token, output=result.stdout, stderr=result.stderr + ) + output_lines = result.stderr.strip().split("\n") deployment_result = output_lines[1] try: - data = json.loads(deployment_result) - contract_address = data.get('contract_address', None) + data = json.loads(deployment_result) + contract_address = data.get("contract_address", None) except json.JSONDecodeError as e: - print("Error parsing JSON:", e) - - store_voucher(private_key,symbol,contract_address) - - #Command to add the token to the token index + print("Error parsing JSON:", e) + + store_voucher(private_key, symbol, contract_address) + + # Command to add the token to the token index add_token_to_index = ( - f'cast send --private-key {master_private_key} ' - f'--rpc-url {rpc} ' - f'{token_index} ' - f'"add(address)" {contract_address} ' - f' --json ' - ) - - #sleep for 5 second to allow chain to sync + f"cast send --private-key {master_private_key} " + f"--rpc-url {rpc} " + f"{token_index} " + f'"add(address)" {contract_address} ' + f" --json " + ) + + # sleep for 5 second to allow chain to sync time.sleep(5) - result2 = subprocess.run(add_token_to_index, shell=True, capture_output=True, text=True) + result2 = subprocess.run( + add_token_to_index, shell=True, capture_output=True, text=True + ) if result2.returncode != 0: - raise subprocess.CalledProcessError(result2.returncode, add_token_to_index, output=result2.stdout, stderr=result2.stderr) - + raise subprocess.CalledProcessError( + result2.returncode, + add_token_to_index, + output=result2.stdout, + stderr=result2.stderr, + ) + message = f"Voucher {name} created with address {contract_address} and symbol {symbol} and added to token index: {token_index}" printMessage(message) return contract_address - + + def voucher_transfer_handler(cmd): - #Amount to transfer - value = cmd.v # Amount to transfer + token_transfer = TokenTransfer() - #Token symbol to transfer - if str(cmd.a).startswith("NameAgent"): - voucher_name = str(cmd.a).split(":")[1] + # Amount to transfer + value = cmd.v # Amount to transfer + is_custodial_address = False + + token_transfer.amount = value + + # Token symbol to transfer + if str(cmd.a).startswith("NameAgent"): + voucher_name = str(cmd.a).split(":")[1] with open("vouchers.json", "r") as file: - data = json.load(file) + data = json.load(file) - voucher_symbols = list(map(lambda symbol: symbol['symbol'], data)) - voucher_address = list(map(lambda address: address['voucher_address'], data)) + voucher_symbols = list(map(lambda symbol: symbol["symbol"], data)) + voucher_address = list(map(lambda address: address["voucher_address"], data)) if voucher_name in voucher_symbols: - index = voucher_symbols.index(voucher_name) - s = voucher_address[index] - - elif str(cmd.a).startswith("AddressAgent"): - s = "0x" + str(cmd.a).split(":")[1] - else: - raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.") + index = voucher_symbols.index(voucher_name) + token_transfer.token_address = voucher_address[index] - if str(cmd.t).startswith("NameAgent"): - key_name = str(cmd.t).split(":")[1] - store_path = os.path.join("custodial_store", f"{key_name}.json") - with open(store_path, "r") as file: - data = json.load(file) - acct = w3.eth.account.from_key(data["private_key"]) - to = acct.address - elif str(cmd.t).startswith("AddressAgent"): + elif str(cmd.a).startswith("AddressAgent"): + token_transfer.token_address = "0x" + str(cmd.a).split(":")[1] + else: + raise ValueError( + f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'." + ) + + if str(cmd.t).startswith("NameAgent"): + key_name = str(cmd.t).split(":")[1] + + custodial_address = find_custodial_address(key_name) + if custodial_address is not None: + to = custodial_address + token_transfer.to_address = to + else: + store_path = os.path.join("custodial_store", f"{key_name}.json") + with open(store_path, "r") as file: + data = json.load(file) + acct = w3.eth.account.from_key(data["private_key"]) + to = acct.address + token_transfer.to_address = to + elif str(cmd.t).startswith("AddressAgent"): to = "0x" + str(cmd.t).split(":")[1] - else: - raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.") - - if str(cmd.f).startswith("NameAgent"): - key_name = str(cmd.f).split(":")[1] - store_path = os.path.join("custodial_store", f"{key_name}.json") - with open(store_path, "r") as file: - data = json.load(file) - from_private_key = data["private_key"] - elif str(cmd.f).startswith("AddressAgent"): + token_transfer.to_address = to + else: + raise ValueError( + f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'." + ) + + if str(cmd.f).startswith("NameAgent"): + key_name = str(cmd.f).split(":")[1] + custodial_address = find_custodial_address(key_name) + if custodial_address is not None: + is_custodial_address = True + token_transfer.from_address = custodial_address + else: + store_path = os.path.join("custodial_store", f"{key_name}.json") + with open(store_path, "r") as file: + data = json.load(file) + from_private_key = data["private_key"] + + elif str(cmd.f).startswith("AddressAgent"): from_private_key = "0x" + str(cmd.f).split(":")[1] - else: - raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.") + else: + raise ValueError( + f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'." + ) - command = ( - f'cast send --private-key {from_private_key} ' - f'--rpc-url {rpc} ' - f'{s} ' - f'"transfer(address,uint256)" {to} {value}' - f' --json ' - ) + amount_transfered = value / 10**6 - result = subprocess.run(command, shell=True, capture_output=True, text=True) - if result.returncode != 0: - raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr) - if result.stderr: - raise ValueError(f"Command failed with error: {result.stderr}") - - amount_transfered = value / 10**6 + if is_custodial_address: + tracking_id = do_custodial_token_transfer(token_transfer) + if tracking_id is not None: + message = f"Transfered {amount_transfered} {voucher_name} to {to} " + printMessage(message) + return tracking_id + else: + command = ( + f"cast send --private-key {from_private_key} " + f"--rpc-url {rpc} " + f"{token_transfer.token_address} " + f'"transfer(address,uint256)" {to} {value}' + f" --json " + ) + result = subprocess.run(command, shell=True, capture_output=True, text=True) + if result.returncode != 0: + raise subprocess.CalledProcessError( + result.returncode, command, output=result.stdout, stderr=result.stderr + ) + if result.stderr: + raise ValueError(f"Command failed with error: {result.stderr}") + data = json.loads(result.stdout) + message = f"Transfered {amount_transfered} {voucher_name} to {to} " + printMessage(message) + return data["transactionHash"] - data = json.loads(result.stdout) - message = f"Transfered {amount_transfered} {voucher_name} to {to} " - printMessage(message) - return data["transactionHash"] - - - def voucher_mint_handler(cmd): value = cmd.v if str(cmd.t).startswith("NameAgent"): - key_name = str(cmd.t).split(":")[1] + key_name = str(cmd.t).split(":")[1] store_path = os.path.join("custodial_store", f"{key_name}.json") with open(store_path, "r") as file: - data = json.load(file) + data = json.load(file) acct = w3.eth.account.from_key(data["private_key"]) to = acct.address elif str(cmd.t).startswith("AddressAgent"): - to = "0x" + str(cmd.t).split(":")[1] + to = "0x" + str(cmd.t).split(":")[1] else: - raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.") + raise ValueError( + f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'." + ) if str(cmd.a).startswith("NameAgent"): - voucher_symbol = str(cmd.a).split(":")[1] + voucher_symbol = str(cmd.a).split(":")[1] with open("vouchers.json", "r") as file: - data = json.load(file) + data = json.load(file) - voucher_symbols = list(map(lambda symbol: symbol['symbol'], data)) - voucher_address = list(map(lambda address: address['voucher_address'], data)) - voucher_owners = list(map(lambda address: address['owner'], data)) + voucher_symbols = list(map(lambda symbol: symbol["symbol"], data)) + voucher_address = list(map(lambda address: address["voucher_address"], data)) + voucher_owners = list(map(lambda address: address["owner"], data)) if voucher_symbol in voucher_symbols: - index = voucher_symbols.index(voucher_symbol) - s = voucher_address[index] - privatekey = voucher_owners[index] - else: + index = voucher_symbols.index(voucher_symbol) + s = voucher_address[index] + privatekey = voucher_owners[index] + else: raise ValueError(f"Voucher with symbol {voucher_symbol} was not found") elif str(cmd.a).startswith("AddressAgent"): - s = "0x" + str(cmd.a).split(":")[1] + s = "0x" + str(cmd.a).split(":")[1] else: - raise ValueError(f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'.") - + raise ValueError( + f"Invalid command: {cmd.t}. Expected 'NameAgent' or 'AddressAgent'." + ) + command = ( - f'cast send --private-key {privatekey} ' - f'--rpc-url {rpc} ' - f' {s} ' + f"cast send --private-key {privatekey} " + f"--rpc-url {rpc} " + f" {s} " f'"mintTo(address,uint256)" {to} {value}' - f' --json ' + f" --json " ) result = subprocess.run(command, shell=True, capture_output=True, text=True) if result.returncode != 0: - raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr) + raise subprocess.CalledProcessError( + result.returncode, command, output=result.stdout, stderr=result.stderr + ) if result.stderr: raise ValueError(f"Command failed with error: {result.stderr}") data = json.loads(result.stdout) @@ -615,9 +777,6 @@ def voucher_mint_handler(cmd): return data["transactionHash"] - - - parser = yacc.yacc() running = True @@ -627,14 +786,14 @@ class FileGet: def __init__(self, o, fp): self.__running = True self.__o = o - self.__f = open(fp, 'r') + self.__f = open(fp, "r") def run(self): while self.__running: v = ifc.get() if v == None: break - v = v.strip('\n') + v = v.strip("\n") if len(v) == 0: break r = parser.parse(v) @@ -642,8 +801,8 @@ class FileGet: def get(self): return self.__f.readline() - - + + class WaitGet: def __init__(self, o, *r): self.__running = True @@ -657,15 +816,17 @@ class WaitGet: v = r[0][0].recv() if v == None: break - v = v.strip('\n') + v = v.strip("\n") if len(v) == 0: break r = parser.parse(v) self.__o.exec(r) - def run(self): - (fo, fi,) = Pipe() + ( + fo, + fi, + ) = Pipe() p = Process(target=self.__process, args=(fo,)) p.start() while self.__running: @@ -675,16 +836,16 @@ class WaitGet: break fi.send(v) p.join() - logg.debug('waitget run end') + logg.debug("waitget run end") -if __name__ == '__main__': +if __name__ == "__main__": ifc = None o = Router() - o.register(CmdId.KEY_CREATE,key_create_handler) - o.register(CmdId.VOUCHER_CREATE,voucher_create_handler) + o.register(CmdId.KEY_CREATE, key_create_handler) + o.register(CmdId.VOUCHER_CREATE, voucher_create_handler) o.register(CmdId.VOUCHER_MINT, voucher_mint_handler) - o.register(CmdId.VOUCHER_TRANSFER,voucher_transfer_handler) + o.register(CmdId.VOUCHER_TRANSFER, voucher_transfer_handler) if len(sys.argv) > 1: ifc = FileGet(o, sys.argv[1])