Source code for pynedm.fileutils

from cloudant.resource import Resource
from .exception import PynEDMNoFile
import traceback

__all__ = [ "AttachmentFile" ]

[docs]class AttachmentFile(Resource): """ Provides a file-like object for handling document attachments without downloading them. Returned by :func:`pynedm.utils.ProcessObject.open_file` """ def __init__(self,req): self.req = req self.curr_pos = 0 try: self.head_info = self.req.head() self.head_info.raise_for_status() self.total_length = int(self.head_info.headers['content-length']) except Exception as e: raise PynEDMNoFile(str(e))
[docs] def seek(self, seekpos, whence=0): """ Seek to a position in the file :param seekpos: seek position :type seekpos: int :param whence: direction (0 - from beginning, 1 - relative to current position, 2 - from end) :type seekpos: int """ if whence == 1: seekpos += self.curr_pos elif whence == 2: seekpos = self.total_length - seekpos if seekpos > self.total_length: seekpos = self.total_length self.curr_pos = seekpos
[docs] def tell(self): """ Get the current position :returns: int - current position """ return self.curr_pos
[docs] def read(self, numbytes=-1): """ Read number of bytes from current position :param numbytes: number of bytes to read (< 0 reads remaining bytes) :type numbytes: int """ if numbytes < 0: numbytes = self.total_length to = self.curr_pos + numbytes - 1 if to >= self.total_length: to = self.total_length-1 if self.curr_pos >= to: return None try: t = self.req.get(headers={ "Range" : "bytes={}-{}".format(self.curr_pos, to) }) t.raise_for_status() self.seek(to+1) return t.content except Exception as e: raise PynEDMNoFile(str(e))
def __enter__(self): """ Allow 'with' calls. """ return self def __exit__(self, *args): """ Allow 'with' calls. """
[docs] def iterate(self, chunk_size): """ Iterates from this current position to the end of the file :param chunk_size: number of bytes in chunk to yield :type chunk_size: int """ while True: ri = self.read(chunk_size) if ri is None: break yield ri