API Docs

pyee

pyee supplies an EventEmitter class that is similar to the EventEmitter class from Node.js. In addition, it supplies the subclasses AsyncIOEventEmitter, TwistedEventEmitter and ExecutorEventEmitter for supporting async and threaded execution with asyncio, twisted, and concurrent.futures Executors respectively, as supported by the environment.

Example

In [1]: from pyee.base import EventEmitter

In [2]: ee = EventEmitter()

In [3]: @ee.on('event')
   ...: def event_handler():
   ...:     print('BANG BANG')
   ...:

In [4]: ee.emit('event')
BANG BANG

EventEmitter

The base event emitter class. All other event emitters inherit from this class.

Most events are registered with an emitter via the on and once methods, and fired with the emit method. However, pyee event emitters have two special events:

  • new_listener: Fires whenever a new listener is created. Listeners for this event do not fire upon their own creation.

  • error: When emitted, raises an Exception by default. This behavior can be overridden by attaching a callback to the event.

For example:

@ee.on('error')
def on_error(message):
    logging.error(message)

ee.emit('error', Exception('something blew up'))

All callbacks are handled in a synchronous, blocking manner. As in Node.js, raised exceptions are not automatically handled for you; you must catch your own exceptions and treat them accordingly.
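
The `error` rule described above can be illustrated without installing pyee. The following is a minimal standard-library sketch: `TinyEmitter` is a hypothetical stand-in that mimics only the documented behavior (handlers run synchronously, and an `error` event with no listener raises). It is not pyee's implementation.

```python
from collections import defaultdict


class TinyEmitter:
    """Hypothetical stand-in that mimics the documented `error` rule."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, f):
        self._handlers[event].append(f)
        return f

    def emit(self, event, *args):
        handlers = list(self._handlers.get(event, []))
        for f in handlers:
            f(*args)  # synchronous, blocking call
        if not handlers and event == "error":
            # No listener attached: the emitted error is raised instead.
            err = args[0] if args else None
            if isinstance(err, Exception):
                raise err
            raise RuntimeError(f"Uncaught, unspecified 'error' event: {err}")
        return bool(handlers)


ee = TinyEmitter()

# Without an 'error' listener, emitting raises the exception.
try:
    ee.emit("error", ValueError("something blew up"))
    raised = False
except ValueError:
    raised = True

# With a listener attached, the exception is passed to it instead.
seen = []
ee.on("error", seen.append)
handled = ee.emit("error", ValueError("handled"))
```

In practice this suggests attaching an `error` listener early, before any code path can emit on that event.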

Source code in pyee/base.py
class EventEmitter:
    """The base event emitter class. All other event emitters inherit from
    this class.

    Most events are registered with an emitter via the `on` and `once`
    methods, and fired with the `emit` method. However, pyee event emitters
    have two *special* events:

    - `new_listener`: Fires whenever a new listener is created. Listeners for
      this event do not fire upon their own creation.

    - `error`: When emitted, raises an Exception by default. This behavior
      can be overridden by attaching a callback to the event.

      For example:

    ```py
    @ee.on('error')
    def on_error(message):
        logging.error(message)

    ee.emit('error', Exception('something blew up'))
    ```

    All callbacks are handled in a synchronous, blocking manner. As in Node.js,
    raised exceptions are not automatically handled for you; you must catch
    your own exceptions and treat them accordingly.
    """

    def __init__(self) -> None:
        self._events: Dict[
            str,
            "OrderedDict[Callable, Callable]",
        ] = dict()
        self._lock: Lock = Lock()

    def __getstate__(self) -> Mapping[str, Any]:
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state: Mapping[str, Any]) -> None:
        self.__dict__.update(state)
        self._lock = Lock()

    def on(
        self, event: str, f: Optional[Handler] = None
    ) -> Union[Handler, Callable[[Handler], Handler]]:
        """Registers the function `f` to the event name `event`, if provided.

        If `f` isn't provided, this method calls `EventEmitter#listens_to`, and
        otherwise calls `EventEmitter#add_listener`. In other words, you may either
        use it as a decorator:

        ```py
        @ee.on('data')
        def data_handler(data):
            print(data)
        ```

        Or directly:

        ```py
        ee.on('data', data_handler)
        ```

        In both the decorated and undecorated forms, the event handler is
        returned. The upshot of this is that you can call decorated handlers
        directly, as well as use them in remove_listener calls.

        Note that this method's return type is a union type. If you are using
        mypy or pyright, you will probably want to use either
        `EventEmitter#listens_to` or `EventEmitter#add_listener`.
        """
        if f is None:
            return self.listens_to(event)
        else:
            return self.add_listener(event, f)

    def listens_to(self, event: str) -> Callable[[Handler], Handler]:
        """Returns a decorator which will register the decorated function to
        the event name `event`:

        ```py
        @ee.listens_to("event")
        def data_handler(data):
            print(data)
        ```

        By only supporting the decorator use case, this method has improved
        type safety over `EventEmitter#on`.
        """

        def on(f: Handler) -> Handler:
            self._add_event_handler(event, f, f)
            return f

        return on

    def add_listener(self, event: str, f: Handler) -> Handler:
        """Register the function `f` to the event name `event`:

        ```py
        def data_handler(data):
            print(data)

        h = ee.add_listener("event", data_handler)
        ```

        By not supporting the decorator use case, this method has improved
        type safety over `EventEmitter#on`.
        """
        self._add_event_handler(event, f, f)
        return f

    def _add_event_handler(self, event: str, k: Callable, v: Callable):
        # Fire 'new_listener' *before* adding the new listener!
        self.emit("new_listener", event, k)

        # Add the necessary function
        # Note that k and v are the same for `on` handlers, but
        # different for `once` handlers, where v is a wrapped version
        # of k which removes itself before calling k
        with self._lock:
            if event not in self._events:
                self._events[event] = OrderedDict()
            self._events[event][k] = v

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        f(*args, **kwargs)

    def event_names(self) -> Set[str]:
        """Get a set of events that this emitter is listening to."""
        return set(self._events.keys())

    def _emit_handle_potential_error(self, event: str, error: Any) -> None:
        if event == "error":
            if isinstance(error, Exception):
                raise error
            else:
                raise PyeeException(f"Uncaught, unspecified 'error' event: {error}")

    def _call_handlers(
        self,
        event: str,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> bool:
        handled = False

        with self._lock:
            funcs = list(self._events.get(event, OrderedDict()).values())
        for f in funcs:
            self._emit_run(f, args, kwargs)
            handled = True

        return handled

    def emit(
        self,
        event: str,
        *args: Any,
        **kwargs: Any,
    ) -> bool:
        """Emit `event`, passing `*args` and `**kwargs` to each attached
        function. Returns `True` if any functions are attached to `event`;
        otherwise returns `False`.

        Example:

        ```py
        ee.emit('data', '00101001')
        ```

        Assuming `data` is an attached function, this will call
        `data('00101001')`.
        """
        handled = self._call_handlers(event, args, kwargs)

        if not handled:
            self._emit_handle_potential_error(event, args[0] if args else None)

        return handled

    def once(
        self,
        event: str,
        f: Optional[Callable] = None,
    ) -> Callable:
        """The same as `ee.on`, except that the listener is automatically
        removed after being called.
        """

        def _wrapper(f: Callable) -> Callable:
            def g(
                *args: Any,
                **kwargs: Any,
            ) -> Any:
                with self._lock:
                    # Check that the event wasn't removed already right
                    # before the lock
                    if event in self._events and f in self._events[event]:
                        self._remove_listener(event, f)
                    else:
                        return None
                # f may return a coroutine, so we need to return that
                # result here so that emit can schedule it
                return f(*args, **kwargs)

            self._add_event_handler(event, f, g)
            return f

        if f is None:
            return _wrapper
        else:
            return _wrapper(f)

    def _remove_listener(self, event: str, f: Callable) -> None:
        """Naked unprotected removal."""
        self._events[event].pop(f)
        if not len(self._events[event]):
            del self._events[event]

    def remove_listener(self, event: str, f: Callable) -> None:
        """Removes the function `f` from `event`."""
        with self._lock:
            self._remove_listener(event, f)

    def remove_all_listeners(self, event: Optional[str] = None) -> None:
        """Remove all listeners attached to `event`.
        If `event` is `None`, remove all listeners on all events.
        """
        with self._lock:
            if event is not None:
                self._events[event] = OrderedDict()
            else:
                self._events = dict()

    def listeners(self, event: str) -> List[Callable]:
        """Returns a list of all listeners registered to the `event`."""
        return list(self._events.get(event, OrderedDict()).keys())

add_listener(event, f)

Register the function f to the event name event:

def data_handler(data):
    print(data)

h = ee.add_listener("event", data_handler)

By not supporting the decorator use case, this method has improved type safety over EventEmitter#on.

pyee/base.py
def add_listener(self, event: str, f: Handler) -> Handler:
    """Register the function `f` to the event name `event`:

    ```py
    def data_handler(data):
        print(data)

    h = ee.add_listener("event", data_handler)
    ```

    By not supporting the decorator use case, this method has improved
    type safety over `EventEmitter#on`.
    """
    self._add_event_handler(event, f, f)
    return f

emit(event, *args, **kwargs)

Emit event, passing *args and **kwargs to each attached function. Returns True if any functions are attached to event; otherwise returns False.

Example:

ee.emit('data', '00101001')

Assuming data is an attached function, this will call data('00101001').

pyee/base.py
def emit(
    self,
    event: str,
    *args: Any,
    **kwargs: Any,
) -> bool:
    """Emit `event`, passing `*args` and `**kwargs` to each attached
    function. Returns `True` if any functions are attached to `event`;
    otherwise returns `False`.

    Example:

    ```py
    ee.emit('data', '00101001')
    ```

    Assuming `data` is an attached function, this will call
    `data('00101001')`.
    """
    handled = self._call_handlers(event, args, kwargs)

    if not handled:
        self._emit_handle_potential_error(event, args[0] if args else None)

    return handled

event_names()

Get a set of events that this emitter is listening to.

pyee/base.py
def event_names(self) -> Set[str]:
    """Get a set of events that this emitter is listening to."""
    return set(self._events.keys())

listeners(event)

Returns a list of all listeners registered to the event.

pyee/base.py
def listeners(self, event: str) -> List[Callable]:
    """Returns a list of all listeners registered to the `event`."""
    return list(self._events.get(event, OrderedDict()).keys())

listens_to(event)

Returns a decorator which will register the decorated function to the event name event:

@ee.listens_to("event")
def data_handler(data):
    print(data)

By only supporting the decorator use case, this method has improved type safety over EventEmitter#on.

pyee/base.py
def listens_to(self, event: str) -> Callable[[Handler], Handler]:
    """Returns a decorator which will register the decorated function to
    the event name `event`:

    ```py
    @ee.listens_to("event")
    def data_handler(data):
        print(data)
    ```

    By only supporting the decorator use case, this method has improved
    type safety over `EventEmitter#on`.
    """

    def on(f: Handler) -> Handler:
        self._add_event_handler(event, f, f)
        return f

    return on

on(event, f=None)

Registers the function f to the event name event, if provided.

If f isn't provided, this method calls EventEmitter#listens_to, and otherwise calls EventEmitter#add_listener. In other words, you may either use it as a decorator:

@ee.on('data')
def data_handler(data):
    print(data)

Or directly:

ee.on('data', data_handler)

In both the decorated and undecorated forms, the event handler is returned. The upshot of this is that you can call decorated handlers directly, as well as use them in remove_listener calls.

Note that this method's return type is a union type. If you are using mypy or pyright, you will probably want to use either EventEmitter#listens_to or EventEmitter#add_listener.
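
The two calling styles, and the reason the return type is a union, can be sketched in isolation. This standalone example uses a hypothetical module-level `registry` and free functions rather than pyee's classes; only the shape of the signatures follows the description above.

```python
from typing import Any, Callable, Dict, List, Optional, Union

Handler = Callable[..., Any]
registry: Dict[str, List[Handler]] = {}  # hypothetical stand-in for emitter state


def add_listener(event: str, f: Handler) -> Handler:
    # Direct form: always returns the handler itself.
    registry.setdefault(event, []).append(f)
    return f


def listens_to(event: str) -> Callable[[Handler], Handler]:
    # Decorator form: always returns a decorator.
    def decorator(f: Handler) -> Handler:
        return add_listener(event, f)

    return decorator


def on(
    event: str, f: Optional[Handler] = None
) -> Union[Handler, Callable[[Handler], Handler]]:
    # One entry point, two return shapes: a type checker cannot tell
    # which one the caller gets back without narrowing.
    return listens_to(event) if f is None else add_listener(event, f)


@on("data")
def decorated(data):
    print(data)


def plain(data):
    print(data)


returned = on("data", plain)
```

Because `add_listener` and `listens_to` each have a single return shape, a type checker can follow them without casts, which is the motivation given for preferring them.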

pyee/base.py
def on(
    self, event: str, f: Optional[Handler] = None
) -> Union[Handler, Callable[[Handler], Handler]]:
    """Registers the function `f` to the event name `event`, if provided.

    If `f` isn't provided, this method calls `EventEmitter#listens_to`, and
    otherwise calls `EventEmitter#add_listener`. In other words, you may either
    use it as a decorator:

    ```py
    @ee.on('data')
    def data_handler(data):
        print(data)
    ```

    Or directly:

    ```py
    ee.on('data', data_handler)
    ```

    In both the decorated and undecorated forms, the event handler is
    returned. The upshot of this is that you can call decorated handlers
    directly, as well as use them in remove_listener calls.

    Note that this method's return type is a union type. If you are using
    mypy or pyright, you will probably want to use either
    `EventEmitter#listens_to` or `EventEmitter#add_listener`.
    """
    if f is None:
        return self.listens_to(event)
    else:
        return self.add_listener(event, f)

once(event, f=None)

The same as ee.on, except that the listener is automatically removed after being called.
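
The self-removal can be sketched as follows. This is a standard-library illustration with a hypothetical module-level `handlers` table, not pyee's code. It keeps the idea visible in the source listing (the wrapper is stored under the original function, so the original stays usable as a removal key) and omits the lock for brevity.

```python
from collections import OrderedDict

handlers = OrderedDict()  # original function -> callable actually invoked


def once(f):
    def wrapper(*args, **kwargs):
        # Drop the registration first, then run the original handler.
        if handlers.pop(f, None) is None:
            return None  # already removed elsewhere
        return f(*args, **kwargs)

    handlers[f] = wrapper
    return f


def emit(*args):
    # Iterate over a snapshot, since wrappers mutate `handlers`.
    for g in list(handlers.values()):
        g(*args)


calls = []


def handler(value):
    calls.append(value)


once(handler)
emit("first")
emit("second")  # no listener left, so nothing is recorded
```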

pyee/base.py
def once(
    self,
    event: str,
    f: Optional[Callable] = None,
) -> Callable:
    """The same as `ee.on`, except that the listener is automatically
    removed after being called.
    """

    def _wrapper(f: Callable) -> Callable:
        def g(
            *args: Any,
            **kwargs: Any,
        ) -> Any:
            with self._lock:
                # Check that the event wasn't removed already right
                # before the lock
                if event in self._events and f in self._events[event]:
                    self._remove_listener(event, f)
                else:
                    return None
            # f may return a coroutine, so we need to return that
            # result here so that emit can schedule it
            return f(*args, **kwargs)

        self._add_event_handler(event, f, g)
        return f

    if f is None:
        return _wrapper
    else:
        return _wrapper(f)

remove_all_listeners(event=None)

Remove all listeners attached to event. If event is None, remove all listeners on all events.
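
Reading the source listing for this method, clearing a single event replaces its handler table with an empty one rather than deleting the key. The sketch below reproduces that logic on a plain dict (a stand-in for the emitter's internal mapping) to show the consequence one might expect for `event_names()` in this version. Treat it as an inference from the listing, not as documented behavior.

```python
from collections import OrderedDict


def handler(data):
    print(data)


# Hypothetical stand-in for the emitter's internal event table.
events = {"data": OrderedDict(), "close": OrderedDict()}
events["data"][handler] = handler
events["close"][handler] = handler


def remove_all_listeners(event=None):
    if event is not None:
        events[event] = OrderedDict()  # key stays, handlers go
    else:
        events.clear()


remove_all_listeners("data")
names_after_one = set(events)              # 'data' is still a key
listeners_after_one = list(events["data"])  # but it has no listeners

remove_all_listeners()
names_after_all = set(events)
```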

pyee/base.py
def remove_all_listeners(self, event: Optional[str] = None) -> None:
    """Remove all listeners attached to `event`.
    If `event` is `None`, remove all listeners on all events.
    """
    with self._lock:
        if event is not None:
            self._events[event] = OrderedDict()
        else:
            self._events = dict()

remove_listener(event, f)

Removes the function f from event.

pyee/base.py
def remove_listener(self, event: str, f: Callable) -> None:
    """Removes the function `f` from `event`."""
    with self._lock:
        self._remove_listener(event, f)

BaseEventEmitter

Bases: EventEmitter

BaseEventEmitter is deprecated and an alias for EventEmitter.
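
A deprecated alias of this shape warns at construction time, and Python often hides `DeprecationWarning` by default. The sketch below uses hypothetical `NewName` and `OldName` classes (not pyee's) to show one way such a warning can be surfaced with the standard `warnings` module.

```python
import warnings


class NewName:
    pass


class OldName(NewName):
    """Hypothetical deprecated alias that warns when instantiated."""

    def __init__(self):
        warnings.warn(DeprecationWarning("OldName is deprecated; use NewName."))
        super().__init__()


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    obj = OldName()
```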

Source code in pyee/__init__.py
class BaseEventEmitter(EventEmitter):
    """
    BaseEventEmitter is deprecated and an alias for EventEmitter.
    """

    def __init__(self):
        warn(
            DeprecationWarning(
                "pyee.BaseEventEmitter is deprecated and will be removed in a "
                "future major version; you should instead use pyee.EventEmitter."
            )
        )

        super(BaseEventEmitter, self).__init__()

AsyncIOEventEmitter

Bases: AsyncIOEventEmitter

AsyncIOEventEmitter has been moved to the pyee.asyncio module.

Source code in pyee/__init__.py
class AsyncIOEventEmitter(_AsyncIOEventEmitter):
    """
    AsyncIOEventEmitter has been moved to the pyee.asyncio module.
    """

    def __init__(self, loop=None):
        warn(
            DeprecationWarning(
                "pyee.AsyncIOEventEmitter has been moved to the pyee.asyncio "
                "module."
            )
        )
        super(AsyncIOEventEmitter, self).__init__(loop=loop)

PyeeException

Bases: Exception

An exception internal to pyee.

Source code in pyee/base.py
class PyeeException(Exception):
    """An exception internal to pyee."""

pyee.asyncio

AsyncIOEventEmitter

Bases: EventEmitter

An event emitter class which can run asyncio coroutines in addition to synchronous blocking functions. For example:

@ee.on('event')
async def async_handler(*args, **kwargs):
    await returns_a_future()

On emit, the event emitter will automatically schedule the coroutine using asyncio.ensure_future and the configured event loop (defaults to asyncio.get_event_loop()).

Unlike the case with the EventEmitter, all exceptions raised by event handlers are automatically emitted on the error event. This is important for asyncio coroutines specifically but is also handled for synchronous functions for consistency.

When loop is specified, the supplied event loop will be used when scheduling work with ensure_future. Otherwise, the default asyncio event loop is used.

For asyncio coroutine event handlers, calling emit is non-blocking. In other words, you do not have to await any results from emit, and the coroutine is scheduled in a fire-and-forget fashion.
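
The fire-and-forget scheduling and error forwarding described above can be sketched with asyncio alone. Here `schedule` and the `errors` list are hypothetical stand-ins for `emit` and an `error` listener; the sketch illustrates the pattern, not pyee's code.

```python
import asyncio

errors = []  # stand-in for an 'error' event listener


def schedule(handler, *args):
    """Call `handler`; if it returns a coroutine, schedule it and return at once."""
    try:
        result = handler(*args)
    except Exception as exc:
        errors.append(exc)  # synchronous failures take the same route
        return None
    if not asyncio.iscoroutine(result):
        return None
    fut = asyncio.ensure_future(result)

    def on_done(f):
        # Forward a failure to the error sink instead of losing it.
        if not f.cancelled() and f.exception() is not None:
            errors.append(f.exception())

    fut.add_done_callback(on_done)
    return fut


async def failing_handler(value):
    await asyncio.sleep(0)
    raise ValueError(value)


async def main():
    fut = schedule(failing_handler, "boom")  # returns immediately, nothing awaited
    started_pending = not fut.done()
    # For the demo only: wait so the scheduled task gets a chance to run.
    await asyncio.wait([fut])
    return started_pending


started_pending = asyncio.run(main())
```

The explicit wait exists only so the demo can observe the result; the caller of `schedule` never needs to await anything.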

Source code in pyee/asyncio.py
class AsyncIOEventEmitter(EventEmitter):
    """An event emitter class which can run asyncio coroutines in addition to
    synchronous blocking functions. For example:

    ```py
    @ee.on('event')
    async def async_handler(*args, **kwargs):
        await returns_a_future()
    ```

    On emit, the event emitter will automatically schedule the coroutine using
    `asyncio.ensure_future` and the configured event loop (defaults to
    `asyncio.get_event_loop()`).

    Unlike the case with the EventEmitter, all exceptions raised by
    event handlers are automatically emitted on the `error` event. This is
    important for asyncio coroutines specifically but is also handled for
    synchronous functions for consistency.

    When `loop` is specified, the supplied event loop will be used when
    scheduling work with `ensure_future`. Otherwise, the default asyncio
    event loop is used.

    For asyncio coroutine event handlers, calling emit is non-blocking.
    In other words, you do not have to await any results from emit, and the
    coroutine is scheduled in a fire-and-forget fashion.
    """

    def __init__(self, loop: Optional[AbstractEventLoop] = None):
        super(AsyncIOEventEmitter, self).__init__()
        self._loop: Optional[AbstractEventLoop] = loop
        self._waiting: Set[Future] = set()

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ):
        try:
            coro: Any = f(*args, **kwargs)
        except Exception as exc:
            self.emit("error", exc)
        else:
            if iscoroutine(coro):
                if self._loop:
                    # ensure_future is *extremely* cranky about the types here,
                    # but this is relatively well-tested and I think the types
                    # are more strict than they should be
                    fut: Any = ensure_future(cast(Any, coro), loop=self._loop)
                else:
                    fut = ensure_future(cast(Any, coro))

            elif isinstance(coro, Future):
                fut = cast(Any, coro)
            else:
                return

            def callback(f):
                self._waiting.remove(f)

                if f.cancelled():
                    return

                exc: Exception = f.exception()
                if exc:
                    self.emit("error", exc)

            fut.add_done_callback(callback)
            self._waiting.add(fut)

pyee.twisted

TwistedEventEmitter

Bases: EventEmitter

An event emitter class which can run twisted coroutines and handle returned Deferreds, in addition to synchronous blocking functions. For example:

@ee.on('event')
@inlineCallbacks
def async_handler(*args, **kwargs):
    yield returns_a_deferred()

or:

@ee.on('event')
async def async_handler(*args, **kwargs):
    await returns_a_deferred()

When async handlers fail, Failures are first emitted on the failure event. If there are no failure handlers, the Failure's associated exception is then emitted on the error event. If there are no error handlers, the exception is raised. For consistency, when handlers raise errors synchronously, they're captured, wrapped in a Failure and treated as an async failure. This is unlike the behavior of EventEmitter, which has no special error handling.

For twisted coroutine event handlers, calling emit is non-blocking. In other words, you do not have to await any results from emit, and the coroutine is scheduled in a fire-and-forget fashion.

Similar behavior occurs for "sync" functions which return Deferreds.

Source code in pyee/twisted.py
class TwistedEventEmitter(EventEmitter):
    """An event emitter class which can run twisted coroutines and handle
    returned Deferreds, in addition to synchronous blocking functions. For
    example:

    ```py
    @ee.on('event')
    @inlineCallbacks
    def async_handler(*args, **kwargs):
        yield returns_a_deferred()
    ```

    or:

    ```py
    @ee.on('event')
    async def async_handler(*args, **kwargs):
        await returns_a_deferred()
    ```


    When async handlers fail, Failures are first emitted on the `failure`
    event. If there are no `failure` handlers, the Failure's associated
    exception is then emitted on the `error` event. If there are no `error`
    handlers, the exception is raised. For consistency, when handlers raise
    errors synchronously, they're captured, wrapped in a Failure and treated
    as an async failure. This is unlike the behavior of EventEmitter,
    which has no special error handling.

    For twisted coroutine event handlers, calling emit is non-blocking.
    In other words, you do not have to await any results from emit, and the
    coroutine is scheduled in a fire-and-forget fashion.

    Similar behavior occurs for "sync" functions which return Deferreds.
    """

    def __init__(self):
        super(TwistedEventEmitter, self).__init__()

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        d: Optional[Deferred[Any]] = None
        try:
            result = f(*args, **kwargs)
        except Exception:
            self.emit("failure", Failure())
        else:
            if iscoroutine and iscoroutine(result):
                d = ensureDeferred(result)
            elif isinstance(result, Deferred):
                d = result
            elif not d:
                return

            def errback(failure: Failure) -> None:
                if failure:
                    self.emit("failure", failure)

            d.addErrback(errback)

    def _emit_handle_potential_error(self, event: str, error: Any) -> None:
        if event == "failure":
            if isinstance(error, Failure):
                try:
                    error.raiseException()
                except Exception as exc:
                    self.emit("error", exc)
            elif isinstance(error, Exception):
                self.emit("error", error)
            else:
                self.emit("error", PyeeException(f"Unexpected failure object: {error}"))
        else:
            (super(TwistedEventEmitter, self))._emit_handle_potential_error(
                event, error
            )

pyee.executor

ExecutorEventEmitter

Bases: EventEmitter

An event emitter class which runs handlers in a concurrent.futures executor.

By default, this class creates a default ThreadPoolExecutor, but a custom executor may also be passed in explicitly to, for instance, use a ProcessPoolExecutor instead.

This class runs all emitted events on the configured executor. Errors captured by the resulting Future are automatically emitted on the error event. This is unlike the EventEmitter, which has no error handling.

The underlying executor may be shut down by calling the shutdown method. Alternatively, you can treat the event emitter as a context manager:

with ExecutorEventEmitter() as ee:
    # Underlying executor open

    @ee.on('data')
    def handler(data):
        print(data)

    ee.emit('data', '00101001')

# Underlying executor closed

Since the function call is scheduled on an executor, emit is always non-blocking.

No effort is made to ensure thread safety, beyond using an executor.
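
The executor pattern described above can be sketched with `concurrent.futures` alone. In this illustration, `submit_handler` and the `errors` list are hypothetical stand-ins for `emit` and an `error` listener; the sketch is not pyee's implementation.

```python
from concurrent.futures import ThreadPoolExecutor

errors = []   # stand-in for an 'error' event listener
results = []


def submit_handler(executor, handler, *args):
    # Submitting returns immediately; the handler runs on a worker thread.
    future = executor.submit(handler, *args)

    def on_done(f):
        exc = f.exception()
        if exc is not None:
            errors.append(exc)  # route the failure instead of losing it

    future.add_done_callback(on_done)
    return future


def ok(data):
    results.append(data)


def broken(data):
    raise RuntimeError(f"cannot handle {data!r}")


# Leaving the `with` block shuts the executor down and waits for pending work.
with ThreadPoolExecutor(max_workers=2) as executor:
    submit_handler(executor, ok, "payload")
    submit_handler(executor, broken, "payload")
```

Handlers here append to shared lists from worker threads, which is safe for this demo but is the kind of shared state the thread-safety caveat above refers to.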

Source code in pyee/executor.py
class ExecutorEventEmitter(EventEmitter):
    """An event emitter class which runs handlers in a `concurrent.futures`
    executor.

    By default, this class creates a default `ThreadPoolExecutor`, but
    a custom executor may also be passed in explicitly to, for instance,
    use a `ProcessPoolExecutor` instead.

    This class runs all emitted events on the configured executor. Errors
    captured by the resulting Future are automatically emitted on the
    `error` event. This is unlike the EventEmitter, which has no error
    handling.

    The underlying executor may be shut down by calling the `shutdown`
    method. Alternatively, you can treat the event emitter as a context manager:

    ```py
    with ExecutorEventEmitter() as ee:
        # Underlying executor open

        @ee.on('data')
        def handler(data):
            print(data)

        ee.emit('data', '00101001')

    # Underlying executor closed
    ```

    Since the function call is scheduled on an executor, emit is always
    non-blocking.

    No effort is made to ensure thread safety, beyond using an executor.
    """

    def __init__(self, executor: Optional[Executor] = None):
        super(ExecutorEventEmitter, self).__init__()
        if executor:
            self._executor: Executor = executor
        else:
            self._executor = ThreadPoolExecutor()

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ):
        future: Future = self._executor.submit(f, *args, **kwargs)

        @future.add_done_callback
        def _callback(f: Future) -> None:
            exc: Optional[BaseException] = f.exception()
            if isinstance(exc, Exception):
                self.emit("error", exc)
            elif exc is not None:
                raise exc

    def shutdown(self, wait: bool = True) -> None:
        """Call `shutdown` on the internal executor."""

        self._executor.shutdown(wait=wait)

    def __enter__(self) -> "ExecutorEventEmitter":
        return self

    def __exit__(
        self, type: Type[Exception], value: Exception, traceback: TracebackType
    ) -> Optional[bool]:
        self.shutdown()

shutdown(wait=True)

Call shutdown on the internal executor.

pyee/executor.py
def shutdown(self, wait: bool = True) -> None:
    """Call `shutdown` on the internal executor."""

    self._executor.shutdown(wait=wait)

pyee.trio

TrioEventEmitter

Bases: EventEmitter

An event emitter class which can run trio tasks in a trio nursery.

By default, this class will lazily create both a nursery manager (the object returned from trio.open_nursery()) and a nursery (the object yielded by using the nursery manager as an async context manager). It is also possible to supply an existing nursery manager via the manager argument, or an existing nursery via the nursery argument.

Instances of TrioEventEmitter are themselves async context managers, so that they may manage the lifecycle of the underlying trio nursery. For example, typical usage of this library may look something like this:

async with TrioEventEmitter() as ee:
    # Underlying nursery is instantiated and ready to go
    @ee.on('data')
    async def handler(data):
        print(data)

    ee.emit('data', '00101001')

# Underlying nursery and manager have been cleaned up

Unlike the case with the EventEmitter, all exceptions raised by event handlers are automatically emitted on the error event. This is important for trio coroutines specifically but is also handled for synchronous functions for consistency.

For trio coroutine event handlers, calling emit is non-blocking. In other words, you should not attempt to await emit; the coroutine is scheduled in a fire-and-forget fashion.

Source code in pyee/trio.py
class TrioEventEmitter(EventEmitter):
    """An event emitter class which can run trio tasks in a trio nursery.

    By default, this class will lazily create both a nursery manager (the
    object returned from `trio.open_nursery()`) and a nursery (the object
    yielded by using the nursery manager as an async context manager). It is
    also possible to supply an existing nursery manager via the `manager`
    argument, or an existing nursery via the `nursery` argument.

    Instances of TrioEventEmitter are themselves async context managers, so
    that they may manage the lifecycle of the underlying trio nursery. For
    example, typical usage of this library may look something like this:

    ```py
    async with TrioEventEmitter() as ee:
        # Underlying nursery is instantiated and ready to go
        @ee.on('data')
        async def handler(data):
            print(data)

        # Emit the event the handler is registered on, with a payload
        ee.emit('data', 'some payload')

    # Underlying nursery and manager have been cleaned up
    ```

    Unlike the case with the EventEmitter, all exceptions raised by event
    handlers are automatically emitted on the `error` event. This is
    important for trio coroutines specifically but is also handled for
    synchronous functions for consistency.

    For trio coroutine event handlers, calling emit is non-blocking. In other
    words, you should not attempt to await emit; the coroutine is scheduled
    in a fire-and-forget fashion.
    """

    def __init__(
        self,
        nursery: Optional[Nursery] = None,
        manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None,
    ):
        super(TrioEventEmitter, self).__init__()
        self._nursery: Optional[Nursery] = None
        self._manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None
        if nursery:
            if manager:
                raise PyeeException(
                    "You may either pass a nursery or a nursery manager " "but not both"
                )
            self._nursery = nursery
        elif manager:
            self._manager = manager
        else:
            self._manager = trio.open_nursery()

    def _async_runner(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> Callable[[], Awaitable[None]]:
        async def runner() -> None:
            try:
                await f(*args, **kwargs)
            except Exception as exc:
                self.emit("error", exc)

        return runner

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        if not self._nursery:
            raise PyeeException("Uninitialized trio nursery")
        self._nursery.start_soon(self._async_runner(f, args, kwargs))

    @asynccontextmanager
    async def context(
        self,
    ) -> AsyncGenerator["TrioEventEmitter", None]:
        """Returns an async contextmanager which manages the underlying
        nursery to the EventEmitter. The `TrioEventEmitter`'s
        async context management methods are implemented using this
        function, but it may also be used directly for clarity.
        """
        if self._nursery is not None:
            yield self
        elif self._manager is not None:
            async with self._manager as nursery:
                self._nursery = nursery
                yield self
        else:
            raise PyeeException("Uninitialized nursery or nursery manager")

    async def __aenter__(self) -> "TrioEventEmitter":
        self._context: Optional[
            AbstractAsyncContextManager["TrioEventEmitter"]
        ] = self.context()
        return await self._context.__aenter__()

    async def __aexit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        if self._context is None:
            raise PyeeException("Attempting to exit uninitialized context")
        rv = await self._context.__aexit__(type, value, traceback)
        self._context = None
        self._nursery = None
        self._manager = None
        return rv

context() async

Returns an async context manager which manages the nursery underlying the EventEmitter. The TrioEventEmitter's async context management methods are implemented using this function, but it may also be used directly for clarity.

pyee/trio.py
@asynccontextmanager
async def context(
    self,
) -> AsyncGenerator["TrioEventEmitter", None]:
    """Returns an async contextmanager which manages the underlying
    nursery to the EventEmitter. The `TrioEventEmitter`'s
    async context management methods are implemented using this
    function, but it may also be used directly for clarity.
    """
    if self._nursery is not None:
        yield self
    elif self._manager is not None:
        async with self._manager as nursery:
            self._nursery = nursery
            yield self
    else:
        raise PyeeException("Uninitialized nursery or nursery manager")

pyee.uplift

unwrap(event_emitter)

Unwrap an uplifted EventEmitter, returning it to its prior state.

pyee/uplift.py
def unwrap(event_emitter: EventEmitter) -> None:
    """Unwrap an uplifted EventEmitter, returning it to its prior state."""
    if event_emitter in EMIT_WRAPPERS:
        EMIT_WRAPPERS[event_emitter]()

uplift(cls, underlying, error_handling='new', proxy_new_listener='forward', *args, **kwargs)

A helper to create instances of an event emitter cls that inherits event behavior from an underlying event emitter instance.

This is mostly helpful if you have a simple underlying event emitter that you don't have direct control over, but you want to use that event emitter in a new context - for example, you may want to uplift an EventEmitter supplied by a third party library into an AsyncIOEventEmitter so that you may register async event handlers in your asyncio app but still be able to receive events from the underlying event emitter and call the underlying event emitter's existing handlers.

When called, uplift instantiates a new instance of cls, passing along any unrecognized arguments, and overwrites the emit method on the underlying event emitter to also emit events on the new event emitter and vice versa. In both cases, they return whether the emit method was handled by either emitter. Execution order prefers the event emitter on which emit was called.

The unwrap function may be called on either instance; this will unwrap both emit methods.

The error_handling flag can be configured to control what happens to unhandled errors:

  • 'new': Error handling for the new event emitter is always used and the underlying library's non-event-based error handling is inert.
  • 'underlying': Error handling on the underlying event emitter is always used and the new event emitter can not implement non-event-based error handling.
  • 'neither': Error handling for the new event emitter is used if the handler was registered on the new event emitter, and vice versa.

Tuning this option can be useful depending on how the underlying event emitter does error handling. The default is 'new'.
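
As a rough illustration of the three strategies, the sketch below mirrors the lookup used in the uplift source listing: each strategy names the emitter whose error handling applies when emit is called on the new emitter, and when it is called on the underlying one. The function name error_handlers_for and the placeholder strings are hypothetical.

```python
from typing import Tuple


def error_handlers_for(strategy: str, new: str, underlying: str) -> Tuple[str, str]:
    """Return (handler for emits on new, handler for emits on underlying)."""
    table = {
        "new": (new, new),
        "underlying": (underlying, underlying),
        "neither": (new, underlying),
    }
    return table[strategy]


# Placeholder strings stand in for the two emitter instances.
for strategy in ("new", "underlying", "neither"):
    on_new, on_underlying = error_handlers_for(strategy, "new", "underlying")
    print(f"{strategy!r}: emit on new -> {on_new}; emit on underlying -> {on_underlying}")
```

Read this way, 'neither' means that neither emitter takes over the other's error handling: each side keeps its own.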

The proxy_new_listener option can be configured to control how new_listener events are treated:

  • 'forward': new_listener events are propagated from the underlying event emitter to the new event emitter but not vice versa.
  • 'both': new_listener events are propagated as with other events.
  • 'neither': new_listener events are only fired on their respective event emitters.
  • 'backward': new_listener events are propagated from the new event emitter to the underlying event emitter, but not vice versa.

Tuning this option can be useful depending on how the new_listener event is used by the underlying event emitter, if at all. The default is 'forward', since underlying may not know how to handle certain handlers, such as asyncio coroutines.
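
One way to read the four proxy_new_listener settings is as a pair of direction flags. The table below is an assumption drawn from the descriptions in this section, not a copy of pyee's internal settings; NEW_LISTENER_DIRECTIONS and propagates are hypothetical names.

```python
# (underlying -> new, new -> underlying): hypothetical encoding of each setting.
NEW_LISTENER_DIRECTIONS = {
    "forward": (True, False),
    "backward": (False, True),
    "both": (True, True),
    "neither": (False, False),
}


def propagates(setting: str, fired_on: str) -> bool:
    """Report whether a new_listener event fired on one emitter reaches the other."""
    underlying_to_new, new_to_underlying = NEW_LISTENER_DIRECTIONS[setting]
    if fired_on == "underlying":
        return underlying_to_new
    if fired_on == "new":
        return new_to_underlying
    raise ValueError(f"unknown emitter: {fired_on!r}")
```

Under the default 'forward', registering a listener on the underlying emitter is announced on the new emitter, while registering on the new emitter stays local to it.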

Each event emitter tracks its own internal table of handlers. remove_listener, remove_all_listeners and listeners all work independently. This means you will have to remember which event emitter an event handler was added to!

Note that both the new event emitter returned by cls and the underlying event emitter should inherit from EventEmitter, or at least implement the interface for the undocumented _call_handlers and _emit_handle_potential_error methods.

pyee/uplift.py
def uplift(
    cls: Type[UpliftingEventEmitter],
    underlying: EventEmitter,
    error_handling: ErrorStrategy = "new",
    proxy_new_listener: ProxyStrategy = "forward",
    *args: Any,
    **kwargs: Any,
) -> UpliftingEventEmitter:
    """A helper to create instances of an event emitter `cls` that inherits
    event behavior from an `underlying` event emitter instance.

    This is mostly helpful if you have a simple underlying event emitter
    that you don't have direct control over, but you want to use that
    event emitter in a new context - for example, you may want to `uplift` an
    `EventEmitter` supplied by a third party library into an
    `AsyncIOEventEmitter` so that you may register async event handlers
    in your `asyncio` app but still be able to receive events from the
    underlying event emitter and call the underlying event emitter's existing
    handlers.

    When called, `uplift` instantiates a new instance of `cls`, passing
    along any unrecognized arguments, and overwrites the `emit` method on
    the `underlying` event emitter to also emit events on the new event
    emitter and vice versa. In both cases, they return whether the `emit`
    method was handled by either emitter. Execution order prefers the event
    emitter on which `emit` was called.

    The `unwrap` function may be called on either instance; this will
    unwrap both `emit` methods.

    The `error_handling` flag can be configured to control what happens to
    unhandled errors:

    - 'new': Error handling for the new event emitter is always used and the
      underlying library's non-event-based error handling is inert.
    - 'underlying': Error handling on the underlying event emitter is always
      used and the new event emitter can not implement non-event-based error
      handling.
    - 'neither': Error handling for the new event emitter is used if the
      handler was registered on the new event emitter, and vice versa.

    Tuning this option can be useful depending on how the underlying event
    emitter does error handling. The default is 'new'.

    The `proxy_new_listener` option can be configured to control how
    `new_listener` events are treated:

    - 'forward': `new_listener` events are propagated from the underlying
      event emitter to the new event emitter but not vice versa.
    - 'both': `new_listener` events are propagated as with other events.
    - 'neither': `new_listener` events are only fired on their respective
      event emitters.
    - 'backward': `new_listener` events are propagated from the new event
      emitter to the underlying event emitter, but not vice versa.

    Tuning this option can be useful depending on how the `new_listener`
    event is used by the underlying event emitter, if at all. The default is
    'forward', since `underlying` may not know how to handle certain
    handlers, such as asyncio coroutines.

    Each event emitter tracks its own internal table of handlers.
    `remove_listener`, `remove_all_listeners` and `listeners` all
    work independently. This means you will have to remember which event
    emitter an event handler was added to!

    Note that both the new event emitter returned by `cls` and the
    underlying event emitter should inherit from `EventEmitter`, or at
    least implement the interface for the undocumented `_call_handlers` and
    `_emit_handle_potential_error` methods.
    """

    (
        new_proxy_new_listener,
        underlying_proxy_new_listener,
    ) = _PROXY_NEW_LISTENER_SETTINGS[proxy_new_listener]

    new: UpliftingEventEmitter = cls(*args, **kwargs)

    uplift_error_handlers: Dict[str, Tuple[EventEmitter, EventEmitter]] = dict(
        new=(new, new), underlying=(underlying, underlying), neither=(new, underlying)
    )

    new_error_handler, underlying_error_handler = uplift_error_handlers[error_handling]

    _wrap(new, underlying, new_error_handler, new_proxy_new_listener)
    _wrap(underlying, new, underlying_error_handler, underlying_proxy_new_listener)

    return new

pyee.cls

evented(cls)

Configure an evented class.

Evented classes are classes which use an EventEmitter to call instance methods during runtime. To achieve this without this helper, you would instantiate an EventEmitter in the __init__ method and then call event_emitter.on for every method on self.
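
For comparison, the manual wiring described above might look like the following sketch. To keep it runnable with only the standard library, MiniEmitter is a hypothetical stand-in that exposes just on and emit; it is not pyee's EventEmitter, and ManuallyEvented and received are likewise illustrative names.

```python
from collections import defaultdict


class MiniEmitter:
    """Hypothetical stand-in exposing only on() and emit()."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, handler):
        self._handlers[event].append(handler)
        return handler

    def emit(self, event, *args, **kwargs):
        handlers = list(self._handlers[event])
        for handler in handlers:
            handler(*args, **kwargs)
        # Report whether any handler was registered for the event.
        return bool(handlers)


class ManuallyEvented:
    def __init__(self):
        self.received = []
        self.event_emitter = MiniEmitter()
        # Each method must be registered by hand; the bound method carries self.
        self.event_emitter.on("event", self.event_handler)

    def event_handler(self, *args, **kwargs):
        self.received.append((args, kwargs))


obj = ManuallyEvented()
handled = obj.event_emitter.emit("event", "hello world", numbers=[1, 2, 3])
```

Every additional handler needs another registration line in __init__, which is the repetition the decorator is meant to remove.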

This decorator and the on function help make things look a little nicer by defining the event handler on the method in the class and then adding the __init__ hook in a wrapper:

from pyee.cls import evented, on

@evented
class Evented:
    @on("event")
    def event_handler(self, *args, **kwargs):
        print(self, args, kwargs)

evented_obj = Evented()

evented_obj.event_emitter.emit(
    "event", "hello world", numbers=[1, 2, 3]
)

The __init__ wrapper will create a self.event_emitter: EventEmitter automatically, but you can also define your own event_emitter inside your class's unwrapped __init__ method. For example, to use this decorator with a TwistedEventEmitter:

@evented
class Evented:
    def __init__(self):
        self.event_emitter = TwistedEventEmitter()

    @on("event")
    async def event_handler(self, *args, **kwargs):
        await self.some_async_action(*args, **kwargs)
pyee/cls.py
def evented(cls: Cls) -> Cls:
    """
    Configure an evented class.

    Evented classes are classes which use an EventEmitter to call instance
    methods during runtime. To achieve this without this helper, you would
    instantiate an `EventEmitter` in the `__init__` method and then call
    `event_emitter.on` for every method on `self`.

    This decorator and the `on` function help make things look a little nicer
    by defining the event handler on the method in the class and then adding
    the `__init__` hook in a wrapper:

    ```py
    from pyee.cls import evented, on

    @evented
    class Evented:
        @on("event")
        def event_handler(self, *args, **kwargs):
            print(self, args, kwargs)

    evented_obj = Evented()

    evented_obj.event_emitter.emit(
        "event", "hello world", numbers=[1, 2, 3]
    )
    ```

    The `__init__` wrapper will create a `self.event_emitter: EventEmitter`
    automatically but you can also define your own event_emitter inside your
    class's unwrapped `__init__` method. For example, to use this
    decorator with a `TwistedEventEmitter`:

    ```py
    @evented
    class Evented:
        def __init__(self):
            self.event_emitter = TwistedEventEmitter()

        @on("event")
        async def event_handler(self, *args, **kwargs):
            await self.some_async_action(*args, **kwargs)
    ```
    """
    handlers: List[Handler] = list(_handlers)
    _handlers.reset()

    og_init: Callable = cls.__init__

    @wraps(cls.__init__)
    def init(self, *args, **kwargs):
        og_init(self, *args, **kwargs)
        if not hasattr(self, "event_emitter"):
            self.event_emitter = EventEmitter()

        for h in handlers:
            self.event_emitter.on(h.event, _bind(self, h.method))

    cls.__init__ = init

    return cls

on(event)

Register an event handler on an evented class. See the evented class decorator for a full example.

pyee/cls.py
def on(event: str) -> Callable[[Callable], Callable]:
    """
    Register an event handler on an evented class. See the `evented` class
    decorator for a full example.
    """

    def decorator(method: Callable) -> Callable:
        _handlers.append(Handler(event=event, method=method))
        return method

    return decorator