Package couchdbkit :: Module changes
[hide private]
[frames] | no frames]

Source Code for Module couchdbkit.changes

  1  # -*- coding: utf-8 - 
  2  # 
  3  # This file is part of couchdbkit released under the MIT license. 
  4  # See the NOTICE for more information. 
  5   
  6  """ 
  7  module to fetch and stream changes from a database 
  8  """ 
  9   
 10  from .utils import json 
 11   
 12   
class ChangesStream(object):
    """\
    Change stream object.

    Iterating over an instance requests ``_changes`` from the database
    resource with the keyword parameters given to the constructor and
    yields each change decoded from JSON.

    .. code-block:: python

        from couchdbkit import Server
        from couchdbkit.changes import ChangesStream

        s = Server()
        db = s['testdb']
        stream = ChangesStream(db)

        print "got change now"
        for c in stream:
            print c

        print "stream changes"
        with ChangesStream(db, feed="continuous", heartbeat=True) as stream:
            for c in stream:
                print c

    """

    def __init__(self, db, **params):
        """Remember the database and the query parameters for the feed.

        @param db: database object; its ``res.get`` is called on iteration
        @param params: keyword arguments forwarded verbatim to
            ``db.res.get("_changes", ...)``
        """
        self.db = db
        self.params = params

    def __enter__(self):
        """Return the stream itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *args):
        """Do nothing on exit; returning False lets exceptions propagate."""
        return False

    def __iter__(self):
        """Yield every change read from the ``_changes`` response body.

        The body is read line by line. Empty lines (heartbeats), the
        ``{"results":`` / ``"last_seq`` envelope lines and lines that are
        not valid JSON are skipped, as are changes that decode to a falsy
        value.
        """
        r = self.db.res.get("_changes", **self.params)
        with r.body_stream() as body:
            while True:
                line = body.readline()
                if not line:
                    # readline() returned nothing: end of the body.
                    break

                line = self._normalize_line(line)
                if not line:
                    # heartbeat
                    continue

                # Rows of a non-continuous feed are separated by a
                # trailing comma that is not part of the JSON object.
                if line.endswith(","):
                    line = line[:-1]

                ret = self._parse_change(line)
                if not ret:
                    continue
                yield ret

    @staticmethod
    def _normalize_line(line):
        """Return ``line`` as text without its line terminator.

        Only a terminator that is really present is removed, so a final
        line that is not newline-terminated keeps its last character.
        Lines delivered as ``bytes`` (Python 3) are decoded as UTF-8
        first so the text comparisons below work.
        """
        if isinstance(line, bytes) and not isinstance(line, str):
            line = line.decode("utf-8")

        if line.endswith("\r\n"):
            return line[:-2]
        if line.endswith("\n"):
            return line[:-1]
        return line

    def _parse_change(self, line):
        """Decode one line of the feed.

        @param line: text of a single line, without line terminator
        @return: the decoded JSON value, or None for envelope lines and
            for lines that are not valid JSON
        """
        if line.startswith(('{"results":', '"last_seq')):
            return None

        try:
            return json.loads(line)
        except ValueError:
            return None

    def __next__(self):
        # NOTE(review): this returns the stream itself rather than the next
        # change, so ``next(stream)`` never advances the feed. Iteration via
        # ``for``/``iter()`` goes through the generator returned by
        # ``__iter__`` and is unaffected. Behaviour kept as-is for
        # compatibility -- confirm whether this method can be removed.
        return self
79 80
def fold(db, fun, acc, since=0):
    """Fold each change and accumulate a result using a function

    Args:

        @param db: Database, a database object
        @param fun: function, a callable with arity 2
        @param acc: initial accumulator, passed to the first call of fun
        @param since: int, sequence where to start the feed

        @return: last acc returned

    Ex of function:

        fun(change_object, acc):
            return acc

    If the function returns "stop", the changes feed will stop and the
    accumulator as it was before that call is returned.

    Raises TypeError if fun is not callable.
    """

    if not callable(fun):
        raise TypeError("fun isn't a callable")

    with ChangesStream(db, since=since) as st:
        for c in st:
            result = fun(c, acc)
            # "stop" is the documented sentinel for ending the fold early;
            # it is a signal, not a new accumulator value. The isinstance
            # guard keeps accumulators with unusual ``==`` semantics safe.
            if isinstance(result, str) and result == "stop":
                break
            acc = result
    return acc
109 110
def foreach(db, fun, since=0):
    """Call ``fun`` once with every change of the feed, in order.

    @param db: Database, a database object
    @param fun: function, a callable taking the change as only argument
    @param since: int, sequence where to start the feed

    Raises TypeError if fun is not callable. The return values of fun
    are ignored.
    """
    if not callable(fun):
        raise TypeError("fun isn't a callable")

    stream = ChangesStream(db, since=since)
    with stream as changes:
        for change in changes:
            fun(change)
120