1
2
3
4
5
6 import traceback
7
8 import eventlet
9 from eventlet import event
10
11 from .base import check_callable
12 from .sync import SyncConsumer
13 from ..utils import json
14
15
def __init__(self, db, callback, **params):
    """Record the consumer's collaborators and create its stop event.

    :param db: database object, kept as ``self.db``.
    :param callback: stored as ``self.process_change``; presumably the
        callable invoked for each received change (confirm against
        the consuming code).
    :param params: extra keyword arguments, kept as ``self.params``.
    """
    # Plain attribute storage; the assignments are independent.
    self.db, self.params = db, params
    self.process_change = callback

    # A fresh eventlet Event, kept as ``self.stop_event``.
    self.stop_event = event.Event()
22
24 eventlet.spawn_n(self._run)
25 self.stop_event.wait()
26
28 eventlet.spawn_n(self._run)
29
31 while True:
32 try:
33 resp = self.db.res.get("_changes", **self.params)
34 return self.consume(resp)
35 except (SystemExit, KeyboardInterrupt):
36 eventlet.sleep(5)
37 break
38 except:
39 traceback.print_exc()
40 eventlet.sleep(5)
41 break
42 self.stop_event.send(True)
43
45 raise NotImplementedError
46
48
50 with resp.body_stream() as body:
51 while True:
52 line = body.readline()
53 if not line:
54 break
55 if line.endswith("\r\n"):
56 line = line[:-2]
57 else:
58 line = line[:-1]
59 if not line:
60 continue
61 self.process_change(line)
62 self.stop_event.send(True)
63
64
66
68 with resp.body_stream() as body:
69 buf = []
70 while True:
71 data = body.read()
72 if not data:
73 break
74 buf.append(data)
75 change = "".join(buf)
76 try:
77 change = json.loads(change)
78 except ValueError:
79 pass
80 self.process_change(change)
81 self.stop_event.send(True)
82
83
88
89 - def _fetch(self, cb, **params):
92
93 - def fetch(self, cb=None, **params):
97
107
108 - def wait(self, cb, **params):
113
120
127