Package couchdbkit :: Package consumer :: Module cgevent
[hide private]
[frames] | no frames]

Source Code for Module couchdbkit.consumer.cgevent

  1  # -*- coding: utf-8 - 
  2  # 
  3  # This file is part of couchdbkit released under the MIT license.  
  4  # See the NOTICE for more information. 
  5   
  6  import traceback 
  7   
  8  import gevent 
  9  from gevent import monkey  
 10   
 11  from .base import check_callable 
 12  from .sync import SyncConsumer 
 13  from ..utils import json 
 14   
15 -class ChangeConsumer(gevent.Greenlet):
16 - def __init__(self, db, callback=None, **params):
17 gevent.Greenlet.__init__(self) 18 self.process_change = callback 19 self.params = params 20 self.db = db
21
22 - def _run(self):
23 while True: 24 try: 25 resp = self.db.res.get("_changes", **self.params) 26 return self.consume(resp) 27 except (SystemExit, KeyboardInterrupt): 28 gevent.sleep(5) 29 except: 30 traceback.print_exc() 31 gevent.sleep(5)
32
33 - def consume(self, resp):
34 raise NotImplementedError
35
36 -class ContinuousChangeConsumer(ChangeConsumer):
37
38 - def consume(self, resp):
39 with resp.body_stream() as body: 40 while True: 41 line = body.readline() 42 if not line: 43 break 44 if line.endswith("\r\n"): 45 line = line[:-2] 46 else: 47 line = line[:-1] 48 if not line: 49 continue 50 self.process_change(line)
51
52 -class LongPollChangeConsumer(ChangeConsumer):
53
54 - def consume(self, resp):
55 with resp.body_stream() as body: 56 buf = [] 57 while True: 58 data = body.read() 59 if not data: 60 break 61 buf.append(data) 62 change = "".join(buf) 63 try: 64 change = json.loads(change) 65 except ValueError: 66 pass 67 self.process_change(change)
68 69
70 -class GeventConsumer(SyncConsumer):
71 - def __init__(self, db):
72 monkey.patch_socket() 73 super(GeventConsumer, self).__init__(db)
74
75 - def _fetch(self, cb, **params):
76 resp = self.db.res.get("_changes", **params) 77 cb(resp.json_body)
78
79 - def fetch(self, cb=None, **params):
80 if cb is None: 81 return super(GeventConsumer, self).wait_once(**params) 82 return gevent.spawn(self._fetch, cb, **params)
83
84 - def wait_once(self, cb=None, **params):
85 if cb is None: 86 return super(GeventConsumer, self).wait_once(**params) 87 88 check_callable(cb) 89 params.update({"feed": "longpoll"}) 90 LongPollChangeConsumer.spawn(self.db, callback=cb, 91 **params).join()
92
93 - def wait(self, cb, **params):
94 check_callable(cb) 95 params.update({"feed": "continuous"}) 96 ContinuousChangeConsumer.spawn(self.db, callback=cb, 97 **params).join()
98
99 - def wait_once_async(self, cb, **params):
100 check_callable(cb) 101 params.update({"feed": "longpoll"}) 102 return LongPollChangeConsumer.spawn(self.db, callback=cb, 103 **params)
104
105 - def wait_async(self, cb, **params):
106 check_callable(cb) 107 params.update({"feed": "continuous"}) 108 return ContinuousChangeConsumer.spawn(self.db, callback=cb, 109 **params)
110