1
2
3
4
5
6
7 from .base import ConsumerBase
8
# Deprecated short backend names, each mapped to the dotted path of the
# consumer class it stands for.
OLD_CONSUMER_URIS = {
    "eventlet": "couchdbkit.consumer.ceventlet.EventletConsumer",
    "gevent": "couchdbkit.consumer.cgevent.GeventConsumer",
    "sync": "couchdbkit.consumer.sync.SyncConsumer",
}
13
15 if uri in ('eventlet', 'gevent', 'sync'):
16 import warnings
17 warnings.warn(
18 "Short names for uri in consumer backend are deprecated.",
19 DeprecationWarning
20 )
21 uri = OLD_CONSUMER_URIS[uri]
22
23 components = uri.split('.')
24 klass = components.pop(-1)
25 mod = __import__('.'.join(components))
26 for comp in components[1:]:
27 mod = getattr(mod, comp)
28 return getattr(mod, klass)
29
31 """ Database change consumer
32
33 Example Usage:
34
35 >>> from couchdbkit import Server, Consumer
36 >>> s = Server()
37 >>> db = s['testdb']
38 >>> c = Consumer(db)
39 >>> def print_line(line):
40 ... print "got %s" % line
41 ...
42 >>> c.wait(print_line,since=0) # Go into receive loop
43
44 """
45
def __init__(self, db, backend='couchdbkit.consumer.sync.SyncConsumer', **kwargs):
    """ Create a consumer bound to ``db``

    Args:
        @param db: Database instance
        @param backend: backend entry point uri, resolved to a class
        with ``load_consumer_class``.
        The default class (sync) serializes each call to registered
        callbacks. Line processing should be fast in this case to not
        wait on socket read.

        A string referring to one of the following bundled classes:

        * ``sync``
        * ``eventlet`` - Requires eventlet >= 0.9.7
        * ``gevent`` - Requires gevent >= 0.12.2 (?)

        You can optionally register your own worker in the
        ``couchdbkit.consumers`` entry point.
        @param kwargs: extra keyword arguments, passed unchanged to the
        backend class constructor after ``db``.
    """
    self.db = db
    backend_cls = load_consumer_class(backend)
    # Keep the resolved class as well as the instance built from it.
    self.consumer_class = backend_cls
    self._consumer = backend_cls(db, **kwargs)
68
def fetch(self, cb=None, **params):
    """ Fetch all changes and return. If since is specified, fetch all
    changes since this doc sequence.

    The call is delegated to the backend consumer's ``fetch``.

    Args:
        @param cb: optional callback, forwarded as-is to the backend
        @param params: kwargs
        See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

    @return: dict, change result

    """
    backend = self._consumer
    return backend.fetch(cb=cb, **params)
81
83 """Wait for one change and return (longpoll feed)
84
85 Args:
86 @param params: kwargs
87 See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)
88
89 @return: dict, change result
90 """
91
92 return self._consumer.wait_once(cb=cb, **params)
93
def wait(self, cb, **params):
    """ Wait for changes until the connection closes (continuous feed)

    The call is delegated to the backend consumer's ``wait``.

    Args:
        @param cb: callback, forwarded as-is to the backend
        @param params: kwargs
        See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

    @return: dict, line of change
    """
    backend = self._consumer
    return backend.wait(cb, **params)
104
106 """ like wait_once but doesn't return anything. """
107 return self._consumer.wait_once_async(cb=cb, **params)
108
110 """ like wait but doesn't return anything. """
111 return self._consumer.wait_async(cb, **params)
112