Source code for himbeerecouch.rpc

from multiprocessing.connection import Listener, Client
import multiprocessing as _mp
from threading import Thread
from .log import log
from .database import get_acct
from .util import stack_trace
import traceback
import string
import random
import logging
import os
import signal
import json

[docs]def id_generator(size=6, chars=string.ascii_uppercase + string.digits): """ Generates random ID, used for private shared key between processes """ return ''.join(random.choice(chars) for _ in range(size))
_server = ('', 17000) _authkey = id_generator(64) class RPCObject(object): def __init__(self): def _output_handler(sig, fn): self.output_handler(sig, fn) signal.signal(signal.SIGUSR1, _output_handler) def output_handler(self, sn, fr): """Dumping current stack information: {} Complete""".format('\n '.join(stack_trace(self.output_handler))))
[docs]class RPCServer(RPCObject): """ RPC (Remote-Procedure-Call server) Calls remote procedures on all child processes """ def __init__(self, address, authkey): super(RPCServer, self).__init__() self._clients = {} self._server_c = Listener(address, authkey=authkey) def output_handler(self, sn, fr): super(RPCServer, self).output_handler(sn, fr) for c in self._clients: try: os.kill(self._clients[c]["pid"], signal.SIGUSR1) except: pass def accept_connection(self, connections): for x in range(connections): client_c = self._server_c.accept() client_name = json.loads(client_c.recv()) self._clients[client_c] = client_name def __getattr__(self, name): def do_rpc(*args, **kwargs): results = {} for c in self._clients: client_name = self._clients[c]["name"] try: c.send((name, args, kwargs)) results[client_name] = c.recv() except: results[client_name] = traceback.format_exc() for _, v in results.items(): if isinstance(v, Exception): raise v return results return do_rpc
[docs]class RPCProxy(RPCObject): """ RPCProxy, object running in child processes that connects to an :class:`RPCServer` object. """ def __init__(self, address, authkey): super(RPCProxy, self).__init__() self._functions = { } self._quitnotifiers = set() self._conn = Client(address, authkey=authkey) self._conn.send(json.dumps({"name" : _mp.current_process().name, "pid" : os.getpid()})) self._should_exit = False self.register_function(self.exit_now, "exit") def _exit(*args): self.exit_now() signal.signal(signal.SIGINT, _exit) signal.signal(signal.SIGTERM, _exit) def listen(self): def handle_client(client_c): while not self._should_exit: try: func_name, args, kwargs = client_c.recv() except EOFError: self.exit_now() break try: r = self._functions[func_name](*args,**kwargs) client_c.send(r) except Exception as e: client_c.send(e) self.t = Thread(target=handle_client, args=(self._conn,)) self.t.daemon = True self.t.start() def register_exit_notification(self, func): self._quitnotifiers.add(func) def remove_exit_notification(self, func): try: self._quitnotifiers.remove(func) except KeyError: pass def exit_now(self): log("Exit requested") self._should_exit = True for f in self._quitnotifiers: f() return True def should_exit(self): return self._should_exit def register_function(self, func, name = None): if name is None: name = func.__name__ self._functions[name] = func
class RaspProxyProcess(RPCProxy): def __init__(self): RPCProxy.__init__(self, _server,authkey=_authkey) class RaspServerProcess(RPCServer): def __init__(self): RPCServer.__init__(self, _server,authkey=_authkey)
[docs]class ProcessImporter(object): """ Handles importing classes/modules from code in database. See :func:`start_new_process` for an example of how this is used. """ def __init__(self, modules, exported_commands = None): self._modules = modules if exported_commands is None: self._exported_commands = {} else: self._exported_commands = exported_commands self._already_exported = [] def find_module(self, fullname, path=None): if fullname in self._modules: return self return None def load_module(self, name): import sys if name in sys.modules and name not in self._already_exported: return sys.modules[name] import imp mod = imp.new_module(name) exec self._modules[name] in mod.__dict__ for cmd in self._exported_commands: mod.__dict__[cmd] = self._exported_commands[cmd] sys.modules[name] = mod self._already_exported.append(name) return mod
[docs]def start_new_process(name, code): """ Start new child process :param name: Name of child process :type name: str :param code: code to run, dictionary of modules (in string format) :type code: dict :returns: (multiprocessing.Process, multiprocessing.Queue) The Queue returned has the result returned from the child process:: { "ok" : True } when everything ok, and:: { "error" : ...traceback... } when not. """ def _new_proc(q): import sys o = RaspProxyProcess() sys.meta_path.append( ProcessImporter(code, { "log" : log, "get_acct" : get_acct, "should_quit" : o.should_exit, "register_quit_notification" : o.register_exit_notification, "remove_quit_notification" : o.remove_exit_notification } ) ) o.listen() try: import main main.main() q.put({"ok" : True}) except: q.put({"error" : traceback.format_exc()}) q = _mp.Queue() t = _mp.Process(name=name, target=_new_proc, args=(q,)) t.daemon = True t.start() return t, q