"""Command interpreter for custodial key and voucher operations.

Input lines are tokenized and parsed with PLY into ``Cmd`` objects, which a
``Router`` dispatches to handler functions running in child processes.

NOTE: PLY reads the ``t_*`` names and the docstrings of the ``p_*`` functions
as the lexer/grammar specification, and uses the first ``p_*`` function in
the file as the start symbol.  Do not reorder those functions or edit their
docstrings; they are documented with ``#`` comments instead.
"""
import json
import re
import ply.lex as lex
import ply.yacc as yacc
import enum
import uuid
import os
import logging
import sys
import select
from multiprocessing import Process
from multiprocessing import Pipe
import subprocess
from web3 import Web3
from dotenv import load_dotenv

logg = logging.getLogger()
logg.setLevel(logging.DEBUG)

# Token names known to the PLY lexer.
tokens = (
    'ID',
    'NOID',
    'HEX',
    'LABEL',
    'VALUE',
    'END',
)

t_ID = r'[a-zA-Z0-9]+(-[a-zA-Z0-9]+)+'
t_NOID = r'\-'
t_LABEL = r'[a-zA-Z_]+'
t_VALUE = r'\d+'
t_END = r'\n'
t_HEX = r'0x([a-fA-F0-9])+'

#def t_VALUE(t):
#    r'\d+'
#    t.value = int(t.value)

#def t_HEX(t):
#    r'0x([a-fA-F0-9])+'
#print(t)
#try:
#    t.value = bytes.fromhex(t.value)
#except:
#    return False
#return t.value
#t.value = t.value
#    pass

t_ignore = ' \t'


# Lexer error hook: report the offending character and skip over it.
def t_error(t):
    print("problem: ", t.value[0])
    t.lexer.skip(1)


lexer = lex.lex()

load_dotenv()

#Chain params
privatekey = os.getenv("PRIVATE_KEY")
chainId = os.getenv("CHAIN_ID")
rpc = os.getenv("RPC")
gasCap = os.getenv("GAS_FEE_CAP")

w3 = Web3(Web3.HTTPProvider(rpc))

# Locations of the on-disk stores used by the handlers below.
KEYSTORE_DIR = "custodial_store"
VOUCHER_FILE = "vouchers.json"

#
#data = '''
#FOOBAR uf-2etg 0xa3bdefa momo 123
#
#BARBAR
#BAZ
#
#BASFB foo-bar-baz
#'''
#
#lexer.input(data)
#
#while True:
#    tok = lexer.token()
#    if not tok:
#        break
#    print(tok)


class CmdId(enum.IntEnum):
    """Numeric identifiers of the supported commands.

    Voucher commands have bit 0x10 set; the grammar actions test that bit.
    ``Router.exec`` treats any command with a bit of 0xa0 set (WAIT, NEED,
    PEEK) as a request to collect the result of an earlier command.
    """
    KEY_CREATE = 0x1
    VOUCHER_CREATE = 0x10
    VOUCHER_MINT = 0x11
    VOUCHER_TRANSFER = 0x12
    WAIT = 0x20
    NEED = 0x21
    PEEK = 0x80


class Agent:
    """Base class for command operands; renders as ``ClassName:`` + value."""

    def __str__(self):
        return self.__class__.__name__ + ':' #{}'.format(self.__class__.__name__, self.v)


class AddressAgent(Agent):
    """Operand given as a ``0x``-prefixed hex string, stored as bytes."""

    def __init__(self, v):
        # Raises ValueError when the part after the two-character prefix is
        # not valid (even-length) hex.
        self.v = bytes.fromhex(v[2:])

    def __str__(self):
        return Agent.__str__(self) + self.v.hex()


class NameAgent(Agent):
    """Operand given as a symbolic name, stored verbatim."""

    def __init__(self, v):
        self.v = v

    def __str__(self):
        return Agent.__str__(self) + self.v


class Cmd:
    """A parsed command; fields are filled in by the grammar actions."""

    def __init__(self, cmd):
        # Raises KeyError when ``cmd`` is not the name of a CmdId member.
        self.c = CmdId[cmd]
        self.i = None   # command id: the ID token, or a generated uuid4 string
        self.a = None   # voucher operand (Agent) for mint/transfer
        self.f = None   # key name (key_create) / second agent operand (transfer)
        self.t = None   # store string (key_create) / recipient agent (mint, transfer)
        self.v = 0      # numeric value
        self.k = None   # optional hex private key (key_create)
        self.d = 0      # first optional numeric argument of voucher_create
        self.p = 0      # second optional numeric argument of voucher_create
        self.s = None   # voucher symbol (voucher_create)
        self.n = None   # voucher name (voucher_create)

    def __str__(self):
        return "[{:02d}]{} i={} v={} a={} f={} t={} k={} d={} p={} s={} n={}".format(
            self.c,
            self.c.name,
            self.i,
            self.v,
            self.a,
            self.f,
            self.t,
            self.k,
            self.d,
            self.p,
            self.s,
            self.n,
        )


def to_store(v):
    """Return the string form of ``v``."""
    return str(v)


def to_agent(v):
    """Wrap a token value in the matching Agent type.

    Only ``0x``-prefixed values are treated as addresses.  Previously any
    value whose tail happened to be valid hex (e.g. the label ``abba``) was
    silently turned into an ``AddressAgent``.  A prefixed value that is not
    valid hex still falls back to ``NameAgent``, as before.
    """
    if v.startswith("0x"):
        try:
            return AddressAgent(v)
        except ValueError:
            pass
    return NameAgent(v)


def _require_voucher_cmd(o):
    """Raise ValueError unless the command id has the voucher bit (0x10) set."""
    if o.c.value & 0x10 == 0:
        raise ValueError("not a voucher command")


# Start symbol (first p_ function in the file): any complete command.
def p_cmd(p):
    '''cmd : key_create
           | voucher_mint
           | voucher_transfer
           | voucher_create
           | pair
    '''
    p[0] = p[1]


# key_create: <cmd> <id> <key name> <store> [<hex private key>]
def p_key_create(p):
    '''key_create : change_label LABEL LABEL
                  | change_label LABEL LABEL HEX
    '''
    o = p[1]
    o.f = NameAgent(p[2])
    o.t = to_store(p[3])
    if len(p) > 4:
        o.k = p[4]
    p[0] = o


# voucher_mint without recipient: <cmd> <id> <voucher> <value>
def p_voucher_mint(p):
    '''voucher_mint : change_label hv
                    | change_label nv
    '''
    o = p[1]
    _require_voucher_cmd(o)
    o.v = int(p[2][1])
    o.a = to_agent(p[2][0])
    p[0] = o


# voucher_create: <cmd> <id> <symbol> <name> <value> [<value> <value>]
def p_voucher_create(p):
    '''voucher_create : change_label LABEL LABEL VALUE
                      | change_label LABEL LABEL VALUE VALUE VALUE
    '''
    o = p[1]
    _require_voucher_cmd(o)
    o.s = p[2]
    o.n = p[3]
    o.v = int(p[4])
    if len(p) > 5:
        # Cast like ``v`` so the fields keep the int type of their defaults.
        o.d = int(p[5])
        o.p = int(p[6])
    p[0] = o


# voucher_mint with recipient: <cmd> <id> <voucher> <value> <recipient>
def p_voucher_mint_recipient(p):
    '''voucher_mint : change_label hv HEX
                    | change_label nv HEX
                    | change_label hv LABEL
                    | change_label nv LABEL
    '''
    o = p[1]
    _require_voucher_cmd(o)
    o.v = int(p[2][1])
    o.a = to_agent(p[2][0])
    o.t = to_agent(p[3])
    p[0] = o


# voucher_transfer: <cmd> <id> <voucher> <value> <t operand> <f operand>
def p_voucher_transfer(p):
    '''voucher_transfer : change_label hv HEX HEX
                        | change_label nv HEX HEX
                        | change_label hv LABEL HEX
                        | change_label nv LABEL HEX
                        | change_label hv LABEL LABEL
                        | change_label nv LABEL LABEL
    '''
    o = p[1]
    _require_voucher_cmd(o)
    o.v = int(p[2][1])
    o.a = to_agent(p[2][0])
    o.t = to_agent(p[3])
    o.f = to_agent(p[4])
    p[0] = o


# (name, value) pair, both still strings.
def p_nv(p):
    'nv : LABEL VALUE'
    p[0] = (p[1], p[2],)


# (hex, value) pair, both still strings.
def p_hv(p):
    'hv : HEX VALUE'
    p[0] = (p[1], p[2],)


def p_change_label(p):
    '''change_label : pair
                    | pairnoid
    '''
    p[0] = p[1]


# Command name followed by an explicit id.
def p_pair(p):
    'pair : LABEL ID'
    o = Cmd(p[1])
    o.i = p[2]
    p[0] = o


# Command name followed by "-": a uuid4 is generated as the id.
def p_pairnoid(p):
    'pairnoid : LABEL NOID'
    o = Cmd(p[1])
    o.i = str(uuid.uuid4())
    p[0] = o


class Router:
    """Dispatch commands to registered handlers, each in its own process.

    Results travel back over a one-way pipe per command and are collected
    with ``sync`` (directly, via a WAIT/NEED/PEEK command, or in ``finish``).
    """

    def __init__(self):
        self.__routes = {}  # CmdId -> handler callable
        self.__r = {}       # command id -> (Process, (recv_conn, send_conn) or None)

    def register(self, cmd_id, fn):
        """Register ``fn`` as the handler for ``cmd_id``."""
        self.__routes[cmd_id] = fn

    def sync(self, uid):
        """Collect and return the result of command ``uid``.

        Returns None when the result was already collected.  Raises KeyError
        for an unknown ``uid``.
        """
        proc, pipe = self.__r[uid]
        if pipe is None:
            return None
        result = pipe[0].recv()
        # Persist the "collected" marker.  It used to be written to a local
        # only, so a second sync (e.g. finish() from __del__) blocked forever
        # in recv().
        self.__r[uid] = (proc, None,)
        proc.join()
        pipe[0].close()
        pipe[1].close()
        return result

    def __wrap(self, uid, fn, cmd):
        # Runs in the child process.  Always send something back: if the
        # handler raised and nothing was sent, the parent's recv() in sync()
        # would never return.
        result = None
        try:
            result = fn(cmd)
        except Exception:
            logg.exception("handler failed for {}".format(cmd))
        finally:
            self.__r[uid][1][1].send(result)

    def exec(self, cmd):
        """Execute ``cmd``.

        WAIT/NEED/PEEK commands return the result of the earlier command with
        the same id.  Any other command is started in a new process and None
        is returned immediately.
        """
        logg.debug("router exec {}".format(cmd))
        if cmd.c & 0xa0 > 0:
            return self.sync(cmd.i)
        fn = self.__routes[cmd.c]
        pi = Pipe(False)
        po = Process(target=self.__wrap, args=(cmd.i, fn, cmd,))
        # Must be stored before start() so the child sees its own pipe.
        self.__r[cmd.i] = (po, pi,)
        po.start()

    def finish(self):
        """Collect and print every result that has not been collected yet."""
        print("syncing")
        for k in list(self.__r):
            if self.__r[k][1] is None:
                continue
            print("syncing key " + k)
            r = self.sync(k)
            # format() rather than "+": a failed handler yields None.
            logg.debug("synced {}: {}".format(k, r))
            print("synced {}: {}".format(k, r))

    def __del__(self):
        self.finish()


def noop_handler(cmd):
    """Handler that only returns the string form of the command."""
    return str(cmd)


def remove_ansi_escape_codes(text):
    """Strip ANSI ``ESC[...m`` sequences from ``text``."""
    return re.sub(r'\u001b\[.*?m', '', text)


def generate_private_key():
    """Generate a new private key.

    Returns a tuple ``(address, private key as 0x-prefixed hex)``.
    """
    web3 = Web3()
    account = web3.eth.account.create()
    return account.address, w3.to_hex(account.key)


def store_key_in_keystore(private_key, key_name, address):
    """Write the key record to ``custodial_store/<key_name>.json``.

    The private key is stored in plain text (simulated keystore).
    Returns the path of the file written.
    """
    keystore_dir = KEYSTORE_DIR

    # Create the directory if it doesn't exist
    os.makedirs(keystore_dir, exist_ok=True)

    keystore = {
        'key_name': key_name,
        'private_key': private_key,
        'address': address,
    }

    store_path = os.path.join(keystore_dir, f"{key_name}.json")

    # Save to JSON file (simulated keystore)
    with open(store_path, 'w') as f:
        json.dump(keystore, f)

    return store_path


def store_voucher(voucher_creator, voucher_symbol, voucher_address):
    """Write the voucher record to ``vouchers.json``.

    NOTE(review): the file is opened with mode 'w', so each call replaces
    whatever was stored before -- confirm whether that is intended.
    """
    voucher = {
        voucher_symbol: voucher_address,
        'owner': voucher_creator
    }
    with open(VOUCHER_FILE, 'w') as f:
        json.dump(voucher, f)


def _strip_hex_prefix(key):
    """Return ``key`` without a leading ``0x``, if it has one."""
    if key.startswith("0x"):
        return key[2:]
    return key


def _invalid_agent(agent):
    """Build the error raised for an operand that is not a known Agent."""
    return ValueError(f"Invalid command: {agent}. Expected 'NameAgent' or 'AddressAgent'.")


def _load_voucher_store():
    """Return the parsed contents of ``vouchers.json``."""
    with open(VOUCHER_FILE, "r") as file:
        return json.load(file)


def _recipient_address(agent):
    """Resolve a recipient operand to a ``0x`` address string.

    A name is looked up in the keystore and the address is derived from the
    stored private key; an address operand is rendered as hex.
    """
    if isinstance(agent, NameAgent):
        store_path = os.path.join(KEYSTORE_DIR, f"{agent.v}.json")
        with open(store_path, "r") as file:
            data = json.load(file)
        acct = w3.eth.account.from_key(data["private_key"])
        return acct.address
    if isinstance(agent, AddressAgent):
        return "0x" + agent.v.hex()
    raise _invalid_agent(agent)


def _cast_send(signer_key, contract, signature, to, value):
    """Run ``cast send`` for ``signature(to, value)`` and return the tx hash.

    Raises CalledProcessError on a non-zero exit status and ValueError when
    the tool wrote anything to stderr.
    """
    # Argument list instead of a shell string: no shell parsing of values.
    command = [
        'cast', 'send',
        '--private-key', signer_key,
        '--rpc-url', rpc,
        contract,
        signature,
        to,
        str(value),
        '--json',
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr)
    if result.stderr:
        raise ValueError(f"Command failed with error: {result.stderr}")
    data = json.loads(result.stdout)
    return data["transactionHash"]


def key_create_handler(cmd):
    """Create (or import) a key, store it under the given name, return its address.

    When the command carries no key (``cmd.k``), a fresh one is generated.
    Otherwise the supplied key is imported; it was previously read from an
    unassigned local and the address was derived from the global env key.
    """
    key_name = cmd.f.v

    if cmd.k is None:
        address, private_key = generate_private_key()
    else:
        private_key = _strip_hex_prefix(cmd.k)
        # Store the address string, not the account object (not JSON-serializable).
        address = w3.eth.account.from_key(private_key).address

    store_key_in_keystore(private_key, key_name, address)
    return address


def voucher_create_handler(cmd):
    """Publish an ERC20 voucher with ``ge-publish`` and record its address.

    Returns the contract address with ANSI escape codes removed.  Raises
    CalledProcessError when the tool fails and ValueError when no contract
    address can be found in its output.
    """
    name = cmd.n
    symbol = cmd.s
    # Previously unbound when the key had no "0x" prefix.
    private_key = _strip_hex_prefix(privatekey)

    command = [
        'ge-publish',
        '--private-key', private_key,
        '--rpc', rpc,
        '--gas-fee-cap', gasCap,
        '--chainid', chainId,
        'p', 'erc20',
        '--name', name,
        '--symbol', symbol,
    ]

    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        raise subprocess.CalledProcessError(result.returncode, command, output=result.stdout, stderr=result.stderr)

    # The address is read from the second line of the tool's stderr output.
    output = result.stderr.strip().split("\n")
    contract_address = None
    if len(output) > 1:
        for word in output[1].split():
            if "contract_address=" in word:
                contract_address = word.split("=")[1]
    if contract_address is None:
        raise ValueError("ge-publish output did not contain a contract address")

    contract_address = remove_ansi_escape_codes(contract_address)
    print("Voucher created with Address:", contract_address)
    store_voucher(privatekey, symbol, contract_address)
    return contract_address


def voucher_transfer_handler(cmd):
    """Transfer ``cmd.v`` units of the voucher to the recipient; return the tx hash.

    The transaction is signed with the global PRIVATE_KEY.
    NOTE(review): ``cmd.f`` is parsed but not used here -- confirm intent.
    """
    value = cmd.v  # Amount to transfer

    if isinstance(cmd.a, NameAgent):
        s = _load_voucher_store()[cmd.a.v]
    elif isinstance(cmd.a, AddressAgent):
        s = "0x" + cmd.a.v.hex()
    else:
        # Report the voucher operand (the message used to show cmd.t).
        raise _invalid_agent(cmd.a)

    to = _recipient_address(cmd.t)

    return _cast_send(privatekey, s, 'transfer(address,uint256)', to, value)


def voucher_mint_handler(cmd):
    """Mint ``cmd.v`` units of the voucher to the recipient; return the tx hash.

    For a voucher given by name the stored owner key signs; for a voucher
    given by address the global PRIVATE_KEY signs (this branch used to fail
    with UnboundLocalError).
    """
    value = cmd.v

    to = _recipient_address(cmd.t)

    signer_key = privatekey
    if isinstance(cmd.a, NameAgent):
        vouchers = _load_voucher_store()
        s = vouchers[cmd.a.v]
        signer_key = vouchers["owner"]
    elif isinstance(cmd.a, AddressAgent):
        s = "0x" + cmd.a.v.hex()
    else:
        # Report the voucher operand (the message used to show cmd.t).
        raise _invalid_agent(cmd.a)

    return _cast_send(signer_key, s, 'mintTo(address,uint256)', to, value)


def foo_handler(cmd):
    """Return the output of ``eth-info`` for a fixed provider URL."""
    with os.popen('eth-info -p https://celo.grassecon.net') as pipe:
        return pipe.read()


parser = yacc.yacc()

running = True


class FileGet:
    """Read commands line by line from a file and execute them on a router."""

    def __init__(self, o, fp):
        self.__running = True
        self.__o = o
        self.__f = open(fp, 'r')

    def run(self):
        """Execute commands until end of file or the first empty line."""
        try:
            while self.__running:
                # Use our own reader; this used to go through the global ``ifc``.
                v = self.get()
                if v is None:
                    break
                v = v.strip('\n')
                if len(v) == 0:
                    break
                r = parser.parse(v)
                if r is None:
                    logg.warning("could not parse: {}".format(v))
                    continue
                self.__o.exec(r)
        finally:
            self.__f.close()

    def get(self):
        """Return the next line, or '' at end of file or after the file is closed."""
        if self.__f.closed:
            return ''
        return self.__f.readline()


class WaitGet:
    """Read commands interactively and execute them in a reader process."""

    def __init__(self, o, *r):
        self.__running = True
        self.__o = o
        self.__f = r

    # TODO: router copy results in missing keys to sync when closing down
    def __process(self, f):
        # Runs in the child process: execute lines received over ``f`` until
        # None or an empty line arrives.
        while self.__running:
            r = select.select([f], [], [])
            v = r[0][0].recv()
            if v is None:
                break
            v = v.strip('\n')
            if len(v) == 0:
                break
            r = parser.parse(v)
            if r is None:
                logg.warning("could not parse: {}".format(v))
                continue
            self.__o.exec(r)

    def run(self):
        """Prompt for commands until an empty line or end of input."""
        (fo, fi,) = Pipe()
        p = Process(target=self.__process, args=(fo,))
        p.start()
        while self.__running:
            try:
                v = input("> ")
            except EOFError:
                # Treat end of input like an empty line so the reader
                # process is told to stop instead of waiting forever.
                v = ""
            if v == "":
                fi.send(None)
                break
            fi.send(v)
        p.join()
        logg.debug('waitget run end')


if __name__ == '__main__':
    ifc = None
    o = Router()
    # NOTE(review): every command is routed to foo_handler, so the key/voucher
    # handlers defined above are never called -- confirm whether intended.
    o.register(CmdId.KEY_CREATE, foo_handler)
    o.register(CmdId.VOUCHER_CREATE, foo_handler)
    o.register(CmdId.VOUCHER_MINT, foo_handler)
    o.register(CmdId.VOUCHER_TRANSFER, foo_handler)

    if len(sys.argv) > 1:
        ifc = FileGet(o, sys.argv[1])
    else:
        ifc = WaitGet(o, sys.stdin)
    ifc.run()
    o.finish()