1
2
3
4
5
6 """
Module to fetch and stream changes from a database.
8 """
9
10 from .utils import json
11
12
14 """\
15 Change stream object.
16
17 .. code-block:: python
18
from couchdbkit import Server
from couchdbkit.changes import ChangesStream
20
21 s = Server()
22 db = s['testdb']
23 stream = ChangesStream(db)
24
25 print "got change now"
26 for c in stream:
27 print c
28
29 print "stream changes"
30 with ChangesStream(db, feed="continuous", heartbeat=True) as stream:
31 for c in stream: print c
32
33 """
34
# Keep the database handle and the request parameters on the instance;
# both are read again when the "_changes" request is issued.
self.db = db
self.params = params
38
41
44
# Generator body: requests the "_changes" resource of the database with the
# stored parameters, reads the response body line by line and yields every
# line that the parser turns into a truthy object.
r = self.db.res.get("_changes", **self.params)
with r.body_stream() as body:
    while True:
        line = body.readline()
        if not line:
            # Empty read: stop iterating.
            break
        # Drop the line terminator: two characters for "\r\n", otherwise
        # exactly one.
        # NOTE(review): a final line without any newline would lose its
        # last real character here -- confirm the server always
        # terminates lines.
        if line.endswith("\r\n"):
            line = line[:-2]
        else:
            line = line[:-1]
        if not line:
            # Nothing left once the terminator is removed; presumably a
            # heartbeat (blank keep-alive line) -- TODO confirm.
            continue

        # Strip a trailing comma before handing the line to the parser.
        if line.endswith(","):
            line = line[:-1]
        ret = self._parse_change(line)
        if not ret:
            # The parser returned nothing usable for this line; skip it.
            continue
        yield ret
66
# Turn one line of the response into a decoded object, or None when the
# line is not a change.
# Lines starting with '{"results":' or '"last_seq' are rejected up front;
# presumably these are the wrapper lines around the list of changes in a
# non-continuous feed -- TODO confirm against the CouchDB response format.
if line.startswith('{"results":') or line.startswith('"last_seq'):
    return None
else:
    try:
        obj = json.loads(line)
        return obj
    except ValueError:
        # The line could not be decoded; report "no change" instead of
        # raising.
        return None
76
79
80
def fold(db, fun, acc, since=0):
    """Fold over the changes of a database and accumulate a result.

    Every change is passed to ``fun`` together with the current
    accumulator; the value returned by ``fun`` becomes the accumulator
    for the next change.

    Args:

        @param db: Database, a database object
        @param fun: function, a callable with arity 2
        @param acc: initial value of the accumulator
        @param since: int, sequence where to start the feed

        @return: the last accumulator value

        @raise TypeError: if ``fun`` is not callable

    Ex of function:

        def fun(change_object, acc):
            return acc

    If the function returns "stop", the changes feed stops and the
    accumulator built so far is returned.
    """

    if not callable(fun):
        raise TypeError("fun isn't a callable")

    with ChangesStream(db, since=since) as st:
        for c in st:
            res = fun(c, acc)
            if res == "stop":
                # Documented sentinel: leave the feed and keep the last
                # real accumulator instead of the sentinel itself.
                break
            acc = res
    return acc
109
110
"""Iterate over each change and pass it to the callable.

``fun`` is called with one argument, the change; its return value is
ignored. A TypeError is raised when ``fun`` is not callable.
"""

if not callable(fun):
    raise TypeError("fun isn't a callable")

# Open a changes stream on ``db`` starting at ``since`` and hand every
# change it produces to ``fun``.
with ChangesStream(db, since=since) as st:
    for c in st:
        fun(c)
120