cic-cli/cic/output.py
2021-10-25 18:55:42 +00:00

96 lines
2.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import os
import sys
import logging
import urllib.request
# module-level logger used by the writer classes in this file
logg = logging.getLogger(__name__)
class OutputWriter:
    """Base class for output writers.

    The base ``write`` always raises ``NotImplementedError``; concrete
    writers provide their own implementation.
    """

    def __init__(self, *args, **kwargs):
        """Accept and discard any positional and keyword arguments."""

    def write(self, k, v):
        """Write value ``v`` under key ``k``.

        :param k: Key identifying the value.
        :param v: Value to write.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()
class StdoutWriter(OutputWriter):
    """Writer that emits each key/value pair as one line on standard output."""

    def write(self, k, v):
        """Write ``k`` and ``v`` to stdout as a single tab-separated line.

        Both arguments are rendered with ``str.format``, so a ``bytes``
        value appears in its ``b'...'`` form.

        :param k: Key, printed first.
        :param v: Value, printed after the tab.
        """
        record = '{}\t{}\n'.format(k, v)
        sys.stdout.write(record)
class KVWriter(OutputWriter):
    """Writer that stores each value in its own file inside a directory.

    :param path: Directory that receives the files. It is created, including
        missing parents, when it does not exist yet.
    :raises FileExistsError: If ``path`` exists but is not a directory.
    """

    def __init__(self, path=None, *args, **kwargs):
        super(KVWriter, self).__init__(*args, **kwargs)
        # exist_ok=True replaces the former stat-then-create sequence, which
        # could fail if the directory appeared between the two calls.
        os.makedirs(path, exist_ok=True)
        self.path = path

    def write(self, k, v):
        """Write ``v`` to the file named ``str(k)`` inside the directory.

        An existing file with the same name is overwritten.

        :param k: Key; its string form is used as the file name.
        :param v: Bytes-like content (the file is opened in binary mode).
        """
        # NOTE(review): str(k) is joined unsanitised, so a key containing
        # path separators can address files outside self.path; confirm that
        # keys are always trusted.
        fp = os.path.join(self.path, str(k))
        logg.debug('path write {} {}'.format(fp, str(v)))
        # The context manager closes the handle even when the write raises.
        with open(fp, 'wb') as f:
            f.write(v)
class HTTPWriter(OutputWriter):
    """Writer that submits each value with an HTTP POST request.

    :param path: Base URL the requests are sent to.
    """

    def __init__(self, path=None, *args, **kwargs):
        super(HTTPWriter, self).__init__(*args, **kwargs)
        self.path = path

    def write(self, k, v):
        """POST ``v`` to the base URL, or to the base URL joined with ``k``.

        :param k: Path component appended to the base URL, or ``None`` to
            post to the base URL itself.
        :param v: Request body, passed as ``data`` to
            ``urllib.request.Request`` (bytes expected).
        :raises urllib.error.URLError: If the request fails.
        """
        # Imported locally so this block does not depend on the file's
        # import header. posixpath always joins with '/', whereas
        # os.path.join would insert a backslash on Windows.
        import posixpath

        url = self.path
        if k is not None:
            # posixpath.join semantics apply: a key that starts with '/'
            # replaces the base instead of being appended.
            url = posixpath.join(url, k)
        logg.debug('http writer post {}'.format(url))
        rq = urllib.request.Request(url, method='POST', data=v)
        # Close the response (and its connection) once the body is read.
        with urllib.request.urlopen(rq) as r:
            body = r.read()
        logg.info('http writer submitted at {}'.format(body))
class KeyedWriter(OutputWriter):
    """Writer that forwards each write to up to two underlying writers.

    :param writer_keyed: Writer that receives ``(k, v)``, or ``None`` to skip.
    :param writer_immutable: Writer that receives ``(None, v)``, or ``None``
        to skip.
    """

    def __init__(self, writer_keyed, writer_immutable):
        super(KeyedWriter, self).__init__()
        self.writer_keyed = writer_keyed
        self.writer_immutable = writer_immutable

    def write(self, k, v):
        """Forward ``v`` to the configured writers.

        A ``str`` value is encoded to UTF-8 bytes first; any other type is
        passed through unchanged.

        :param k: Key handed to the keyed writer only.
        :param v: Value handed to both writers.
        """
        logg.debug('writing keywriter {} {}'.format(k, v))
        if isinstance(v, str):
            v = v.encode('utf-8')
        # Identity checks: a writer defining its own __eq__/__ne__ must not
        # be able to make itself look like None.
        if self.writer_keyed is not None:
            self.writer_keyed.write(k, v)
        if self.writer_immutable is not None:
            # The immutable writer is always called with a None key.
            self.writer_immutable.write(None, v)
class KeyedWriterFactory:
    """Factory producing ``KeyedWriter`` instances from two writer constructors.

    :param key_writer_constructor: Callable building the keyed writer, or
        ``None`` for no keyed writer.
    :param immutable_writer_constructor: Callable building the immutable
        writer, or ``None`` for no immutable writer.
    :param args: Accepted but not used.
    :param kwargs: Stored and passed as keyword arguments to both
        constructors on every :meth:`new` call.
    """

    def __init__(self, key_writer_constructor, immutable_writer_constructor, *args, **kwargs):
        self.key_writer_constructor = key_writer_constructor
        self.immutable_writer_constructor = immutable_writer_constructor
        # Keyword arguments replayed to the constructors in new().
        self.x = {}
        for k, v in kwargs.items():
            logg.debug('adding key {} t keyed writer factory'.format(k))
            self.x[k] = v

    def new(self, path=None, *args, **kwargs):
        """Build a ``KeyedWriter`` whose underlying writers target ``path``.

        :param path: First positional argument given to each constructor.
        :param args: Accepted but not used.
        :param kwargs: Accepted but not used; the constructors receive the
            keyword arguments stored at factory creation instead.
        :return: A ``KeyedWriter`` wrapping the constructed writers; a writer
            is ``None`` when its constructor is ``None``.
        """
        writer_keyed = None
        writer_immutable = None
        if self.key_writer_constructor is not None:
            writer_keyed = self.key_writer_constructor(path, **self.x)
        if self.immutable_writer_constructor is not None:
            writer_immutable = self.immutable_writer_constructor(path, **self.x)
        return KeyedWriter(writer_keyed, writer_immutable)