Source code for pynedm.utils

import logging
import os
import json
from .fileutils import AttachmentFile
from .exception import CommandCollision, PynEDMException, CommandError
from .log import (debug, log, error, exception, listening_addresses)

__all__ = ["ProcessObject", "stop_listening", "should_stop", "listen", "start_process"]

# Global to stop
_should_stop = False

class ProcessObject(object):
    """
    Process object to listen for commands as well as interacting with the
    nedm databases.

    :param adb: name of database
    :param username: username
    :param password: password
    :param uri: address of server
    :param verbose: verbosity
    :type adb: str
    :type username: str
    :type password: str
    :type uri: str
    :type verbose: bool
    """
    def __init__(self, uri=None, username=None, password=None,
                 adb=None, verbose=False, **kw):
        import cloudant as _ca
        self._currentInfo = {}
        acct = kw.get("acct", None)
        if acct is None:
            acct = _ca.Account(uri=uri)
            if username and password:
                res = acct.login(username, password)
                if res.status_code != 200:
                    raise PynEDMException("User credentials incorrect")
        self.isRunning = False
        self.verbose = verbose
        self.acct = acct
        self.db = adb
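
    # Usage sketch: connecting to a server.  The address, credentials, and
    # database name below are placeholders, not values defined by this module:
    #
    #   po = ProcessObject(uri="http://localhost:5984",
    #                      username="user", password="secret",
    #                      adb="nedm%2Fexample_db", verbose=True)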

    def write_document_to_db(self, adoc, db=None, ignoreErrors=True):
        """
        Write a document to the database.

        :param adoc: dictionary to be written to the DB.
        :param db: database name
        :param ignoreErrors: if True, do not reraise errors
        :type adoc: dict
        :type db: str
        :type ignoreErrors: bool
        :returns: dict -- response from the server
        :raises: :class:`pynedm.exception.PynEDMException`
        """
        try:
            if db is None:
                db = self.acct[self.db]
            else:
                db = self.acct[db]
        except:
            raise PynEDMException("Cannot write while not listening")
        try:
            return db.design("nedm_default").post(
                "_update/insert_with_timestamp", params=adoc).json()
        except Exception as e:
            if ignoreErrors:
                log("Exception ({}) when posting doc ({})".format(e, adoc))
                return {}
            else:
                raise
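
    # Usage sketch (document fields and database name are placeholders; as the
    # code above assumes, the "nedm_default" design document with its
    # "insert_with_timestamp" update handler must exist in the target database):
    #
    #   resp = po.write_document_to_db({"type": "data", "value": 1.0},
    #                                  db="nedm%2Fexample_db")
    #   if "ok" not in resp:
    #       log("Write failed: {}".format(resp))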

    def _attachment_path(self, docid, attachment_name, db=None):
        """
        Helper function to grab the attachment path
        """
        if db is None:
            db = self.db
        if db is None:
            raise PynEDMException("db must be defined")
        return '/'.join([self.acct.uri, '_attachments',
                         db, docid, attachment_name])

    def delete_file(self, docid, attachment_name, db=None):
        """
        Delete file associated with docid.

        :param docid: document id
        :param attachment_name: name of attachment
        :param db: name of database
        :type docid: str
        :type attachment_name: str
        :type db: str
        :returns: json response from server
        """
        delete_url = self._attachment_path(docid, attachment_name, db)
        return self.acct.delete(delete_url).json()
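
    # Usage sketch (hypothetical document, attachment, and database names):
    #
    #   po.delete_file("measurement_doc", "waveform.dat", db="nedm%2Fexample_db")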

    def open_file(self, docid, attachment_name, db=None):
        """
        Open file for reading; allows reading ranges of data.

        :param docid: document id
        :param attachment_name: name of attachment
        :param db: name of database
        :type docid: str
        :type attachment_name: str
        :type db: str
        :returns: :class:`pynedm.fileutils.AttachmentFile` -- file-like object

        Following code example::

            o = ProcessObject(...)
            _fn = "temp.out"
            _doc = "no_exist"
            _db = "nedm%2Fhg_laser"

            x = o.open_file(_doc, _fn, db=_db)
            y = x.read(4)
            print len(y), y  # should be equal
            print x.read()

            x.seek(1)
            for i in x.iterate(10):
                print i
        """
        download_url = self._attachment_path(docid, attachment_name, db)
        return AttachmentFile(self.acct[download_url])

    def download_file(self, docid, attachment_name, db=None,
                      chunk_size=100*1024, headers=None):
        """
        Download file associated with docid, yielding the data in chunks.
        The first value yielded is the total expected size; the rest is the
        data from the file.

        :param docid: document id
        :param attachment_name: name of attachment
        :param db: database name
        :param chunk_size: size of chunks to yield
        :param headers: HTTP headers forwarded to :mod:`requests`
        :type docid: str
        :type attachment_name: str
        :type db: str
        :type chunk_size: int
        :type headers: dict
        :returns: generator -- first the total size (int), then the data chunks

        Following code example::

            from clint.textui.progress import Bar as ProgressBar

            total_size = None
            x = process_object.download_file("docid", "attachment", "mydb")
            bar = ProgressBar(expected_size=x.next(), filled_char='=')

            total = 0
            with open("temp_file.out", "wb") as o:
                for ch in x:
                    total += len(ch)
                    bar.show(total)
                    o.write(ch)
                    o.flush()
        """
        download_url = self._attachment_path(docid, attachment_name, db)
        if headers is None:
            headers = {}
        r = self.acct.get(download_url, stream=True, headers=headers)
        yield int(r.headers['content-length'])
        for chunk in r.iter_content(chunk_size=chunk_size):
            if chunk:
                yield chunk

    def upload_file(self, file_or_name, docid, db=None,
                    attachment_name=None, callback=None):
        """
        Upload file associated with a particular doc id.

        :param file_or_name: full path to file or file-like object
        :param docid: id of document
        :param db: name of database
        :param callback: upload callback, should be of form:
            func(size_read, total_size)
        :param attachment_name: name of attachment, otherwise name will be
            taken from the file
        :type file_or_name: str or file
        :type docid: str
        :type db: str
        :type callback: func(size_read, total_size)
        :type attachment_name: str
        """
        actual_file = file_or_name
        if not hasattr(file_or_name, "read"):
            # Assume it's a path to a file; take the attachment name from the
            # file name if one was not given
            if not attachment_name:
                attachment_name = os.path.basename(file_or_name)
            actual_file = open(file_or_name, "rb")
        elif not attachment_name:
            raise PynEDMException("Must include attachment name for file-like objects")

        # Get file size
        actual_file.seek(0, 2)
        total_size = actual_file.tell()
        actual_file.seek(0)

        post_to_url = self._attachment_path(docid, attachment_name, db)
        cookies = '; '.join(['='.join(x)
                             for x in self.acct._session.cookies.items()])

        import pycurl
        from StringIO import StringIO

        class FileReader:
            """Feed pycurl from a file object, reporting progress via callback"""
            def __init__(self, fp, callback=None):
                self.fp = fp
                self.total_read = 0
                self.cbck = callback

            def read_callback(self, size):
                x = self.fp.read(size)
                if x is not None:
                    self.total_read += len(x)
                    if self.cbck:
                        self.cbck(self.total_read, total_size)
                return x

        c = pycurl.Curl()
        storage = StringIO()
        c.setopt(pycurl.URL, post_to_url)
        c.setopt(pycurl.PUT, 1)
        c.setopt(pycurl.READFUNCTION,
                 FileReader(actual_file, callback).read_callback)
        c.setopt(pycurl.INFILESIZE, total_size)
        c.setopt(c.WRITEFUNCTION, storage.write)
        c.setopt(c.COOKIE, cookies)
        c.perform()
        c.close()

        content = storage.getvalue()
        try:
            return json.loads(content)
        except:
            return {"error": True, "content": content}
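
    # Usage sketch with a simple progress callback (the file path, document
    # id, and database name are placeholders; pycurl must be installed, as
    # assumed by the implementation above):
    #
    #   def progress(read, total):
    #       print("{}/{} bytes uploaded".format(read, total))
    #
    #   po.upload_file("/path/to/waveform.dat", "measurement_doc",
    #                  db="nedm%2Fexample_db", callback=progress)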

    def send_command(self, cmd_name, *args, **kwargs):
        """
        Send a command; raises an exception on timeout or if an exception
        occurs in the remotely-called function.

        :param cmd_name: Name of command
        :param args: arguments to command
        :param db: (optional) name of database
        :param timeout: (optional) how much time to wait, default 10000
            (10 seconds)
        :type cmd_name: str
        :type db: str
        :type timeout: int
        :returns: return of remotely-called function
        :raises: :class:`pynedm.exception.CommandError`

        Following code example::

            o = ProcessObject(...)

            # Returns the IP
            print o.send_command("temp-control.1.nedm1_d.ip_get")

            # Choosing another database, timeout.
            print o.send_command("getvoltage", 1,
                                 db="nedm%2Finternal_coils", timeout=4000)

            try:
                # Will raise error (not enough arguments)
                print o.send_command("getvoltage",
                                     db="nedm%2Finternal_coils", timeout=4000)
            except:
                traceback.print_exc()

            try:
                # Will raise timeout (command doesn't exist)
                print o.send_command("get_voltage",
                                     db="nedm%2Finternal_coils", timeout=4000)
            except:
                traceback.print_exc()
        """
        db_name = kwargs.get("db", self.db)
        timeout = kwargs.get("timeout", 10000)
        db = self.acct[db_name]

        ret = self.write_document_to_db({
            "type": "command",
            "execute": cmd_name,
            "arguments": args
        }, db_name, ignoreErrors=False)
        if "ok" not in ret:
            raise CommandError("Error saving document")

        for l in db.changes(params=dict(
                filter="_doc_ids",
                timeout=timeout,
                feed="continuous",
                include_docs=True,
                doc_ids=[ret["id"]])):
            if 'doc' not in l:
                continue
            if 'response' not in l['doc']:
                continue
            resp = l['doc']['response']
            exc = "Exception"
            if resp["content"][:len(exc)] == exc:
                raise CommandError(resp["content"])
            return resp["return"]
        raise CommandError("Timeout")

    def wait(self):
        """
        Wait until the current changes feed execution is complete.  Execution
        can also be stopped by calling :func:`stop_listening`
        """
        if "thread" not in self._currentInfo:
            return
        th = self._currentInfo["thread"]
        while th.isAlive():
            th.join(0.1)
            if should_stop():
                self.__remove_commands_doc()
        self.__remove_commands_doc()

    def __check_keys(self, docid):
        db = self.acct[self.db]
        r = db.design("execute_commands").view("export_commands").get(
            params=dict(group_level=1)).json()
        all_keys = dict([(x["key"], x["value"]) for x in r["rows"]])
        bad_keys = [k for k in all_keys if all_keys[k] > 1]
        if len(bad_keys) > 0:
            r = db.design("execute_commands").view("export_commands").post(
                params=dict(reduce=False),
                data=json.dumps(dict(keys=bad_keys))).json()
            url_to_use = (db.uri_parts[0] + "://" + db.uri_parts[1] +
                          "/_utils/document.html?" + db.uri_parts[2][1:] + "/")
            s = set([x["id"] for x in r["rows"]
                     if (x["id"] != docid and x["key"] in bad_keys)])
            conflict_str = ("\nKey conflicts:\n{}\n\n"
                            "check the following documents:\n{}").format(
                                '\n'.join(bad_keys),
                                '\n'.join(map(lambda x: url_to_use + x, s)))
            if len(s) == 1:
                conflict_str += """
You have tried to use command keys that are in use!
"""
            raise CommandCollision(conflict_str)

    def run(self, func_dic_copy, docid):
        if self.isRunning:
            return
        self.isRunning = True
        db = self.acct[self.db]
        from .listen import _watch_changes_feed
        import threading as _th
        self._currentInfo = {
            "doc_name": docid,
            "thread": _th.Thread(target=_watch_changes_feed,
                                 args=(db, func_dic_copy, self.verbose))
        }
        self.__check_keys(docid)
        self._currentInfo["thread"].daemon = True
        self._currentInfo["thread"].start()

    def stop_listening(self):
        self.__remove_commands_doc()

    def __remove_commands_doc(self):
        import requests as _req
        if "doc_name" not in self._currentInfo:
            return
        doc_name = self._currentInfo["doc_name"]
        db = self.acct[self.db]
        log("Removing commands doc {}".format(doc_name))
        try:
            doc = db.document(doc_name)
            outp = doc.get().json()
            doc.delete(outp["_rev"]).raise_for_status()
        except _req.exceptions.ConnectionError:
            log("Error removing document, did the server die?")
        except PynEDMException as e:
            log("Unknown exception ({})".format(e))
        del self._currentInfo["doc_name"]

    def __del__(self):
        stop_listening(True)
        self.wait()

def start_process(func, *args, **kwargs):
    """
    Run func(*args, **kwargs) in a background thread.  The returned thread
    object carries a ``result`` attribute (a Queue) on which the function's
    return value is placed when it completes.
    """
    import Queue as _q
    import threading as _th

    def wrap_f(q, *args, **kwargs):
        # Execute the wrapped function and hand its return value back
        # through the queue
        ret = func(*args, **kwargs)
        q.put(ret)

    q = _q.Queue()
    t = _th.Thread(target=wrap_f, args=(q,) + args, kwargs=kwargs)
    t.start()
    t.result = q
    return t
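
# Usage sketch for start_process (the worker function below is illustrative):
#
#   def slow_add(a, b):
#       return a + b
#
#   t = start_process(slow_add, 1, 2)
#   t.join()
#   print(t.result.get())  # -> 3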

def stop_listening(stop=True):
    """
    Request the listening to stop.  Code blocked on
    :func:`ProcessObject.wait` will proceed.
    """
    global _should_stop
    if not type(stop) == type(True):
        raise PynEDMException("Expected bool, received (%s)" % type(stop))
    if stop and not _should_stop:
        debug("Stop Requested")
    _should_stop = stop

def should_stop():
    """
    Returns whether or not stop has been requested.

    :rtype: bool
    """
    return _should_stop

def listen(function_dict, database, username=None, password=None,
           uri="http://localhost:5984", verbose=False, **kw):
    """
    Listen to the database changes feed and execute commands when certain
    documents arrive.

    :param function_dict: dictionary of functions (values) with names (keys)
    :param database: name of database
    :param username: username
    :param password: password
    :param uri: address of server
    :param verbose: verbosity
    :type function_dict: dict
    :type database: str
    :type username: str
    :type password: str
    :type uri: str
    :type verbose: bool
    :rtype: :class:`ProcessObject`

    function_dict should look like the following::

        adict = {
          "func_name1" : func1,
          "func_name2" : func2,
        }

    *or*, if explicitly passing in documentation strings::

        adict = {
          "func_name1" : (func1, "my doc string"),
          "func_name2" : (func2, "my doc string for func2")
        }

    where of course the names can be more creative and func1/2 should be
    actual references to functions.
    """
    stop_listening(False)

    # Handle interruption signals
    def _builtin_sighandler(sig, frame):
        log("Handler called {}, {}".format(sig, frame))
        stop_listening()

    import signal
    try:
        signal.signal(signal.SIGINT, _builtin_sighandler)
    except ValueError:
        log("Not handling signals")

    # Now we start with the listen function
    import inspect as _ins
    import pydoc as _pyd
    import uuid as _uuid

    # Get the database information
    process_object = ProcessObject(uri, username, password, database)

    # Build the export_commands document
    document = {
        "uuid": _uuid.getnode(),
        "type": "export_commands",
        "keys": {},
        "log_servers": listening_addresses()
    }

    # Copy function dictionary
    func_dic_copy = function_dict.copy()
    for k in function_dict:
        o = function_dict[k]
        exp_dic = {}
        if _ins.isfunction(o):
            # If we just have a function, use the doc string
            exp_dic = dict(Info=_pyd.plain(_pyd.text.document(o, k)))
        else:
            exp_dic = dict(Info=o[1])
            func_dic_copy[k] = o[0]
        document["keys"][k] = exp_dic

    if verbose:
        log("Tracking the following commands: \n" +
            '\n '.join(function_dict.keys()))

    r = process_object.write_document_to_db(document)
    if "ok" not in r:
        raise PynEDMException("Error seen: {}".format(r))

    process_object.run(func_dic_copy, r["id"])
    return process_object
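
# Usage sketch tying listen()/wait() together (database name, credentials,
# and the exported command are placeholders):
#
#   def set_voltage(value):
#       return "voltage set to {}".format(value)
#
#   po = listen({"set_voltage": (set_voltage, "Set the output voltage")},
#               "nedm%2Fexample_db",
#               username="user", password="secret",
#               uri="http://localhost:5984")
#   po.wait()  # blocks until stop_listening() is called or SIGINT arrives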