Source code for pynedm.listen

# Reset any stop listening flags
import time as _ti
import requests as _req
import httplib as _http
from .utils import should_stop, log, exception
import traceback

class ShouldStop(Exception):
    """Signal that the listening loop has to terminate.

    Raised inside the changes-feed loop once a stop has been requested, so
    that the loop can be left through ordinary exception handling.
    """

def _watch_changes_feed(adb, fd, verbose):
    """
    _watch_changes_feed is a hidden function that performs all the work
    watching the change feed, it makes use of the filter function:
    execute_commands/execute_commands to ensure that it only reacts on a
    particular set of command keys.  Documentation of that filter function is
    available `here
    <http://nedm-tum.github.io/nEDM-Interface/tutorial-couchdb_filter.html>`_:

    The function blocks until ``should_stop()`` reports true.  While running
    it:

    * starts a heartbeat thread that posts a ``{"type": "heartbeat"}``
      document roughly every 10 seconds,
    * opens a continuous changes feed and, for every received command
      document, runs ``fd[doc["execute"]](*doc["arguments"])`` in its own
      thread and writes the outcome back to the originating document,
    * reopens the feed whenever it breaks, and
    * joins all still-running threads before returning.

    :param adb: database handle; must provide ``design(name)`` and
        ``changes(params=..., emit_heartbeats=...)``
    :param fd: mapping of command name -> callable to execute
    :param verbose: if true, progress messages are emitted through ``log``
    """
    import threading as _th

    def _get_response(msg, retVal=None, ok=False):
        """
        _get_response returns a dictionary with a msg and a timestamp for
        insertion into the db
        """
        ad = {
            "response": {
                "content": msg,
                "timestamp": _ti.strftime("%a, %d %b %Y %H:%M:%S +0000",
                                          _ti.gmtime()),
                "return": retVal,
            }
        }
        if ok:
            ad["response"]["ok"] = True
        return ad

    def _fire_single_thread(des, fd, label, args, upd):
        """
        Run one command and store its result (or traceback) in the db.

        ``upd`` is the update path of the document that requested the
        command.  It is passed in explicitly: the main loop rebinds its own
        ``upd`` for every incoming command, so reading it from the enclosing
        scope could send the response to a different document.
        """
        try:
            retVal = fd[label](*args)
            des.put(upd, params=_get_response("'%s' success" % label,
                                              retVal, True))
        except:  # noqa: E722 - deliberately broad: every failure of the
            # user-supplied command is reported back to the database
            des.put(upd, params=_get_response(
                "Exception:\n{}".format(traceback.format_exc())))

    def _heartbeat_thread(thedb):
        """Post a heartbeat document every 10 seconds until asked to stop."""
        import uuid as _uuid
        anode = _uuid.getnode()
        des = thedb.design("nedm_default")
        adoc = {"type": "heartbeat"}
        now = _ti.time()
        while not should_stop():
            if _ti.time() - now >= 10:
                now = _ti.time()
                try:
                    des.post("_update/insert_with_timestamp/heartbeat_"
                             + str(anode), params=adoc)
                except:  # noqa: E722 - a failed heartbeat must not end
                    # the heartbeat thread
                    exception("Heartbeat exception")
            # Short sleep keeps the thread responsive to should_stop()
            _ti.sleep(0.1)

    all_threads = []
    des = adb.design("nedm_default")

    # Start Heartbeat thread
    heartbeat = _th.Thread(target=_heartbeat_thread, args=(adb,))
    heartbeat.start()
    all_threads.append(heartbeat)

    # Number of consecutive failed attempts to (re)open the changes feed
    connection_error = 0
    if verbose:
        log("Waiting for command...")
    while True:
        try:
            # Get changes feed and begin thread
            if should_stop():
                raise ShouldStop()
            changes = adb.changes(
                params=dict(feed='continuous',
                            heartbeat=2000,
                            since='now',
                            include_docs=True,
                            only_commands=fd.keys(),
                            filter="execute_commands/execute_commands"),
                emit_heartbeats=True)
            for line in changes:
                # A None line is a feed heartbeat; use it to poll the
                # stop flag
                if line is None and should_stop():
                    raise ShouldStop()
                if connection_error != 0:
                    log("Connection reset after {} tries".format(
                        connection_error))
                    connection_error = 0
                if line is None:
                    continue
                try:
                    doc = line["doc"]
                    upd = "_update/insert_with_timestamp/" + line["id"]
                    label = doc["execute"]
                    args = doc.get("arguments", [])

                    if verbose:
                        log(" command (%s) received" % label)

                    if not isinstance(args, list):
                        raise Exception("'arguments' field must be a list")

                    # Forget finished workers so the list does not grow
                    # without bound while the listener keeps running
                    all_threads = [t for t in all_threads if t.is_alive()]

                    new_th = _th.Thread(target=_fire_single_thread,
                                        args=(des, fd, label, args, upd))
                    new_th.start()
                    all_threads.append(new_th)
                except:  # noqa: E722 - a malformed command document must
                    # not take down the listener
                    exception("Unexpected exception while listening")
                if verbose:
                    log("Waiting for next command...")

        except (_req.exceptions.ChunkedEncodingError, _http.IncompleteRead):
            # Sometimes the changes feeds "stop" listening, so we can try
            # restarting the feed
            log("Ignoring exception {}".format(traceback.format_exc()))
        except ShouldStop:
            break
        except:  # noqa: E722 - all other errors: log, back off, retry
            log("Seen unexpected error in changes feed: {}".format(
                traceback.format_exc()))
            connection_error += 1
            _ti.sleep(1)

    if not should_stop():
        stop_listening()
    for th in all_threads:
        # Join with a timeout in a loop so the wait stays interruptible.
        # is_alive() replaces isAlive(), which was removed in Python 3.9.
        while th.is_alive():
            th.join(0.1)