1
2
3
4
5
6 import traceback
7
8 import gevent
9 from gevent import monkey
10
11 from .base import check_callable
12 from .sync import SyncConsumer
13 from ..utils import json
14
16 - def __init__(self, db, callback=None, **params):
17 gevent.Greenlet.__init__(self)
18 self.process_change = callback
19 self.params = params
20 self.db = db
21
23 while True:
24 try:
25 resp = self.db.res.get("_changes", **self.params)
26 return self.consume(resp)
27 except (SystemExit, KeyboardInterrupt):
28 gevent.sleep(5)
29 except:
30 traceback.print_exc()
31 gevent.sleep(5)
32
34 raise NotImplementedError
35
37
39 with resp.body_stream() as body:
40 while True:
41 line = body.readline()
42 if not line:
43 break
44 if line.endswith("\r\n"):
45 line = line[:-2]
46 else:
47 line = line[:-1]
48 if not line:
49 continue
50 self.process_change(line)
51
53
55 with resp.body_stream() as body:
56 buf = []
57 while True:
58 data = body.read()
59 if not data:
60 break
61 buf.append(data)
62 change = "".join(buf)
63 try:
64 change = json.loads(change)
65 except ValueError:
66 pass
67 self.process_change(change)
68
69
74
75 - def _fetch(self, cb, **params):
78
79 - def fetch(self, cb=None, **params):
83
92
93 - def wait(self, cb, **params):
98
104
110