Package couchdbkit :: Package consumer :: Module ceventlet
[hide private]
[frames | no frames]

Source Code for Module couchdbkit.consumer.ceventlet

  1  # -*- coding: utf-8 - 
  2  # 
  3  # This file is part of couchdbkit released under the MIT license.  
  4  # See the NOTICE for more information. 
  5   
  6  import traceback 
  7   
  8  import eventlet 
  9  from eventlet import event 
 10   
 11  from .base import check_callable 
 12  from .sync import SyncConsumer 
 13  from ..utils import json 
 14   
 15   
class ChangeConsumer(object):
    """Base class that reads a database ``_changes`` feed in a green thread.

    Subclasses implement :meth:`consume` to decode the response and pass
    each change to the callback stored as ``process_change``.
    """

    def __init__(self, db, callback, **params):
        """Store the feed settings; no request is made here.

        :param db: object whose ``res.get`` performs the ``_changes`` request.
        :param callback: stored as ``process_change`` for use by ``consume``.
        :param params: keyword arguments forwarded to ``db.res.get``.
        """
        self.process_change = callback
        self.params = params
        self.db = db
        # Triggered once the feed has been consumed or the attempt failed.
        self.stop_event = event.Event()

    def wait(self):
        """Start the reader green thread and block until it signals the end."""
        eventlet.spawn_n(self._run)
        self.stop_event.wait()

    def wait_async(self):
        """Start the reader green thread and return immediately."""
        eventlet.spawn_n(self._run)

    def _run(self):
        """Request the feed once, consume it, and always release waiters.

        :returns: whatever ``consume`` returns, or ``None`` when the request
            or the consumption failed.
        """
        try:
            resp = self.db.res.get("_changes", **self.params)
            return self.consume(resp)
        except (SystemExit, KeyboardInterrupt):
            # Interrupted: pause briefly, then fall through to release the
            # waiter instead of propagating out of the green thread.
            eventlet.sleep(5)
        except:  # noqa: E722 - deliberately broad, see comment below
            # This runs at the top of a green thread, so nothing above can
            # handle the error: report it and still release the waiter.
            traceback.print_exc()
            eventlet.sleep(5)
        finally:
            # consume() implementations may already have triggered the
            # event, and an eventlet Event must not be sent twice, so only
            # signal while it is still pending. This also guarantees that
            # wait() returns when a consume() forgets to signal.
            if not self.stop_event.ready():
                self.stop_event.send(True)

    def consume(self, resp):
        """Process the feed response; must be provided by subclasses."""
        raise NotImplementedError
46
class ContinuousChangeConsumer(ChangeConsumer):
    """Consumer that passes each non-empty line of the response to the callback."""

    def consume(self, resp):
        """Read ``resp`` line by line until the stream ends.

        Every non-blank line is passed to ``process_change`` as raw text
        with its line terminator removed. ``stop_event`` is triggered once
        the stream is exhausted.

        :param resp: response object providing ``body_stream()``.
        """
        with resp.body_stream() as body:
            while True:
                line = body.readline()
                if not line:
                    # End of stream.
                    break
                # Remove only an actual terminator. Dropping the last
                # character unconditionally would truncate a final line
                # that arrives without a trailing newline.
                if line.endswith("\r\n"):
                    line = line[:-2]
                elif line.endswith("\n"):
                    line = line[:-1]
                if not line:
                    # Blank line (presumably a feed heartbeat): nothing to
                    # deliver.
                    continue
                self.process_change(line)
            self.stop_event.send(True)
63 64
class LongPollChangeConsumer(ChangeConsumer):
    """Consumer that delivers the whole response body as a single change."""

    def consume(self, resp):
        """Read the full body, decode it if possible and invoke the callback.

        The body is read until ``read()`` returns nothing, joined into one
        string and parsed as JSON. When parsing fails the raw text is passed
        to ``process_change`` instead. ``stop_event`` is triggered afterwards.

        :param resp: response object providing ``body_stream()``.
        """
        with resp.body_stream() as body:
            chunks = []
            data = body.read()
            while data:
                chunks.append(data)
                data = body.read()

            raw = "".join(chunks)
            try:
                payload = json.loads(raw)
            except ValueError:
                # Not valid JSON: pass the text through untouched.
                payload = raw

            self.process_change(payload)
            self.stop_event.send(True)
82 83
class EventletConsumer(SyncConsumer):
    """Changes consumer that runs callbacks from eventlet green threads.

    Calls made without a callback are delegated to ``SyncConsumer``.
    """

    def __init__(self, db):
        """Patch the socket module for eventlet, then initialise the base.

        :param db: database object passed on to ``SyncConsumer.__init__``.
        """
        eventlet.monkey_patch(socket=True)
        super(EventletConsumer, self).__init__(db)

    def _fetch(self, cb, **params):
        """Request ``_changes`` and pass the response's ``json_body`` to ``cb``."""
        resp = self.db.res.get("_changes", **params)
        cb(resp.json_body)

    def fetch(self, cb=None, **params):
        """Fetch changes, in the background when a callback is given.

        :param cb: optional callable receiving the decoded response.
        :param params: keyword arguments forwarded to the request.
        :returns: the parent's ``wait_once`` result when ``cb`` is ``None``,
            otherwise ``None`` (the request runs in a spawned green thread).
        """
        if cb is None:
            # NOTE(review): this delegates to the parent's wait_once(), not
            # to a parent fetch(); kept unchanged - confirm it is intended.
            return super(EventletConsumer, self).wait_once(**params)

        # Validate up front, like the other callback-taking methods, so a
        # bad callback is reported to the caller rather than failing later
        # inside the spawned green thread.
        check_callable(cb)
        eventlet.spawn_n(self._fetch, cb, **params)

    def wait_once(self, cb=None, **params):
        """Wait for one long-poll response, blocking until it is consumed.

        :param cb: optional callable receiving the change.
        :param params: keyword arguments forwarded to the request; ``feed``
            is forced to ``"longpoll"`` when a callback is given.
        :returns: the parent's ``wait_once`` result when ``cb`` is ``None``,
            otherwise ``None``.
        """
        if cb is None:
            return super(EventletConsumer, self).wait_once(**params)

        check_callable(cb)
        params["feed"] = "longpoll"
        consumer = LongPollChangeConsumer(self.db, callback=cb, **params)
        consumer.wait()

    def wait(self, cb, **params):
        """Consume the continuous feed, blocking until the stream ends.

        :param cb: callable receiving each raw change line.
        :param params: keyword arguments forwarded to the request; ``feed``
            is forced to ``"continuous"``.
        """
        # Same up-front validation as wait_async(); without it a bad
        # callback only fails after the feed request has been made.
        check_callable(cb)
        params["feed"] = "continuous"
        consumer = ContinuousChangeConsumer(self.db, callback=cb, **params)
        consumer.wait()

    def wait_once_async(self, cb, **params):
        """Start a long-poll consumer in the background and return at once.

        :param cb: callable receiving the change.
        :param params: keyword arguments forwarded to the request; ``feed``
            is forced to ``"longpoll"``.
        :returns: the result of ``LongPollChangeConsumer.wait_async()``.
        """
        check_callable(cb)
        params["feed"] = "longpoll"
        consumer = LongPollChangeConsumer(self.db, callback=cb, **params)
        return consumer.wait_async()

    def wait_async(self, cb, **params):
        """Start a continuous-feed consumer in the background and return.

        :param cb: callable receiving each raw change line.
        :param params: keyword arguments forwarded to the request; ``feed``
            is forced to ``"continuous"``.
        :returns: the result of ``ContinuousChangeConsumer.wait_async()``.
        """
        check_callable(cb)
        params["feed"] = "continuous"
        consumer = ContinuousChangeConsumer(self.db, callback=cb, **params)
        return consumer.wait_async()
127