
EventBus API Reference

The EventBus is the central routing system that connects event sources to handlers.

Class: EventBus

from starstreamer.core.event_bus import EventBus

Constructor

EventBus(handler_registry: HandlerRegistry)

Creates a new EventBus instance.

Parameters:
- handler_registry (HandlerRegistry): Registry for managing event handlers with dependency injection support

Methods

emit(event_type: str, data: dict, source: str = "unknown") -> None

Emit an event to every handler registered for a matching event type. The call is awaited, so it must be made from async code.

await bus.emit("custom.alert", {
    "message": "New follower!",
    "username": "viewer123"
}, source="custom")

Parameters:
- event_type (str): The event type identifier
- data (dict): Event payload data
- source (str): Event source identifier. Default: "unknown"

Returns: None


register_handler(event_type: str, handler: Callable, filters: List = None, priority: int = 5) -> None

Register a handler for an event type.

async def my_handler(event):
    print(f"Got event: {event.type}")

bus.register_handler("twitch.chat.message", my_handler)

Parameters:
- event_type (str): Event type pattern (supports wildcards)
- handler (Callable): Async function to handle events
- filters (List[Callable], optional): List of filter functions
- priority (int): Handler priority (lower = higher priority). Default: 5

Returns: None
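The reference does not spell out how filters and priority interact at dispatch time. As a rough mental model only, the sketch below assumes handlers are ordered by ascending priority number and that a handler is skipped unless every one of its filters returns true. `Registration` and `select_handlers` are hypothetical names, not part of StarStreamer.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class Registration:
    """Hypothetical record of one register_handler() call."""
    handler: Callable[[Any], Any]
    filters: List[Callable[[Any], bool]] = field(default_factory=list)
    priority: int = 5  # lower number = higher priority, as documented


def select_handlers(registrations: List[Registration], event: Any) -> List[Callable]:
    """Return handlers whose filters all pass, highest priority first."""
    passing = [r for r in registrations if all(f(event) for f in r.filters)]
    # sorted() is stable, so equal priorities keep their registration order.
    return [r.handler for r in sorted(passing, key=lambda r: r.priority)]


def greet(event):
    return "greet"


def moderate(event):
    return "moderate"


def mods_only(event):
    return "mods_only"


registrations = [
    Registration(greet),
    Registration(moderate, priority=1),
    Registration(mods_only, filters=[lambda e: e.get("is_mod", False)]),
]

# The event is not from a moderator, so mods_only is filtered out.
selected = select_handlers(registrations, {"is_mod": False})
selected_names = [h.__name__ for h in selected]
```

Here `moderate` comes before `greet` because its priority number is lower.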


unregister_handler(event_type: str, handler: Callable) -> None

Remove a handler registration.

bus.unregister_handler("twitch.chat.message", my_handler)

Parameters:
- event_type (str): Event type pattern
- handler (Callable): Handler to remove

Returns: None


get_stats() -> dict

Get runtime statistics about the event bus.

stats = bus.get_stats()
print(f"Running: {stats['running']}")
print(f"Total handlers: {stats['total_handlers']}")
print(f"Queue size: {stats['queue_size']}")

Returns:
- dict: Statistics dictionary containing:
  - running (bool): Whether event processing is active
  - queue_size (int): Current event queue size
  - registered_events (list): List of registered event types
  - handler_counts (dict): Handler counts by event type
  - total_handlers (int): Total number of registered handlers
  - handler_styles (dict): Distribution of handler styles (explicit vs legacy)
  - using_di_registry (bool): Whether using dependency injection registry


add_error_handler(handler: Callable) -> None

Add a global error handler for handler exceptions.

from typing import Optional

async def error_handler(exception: Exception, handler_name: Optional[str]):
    logger.error(f"Handler {handler_name} failed: {exception}")

bus.add_error_handler(error_handler)

Parameters:
- handler (Callable): Error handler function

Returns: None


add_middleware(middleware: Callable) -> None

Add middleware to process events before they reach handlers.

from typing import Optional
from starstreamer.runtime.types import Event

async def log_middleware(event: Event) -> Optional[Event]:
    logger.info(f"Processing event: {event.type}")
    return event  # Return None to filter event

bus.add_middleware(log_middleware)

Parameters:
- middleware (Callable): Middleware function that returns Event or None

Returns: None
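To make the "return None to filter" contract concrete, here is a minimal, self-contained sketch of a middleware chain. It assumes middleware runs in the order it was added and that the first None short-circuits the chain; the `Event` dataclass and `run_middleware` are stand-ins written for this example, not StarStreamer code.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass
class Event:
    """Stand-in for starstreamer's Event; only the fields used here."""
    type: str
    data: dict


Middleware = Callable[[Event], Awaitable[Optional[Event]]]


async def run_middleware(event: Event, chain: List[Middleware]) -> Optional[Event]:
    """Pass the event through each middleware; stop once one returns None."""
    current: Optional[Event] = event
    for middleware in chain:
        current = await middleware(current)
        if current is None:
            return None  # event filtered out, handlers would never see it
    return current


async def drop_bots(event: Event) -> Optional[Event]:
    # Filter: returning None discards the event.
    return None if event.data.get("is_bot") else event


async def tag_event(event: Event) -> Optional[Event]:
    # Transform: middleware may also modify the event before passing it on.
    event.data["seen_by_middleware"] = True
    return event


async def main():
    chain = [drop_bots, tag_event]
    kept = await run_middleware(Event("chat", {"is_bot": False}), chain)
    dropped = await run_middleware(Event("chat", {"is_bot": True}), chain)
    return kept, dropped


kept, dropped = asyncio.run(main())
```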


start() -> None

Start the event processing loop.

await bus.start()

Returns: None


stop() -> None

Stop the event processing loop.

await bus.stop()

Returns: None


unregister_module_handlers(module_name: str) -> int

Remove all handlers registered by a specific module. This method is essential for hot reloading functionality.

# Unregister all handlers from the chat module
removed_count = bus.unregister_module_handlers("chat")
print(f"Removed {removed_count} handlers")

Parameters:
- module_name (str): The module name (e.g., "chat", "alerts", "rpg")

Returns:
- int: Number of handlers that were removed

Use Cases:
- Hot reloading modules during development
- Cleaning up before module reload
- Module unloading and cleanup

Example with Hot Reload:

async def hot_reload_module(module_name: str):
    # Remove old handlers
    removed = bus.unregister_module_handlers(module_name)
    logger.info(f"Removed {removed} handlers from {module_name}")

    # Reload the module
    await registry.reload_module(module_name)

    # New handlers are automatically registered
    stats = bus.get_stats()
    logger.info(f"Now have {stats['total_handlers']} total handlers")

Handler Tracking: The EventBus automatically tracks which module registered each handler by:
1. Inspecting the call stack during registration
2. Mapping handler functions to their module names
3. Maintaining an internal registry for cleanup
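The three tracking steps above can be sketched as follows. This is an illustration under assumptions, not the EventBus internals: `ModuleTracker`, `track`, and `untrack_module` are hypothetical names, and the caller's module is read from the calling frame's `__name__` global.

```python
import inspect
from collections import defaultdict
from typing import Callable, Dict, List, Optional, Tuple


class ModuleTracker:
    """Hypothetical bookkeeping: which module registered which handler."""

    def __init__(self) -> None:
        self._by_module: Dict[str, List[Tuple[str, Callable]]] = defaultdict(list)

    def track(self, event_type: str, handler: Callable,
              module_name: Optional[str] = None) -> str:
        if module_name is None:
            # Step 1: look one frame up the call stack for the registering module.
            frame = inspect.currentframe()
            caller = frame.f_back if frame is not None else None
            module_name = caller.f_globals.get("__name__", "unknown") if caller else "unknown"
        # Steps 2 and 3: map the handler to its module and keep it for cleanup.
        self._by_module[module_name].append((event_type, handler))
        return module_name

    def untrack_module(self, module_name: str) -> int:
        """Forget every handler of one module and return how many there were."""
        return len(self._by_module.pop(module_name, []))


async def on_chat(event): ...
async def on_follow(event): ...


tracker = ModuleTracker()
owner = tracker.track("twitch.chat.message", on_chat)   # module inferred from caller
tracker.track("twitch.follow", on_follow)               # same calling module
tracker.track("twitch.raid", on_chat, module_name="alerts")  # explicit override

removed = tracker.untrack_module(owner)        # both inferred registrations
removed_again = tracker.untrack_module(owner)  # nothing left for that module
```

Returning a count, as unregister_module_handlers does, lets a reload routine log how many registrations it cleared.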

Global Functions

get_event_bus() -> Optional[EventBus]

Get the global EventBus instance.

from starstreamer import get_event_bus

bus = get_event_bus()

Returns:
- Optional[EventBus]: The global event bus instance or None if not set


set_event_bus(bus: EventBus) -> None

Set the global EventBus instance.

from starstreamer import set_event_bus
from starstreamer.core.event_bus import EventBus
from starstreamer.core.di.registry import HandlerRegistry
from starstreamer.core.di.container import ServiceContainer

# Create DI-enabled event bus
container = ServiceContainer()
registry = HandlerRegistry(container)
custom_bus = EventBus(registry)
set_event_bus(custom_bus)

Parameters:
- bus (EventBus): New event bus instance

Returns: None

Event Class

Class: Event

from starstreamer.runtime.types import Event

Event data structure passed to handlers.

Attributes:
- type (str): Event type identifier
- data (dict): Event payload
- timestamp (float): Unix timestamp when event occurred
- source (str): Event source ("twitch", "obs", etc.)
- raw (Any): Original untransformed event data
- context (dict): Additional context (triggers, metadata)

Example:

@on_event("twitch.chat.message")
async def handler(event: Event):
    print(f"Type: {event.type}")
    print(f"User: {event.data['user']['username']}")
    print(f"Message: {event.data['message']}")
    print(f"Timestamp: {event.timestamp}")
    print(f"Source: {event.source}")
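For experimenting without StarStreamer installed, the attribute list above could be mirrored by a small dataclass. This is a stand-in sketch, not the library's definition of Event; all default values are assumptions made for this example.

```python
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    """Stand-in mirroring the documented attributes; defaults are assumed."""
    type: str
    data: dict
    timestamp: float = field(default_factory=time.time)  # Unix time at creation
    source: str = "unknown"
    raw: Any = None
    context: dict = field(default_factory=dict)


event = Event(
    type="twitch.chat.message",
    data={"user": {"username": "viewer"}, "message": "Hello world!"},
    source="twitch",
)
summary = f"{event.source}: {event.data['user']['username']} said {event.data['message']}"
```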

Wildcard Patterns

The EventBus supports wildcard patterns for event types:

| Pattern | Matches | Example |
|---------|---------|---------|
| * | Single segment | twitch.* matches twitch.follow but not twitch.chat.message |
| ** | Multiple segments | twitch.** matches both twitch.follow and twitch.chat.message |
| ? | Single character | twitch.f?llow matches twitch.follow and twitch.fallow |

Examples:

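# Match single-segment Twitch events such as twitch.follow
# (use "twitch.**" to match every Twitch event)
bus.register_handler("twitch.*", handler)

# Match all chat events from any platform
bus.register_handler("*.chat.message", handler)

# Match everything
bus.register_handler("**", universal_handler)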
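The wildcard rules above can be approximated by translating a pattern into a regular expression. The sketch below is one possible reading of the table, not the EventBus matcher itself: it assumes segments are separated by dots, that `**` needs at least one character, and that `?` never matches a dot. `pattern_to_regex` and `matches` are hypothetical helpers.

```python
import re


def pattern_to_regex(pattern: str) -> re.Pattern:
    """Translate an event-type pattern into an anchored regular expression."""
    parts = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**", i):
            parts.append(r".+")      # any number of segments, dots included
            i += 2
        elif pattern[i] == "*":
            parts.append(r"[^.]+")   # exactly one segment, no dots
            i += 1
        elif pattern[i] == "?":
            parts.append(r"[^.]")    # exactly one non-dot character
            i += 1
        else:
            parts.append(re.escape(pattern[i]))  # literal character
            i += 1
    return re.compile("".join(parts))


def matches(pattern: str, event_type: str) -> bool:
    """True if the whole event type matches the pattern."""
    return pattern_to_regex(pattern).fullmatch(event_type) is not None


# Check the rows of the table against the sketch.
single = matches("twitch.*", "twitch.follow")
too_deep = matches("twitch.*", "twitch.chat.message")
multi = matches("twitch.**", "twitch.chat.message")
one_char = matches("twitch.f?llow", "twitch.fallow")
```

Compiling the pattern once at registration time, rather than on every emit, would keep dispatch cheap if a matcher like this were used in practice.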

Error Handling

Handler Exceptions

Handler exceptions are caught and logged but don't stop other handlers:

@on_event("test.event")
async def failing_handler(event):
    raise ValueError("This handler fails")

@on_event("test.event")
async def working_handler(event):
    print("This still runs")

# Both handlers execute, error is logged
await bus.emit("test.event", {})

Global Error Handler

from typing import Optional

async def global_error_handler(exception: Exception, handler_name: Optional[str]):
    # Log to external service
    await error_service.log({
        'handler': handler_name,
        'error': str(exception)
    })

bus.add_error_handler(global_error_handler)
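How a failing handler can be isolated and reported is easier to see in a standalone sketch. The code below assumes the bus gathers handler results with return_exceptions=True and then calls each error handler with the exception and the handler's name; `dispatch` is a hypothetical stand-in for the bus internals.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple

Handler = Callable[[Any], Awaitable[None]]
ErrorHandler = Callable[[Exception, Optional[str]], Awaitable[None]]


async def dispatch(event: Any, handlers: List[Handler],
                   error_handlers: List[ErrorHandler]) -> None:
    """Run all handlers; report each failure to every error handler."""
    # return_exceptions=True collects failures as results instead of
    # raising the first one out of gather().
    results = await asyncio.gather(*(h(event) for h in handlers),
                                   return_exceptions=True)
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            for on_error in error_handlers:
                await on_error(result, getattr(handler, "__name__", None))


calls: List[str] = []
errors: List[Tuple[Optional[str], str]] = []


async def failing_handler(event):
    raise ValueError("This handler fails")


async def working_handler(event):
    calls.append("working_handler")


async def record_error(exception: Exception, handler_name: Optional[str]):
    errors.append((handler_name, str(exception)))


asyncio.run(dispatch({}, [failing_handler, working_handler], [record_error]))
```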

Performance Considerations

Concurrent Execution

Handlers run concurrently using asyncio.gather():

# These run concurrently on the same event loop
@on_event("test.event")
async def handler1(event):
    await asyncio.sleep(1)
    print("Handler 1")

@on_event("test.event")
async def handler2(event):
    print("Handler 2")  # Prints immediately
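The ordering described above can be checked with plain asyncio, independent of StarStreamer. The sketch assumes the bus awaits handlers through a single asyncio.gather() call, as stated; `slow_handler` and `fast_handler` are illustrative names.

```python
import asyncio
from typing import List

order: List[str] = []


async def slow_handler(event):
    await asyncio.sleep(0.05)   # yields control to the event loop
    order.append("slow")


async def fast_handler(event):
    order.append("fast")        # finishes while slow_handler is sleeping


async def main():
    # Both coroutines are scheduled together; neither waits for the other.
    await asyncio.gather(slow_handler({}), fast_handler({}))


asyncio.run(main())
```

Because the handlers share one event loop, a handler that blocks without awaiting (for example with time.sleep) would still stall the others.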

Dispatch Timing

Monitor dispatch performance:

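stats = bus.get_stats()
# 'average_dispatch_time' is not among the keys documented for get_stats() above,
# so read it defensively; the threshold assumes the value is in milliseconds.
avg_ms = stats.get('average_dispatch_time')
if avg_ms is not None and avg_ms > 100:  # Over 100ms
    logger.warning("Event dispatch is slow!")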

Usage Examples

Basic Event Handling

from starstreamer.core.event_bus import get_event_bus

bus = get_event_bus()

# Register handler
async def chat_handler(event):
    print(f"Chat: {event.data['message']}")

bus.register_handler("twitch.chat.message", chat_handler)

# Emit event
await bus.emit("twitch.chat.message", {
    "user": {"username": "viewer"},
    "message": "Hello world!"
})

Custom Event Chains

@on_event("twitch.follow")
async def follow_handler(event):
    # Process follow
    username = event.data['user']['username']

    # Trigger custom celebration
    bus = get_event_bus()
    await bus.emit("celebration.start", {
        "type": "follow",
        "username": username
    })

@on_event("celebration.start")
async def celebration_handler(event):
    # Handle celebration
    print(f"Celebrating {event.data['type']} from {event.data['username']}")

Testing with EventBus

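import pytest
from starstreamer.core.event_bus import EventBus
from starstreamer.core.di.registry import HandlerRegistry
from starstreamer.core.di.container import ServiceContainer

@pytest.mark.asyncio
async def test_event_handling():
    # EventBus requires a HandlerRegistry (see the constructor above)
    bus = EventBus(HandlerRegistry(ServiceContainer()))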
    received = []

    async def test_handler(event):
        received.append(event)

    bus.register_handler("test.event", test_handler)
    await bus.emit("test.event", {"test": "data"})

    assert len(received) == 1
    assert received[0].data["test"] == "data"

See Also