EventBus API Reference¶
The EventBus is the central routing system that connects event sources to handlers.
Class: EventBus¶
Constructor¶
Creates a new EventBus instance.
Parameters:
- handler_registry (HandlerRegistry): Registry for managing event handlers with dependency injection support
Methods¶
emit(event_type: str, data: dict, source: str = "unknown") -> None¶
Emit an event to all registered handlers.
await bus.emit("custom.alert", {
    "message": "New follower!",
    "username": "viewer123"
}, source="custom")
Parameters:
- event_type (str): The event type identifier
- data (dict): Event payload data
- source (str): Event source identifier. Default: "unknown"
Returns: None
register_handler(event_type: str, handler: Callable, filters: List = None, priority: int = 5) -> None¶
Register a handler for an event type.
async def my_handler(event):
    print(f"Got event: {event.type}")
bus.register_handler("twitch.chat.message", my_handler)
Parameters:
- event_type (str): Event type pattern (supports wildcards)
- handler (Callable): Async function to handle events
- filters (List[Callable], optional): List of filter functions
- priority (int): Handler priority (lower = higher priority). Default: 5
Returns: None
unregister_handler(event_type: str, handler: Callable) -> None¶
Remove a handler registration.
Parameters:
- event_type (str): Event type pattern
- handler (Callable): Handler to remove
Returns: None
get_stats() -> dict¶
Get runtime statistics about the event bus.
stats = bus.get_stats()
print(f"Running: {stats['running']}")
print(f"Total handlers: {stats['total_handlers']}")
print(f"Queue size: {stats['queue_size']}")
Returns:
- dict: Statistics dictionary containing:
  - running (bool): Whether event processing is active
  - queue_size (int): Current event queue size
  - registered_events (list): List of registered event types
  - handler_counts (dict): Handler counts by event type
  - total_handlers (int): Total number of registered handlers
  - handler_styles (dict): Distribution of handler styles (explicit vs legacy)
  - using_di_registry (bool): Whether using dependency injection registry
add_error_handler(handler: Callable) -> None¶
Add a global error handler for handler exceptions.
async def error_handler(exception: Exception, handler_name: Optional[str]):
    logger.error(f"Handler {handler_name} failed: {exception}")
bus.add_error_handler(error_handler)
Parameters:
- handler (Callable): Error handler function
Returns: None
add_middleware(middleware: Callable) -> None¶
Add middleware to process events before they reach handlers.
async def log_middleware(event: Event) -> Optional[Event]:
    logger.info(f"Processing event: {event.type}")
    return event  # Return None to filter event
bus.add_middleware(log_middleware)
Parameters:
- middleware (Callable): Middleware function that returns Event or None
Returns: None
start() -> None¶
Start the event processing loop.
Returns: None
stop() -> None¶
Stop the event processing loop.
Returns: None
unregister_module_handlers(module_name: str) -> int¶
Remove all handlers registered by a specific module. This method is essential for hot reloading functionality.
# Unregister all handlers from the chat module
removed_count = bus.unregister_module_handlers("chat")
print(f"Removed {removed_count} handlers")
Parameters:
- module_name (str): The module name (e.g., "chat", "alerts", "rpg")
Returns:
- int: Number of handlers that were removed
Use Cases: - Hot reloading modules during development - Cleaning up before module reload - Module unloading and cleanup
Example with Hot Reload:
async def hot_reload_module(module_name: str):
    # Remove old handlers
    removed = bus.unregister_module_handlers(module_name)
    logger.info(f"Removed {removed} handlers from {module_name}")
    # Reload the module
    await registry.reload_module(module_name)
    # New handlers are automatically registered
    stats = bus.get_stats()
    logger.info(f"Now have {stats['total_handlers']} total handlers")
Handler Tracking: The EventBus automatically tracks which module registered each handler by: 1. Inspecting the call stack during registration 2. Mapping handler functions to their module names 3. Maintaining an internal registry for cleanup
Global Functions¶
get_event_bus() -> Optional[EventBus]¶
Get the global EventBus instance.
Returns:
- Optional[EventBus]: The global event bus instance or None if not set
set_event_bus(bus: EventBus) -> None¶
Set the global EventBus instance.
from starstreamer import set_event_bus
from starstreamer.core.event_bus import EventBus
from starstreamer.core.di.registry import HandlerRegistry
from starstreamer.core.di.container import ServiceContainer
# Create DI-enabled event bus
container = ServiceContainer()
registry = HandlerRegistry(container)
custom_bus = EventBus(registry)
set_event_bus(custom_bus)
Parameters:
- bus (EventBus): New event bus instance
Returns: None
Event Class¶
Class: Event¶
Event data structure passed to handlers.
Attributes:
- type (str): Event type identifier
- data (dict): Event payload
- timestamp (float): Unix timestamp when event occurred
- source (str): Event source ("twitch", "obs", etc.)
- raw (Any): Original untransformed event data
- context (dict): Additional context (triggers, metadata)
Example:
@on_event("twitch.chat.message")
async def handler(event: Event):
    print(f"Type: {event.type}")
    print(f"User: {event.data['user']['username']}")
    print(f"Message: {event.data['message']}")
    print(f"Timestamp: {event.timestamp}")
    print(f"Source: {event.source}")
Wildcard Patterns¶
The EventBus supports wildcard patterns for event types:
| Pattern | Matches | Example | 
|---|---|---|
| * | Single segment | twitch.*matchestwitch.followbut nottwitch.chat.message | 
| ** | Multiple segments | twitch.**matches bothtwitch.followandtwitch.chat.message | 
| ? | Single character | twitch.f?llowmatchestwitch.followandtwitch.fallow | 
Examples:
# Match all Twitch events
bus.register("twitch.*", handler)
# Match all chat events from any platform
bus.register("*.chat.message", handler)
# Match everything
bus.register("**", universal_handler)
Error Handling¶
Handler Exceptions¶
Handler exceptions are caught and logged but don't stop other handlers:
@on_event("test.event")
async def failing_handler(event):
    raise ValueError("This handler fails")
@on_event("test.event")
async def working_handler(event):
    print("This still runs")
# Both handlers execute, error is logged
await bus.emit("test.event", {})
Global Error Handler¶
async def global_error_handler(exception: Exception, handler_name: Optional[str]):
    # Log to external service
    await error_service.log({
        'handler': handler_name,
        'error': str(exception)
    })
bus.add_error_handler(global_error_handler)
Performance Considerations¶
Concurrent Execution¶
Handlers run concurrently using asyncio.gather():
# These run in parallel
@on_event("test.event")
async def handler1(event):
    await asyncio.sleep(1)
    print("Handler 1")
@on_event("test.event")
async def handler2(event):
    print("Handler 2")  # Prints immediately
Dispatch Timing¶
Monitor dispatch performance:
stats = bus.get_stats()
if stats['average_dispatch_time'] > 100:  # Over 100ms
    logger.warning("Event dispatch is slow!")
Usage Examples¶
Basic Event Handling¶
from starstreamer.core.event_bus import get_event_bus
bus = get_event_bus()
# Register handler
async def chat_handler(event):
    print(f"Chat: {event.data['message']}")
bus.register_handler("twitch.chat.message", chat_handler)
# Emit event
await bus.emit("twitch.chat.message", {
    "user": {"username": "viewer"},
    "message": "Hello world!"
})
Custom Event Chains¶
@on_event("twitch.follow")
async def follow_handler(event):
    # Process follow
    username = event.data['user']['username']
    # Trigger custom celebration
    bus = get_event_bus()
    await bus.emit("celebration.start", {
        "type": "follow",
        "username": username
    })
@on_event("celebration.start")
async def celebration_handler(event):
    # Handle celebration
    print(f"Celebrating {event.data['type']} from {event.data['username']}")
Testing with EventBus¶
import pytest
from starstreamer.core.event_bus import EventBus
@pytest.mark.asyncio
async def test_event_handling():
    bus = EventBus()
    received = []
    async def test_handler(event):
        received.append(event)
    bus.register_handler("test.event", test_handler)
    await bus.emit("test.event", {"test": "data"})
    assert len(received) == 1
    assert received[0].data["test"] == "data"