1
2
3
4
5
6 from __future__ import with_statement
7
8 from .base import ConsumerBase, check_callable
9 from ..utils import json
10
11 __all__ = ['SyncConsumer']
12
14
16 if cb is not None:
17 check_callable(cb)
18
19 params.update({"feed": "longpoll"})
20 resp = self.db.res.get("_changes", **params)
21 buf = ""
22 with resp.body_stream() as body:
23 while True:
24 data = body.read()
25 if not data:
26 break
27 buf += data
28
29 ret = json.loads(buf)
30 if cb is not None:
31 cb(ret)
32 return
33
34 return ret
35
36 - def wait(self, cb, **params):
37 check_callable(cb)
38 params.update({"feed": "continuous"})
39 resp = self.db.res.get("_changes", **params)
40
41 with resp.body_stream() as body:
42 while True:
43 try:
44 line = body.readline()
45 if not line:
46 break
47 if line.endswith("\r\n"):
48 line = line[:-2]
49 else:
50 line = line[:-1]
51 if not line:
52 continue
53
54 cb(json.loads(line))
55 except (KeyboardInterrupt, SystemExit,):
56 break
57