Skip to content

Event System

StarStreamer's event system is the heart of the framework. It connects event sources (Twitch, OBS) to your action handlers through a central event bus.

Implementation Status: ✅ Fully Implemented - The core event system is complete and functional.

Architecture Overview

graph TD
    subgraph Sources
        T[Twitch EventSub]
        O[OBS WebSocket]
        W[Web Interface]
    end

    subgraph "Event Bus"
        EB[EventBus Core]
        R[Handler Registry]
        D[Dispatcher]
    end

    subgraph Handlers
        H1[Chat Commands]
        H2[Alerts]
        H3[Automations]
    end

    T --> EB
    O --> EB
    W --> EB
    EB --> R
    R --> D
    D --> H1
    D --> H2
    D --> H3

Event Flow

  1. Event Source emits an event (e.g., Twitch chat message)
  2. Event Bus receives and transforms the event
  3. Registry finds matching handlers
  4. Dispatcher executes handlers concurrently
  5. Handlers perform actions (send messages, trigger alerts, etc.)

Event Structure

All events follow a consistent structure:

class Event:
    type: str           # Event type (e.g., "twitch.chat.message")
    data: dict         # Event payload
    timestamp: float   # When event occurred
    source: str       # Event source ("twitch", "obs", etc.)

Event Types

Twitch Events

StarStreamer uses EventSub WebSocket for real-time Twitch events:

Event Type Description Key Data
twitch.chat.message Chat message received user, message, badges
twitch.follow New follower user, followed_at
twitch.subscription New/renewed subscription user, tier, is_gift
twitch.raid Channel raid from_broadcaster, viewers
twitch.cheer Bits cheer user, bits, message
twitch.channel_points Channel points redemption user, reward, input

OBS Events

OBS WebSocket provides scene and source control:

Event Type Description Key Data
obs.scene_changed Scene switched scene_name, previous_scene
obs.stream_started Stream went live output_active, output_state
obs.stream_stopped Stream ended output_active, output_state
obs.recording_started Recording started output_path
obs.source_visibility Source shown/hidden source_name, visible

Registering Handlers

Use the @on_event decorator to register handlers:

from starstreamer import on_event
from starstreamer.runtime.types import Event

@on_event("twitch.chat.message")
async def handle_chat(event: Event):
    """Process all chat messages"""
    username = event.data['user']['username']
    message = event.data['message']
    print(f"{username}: {message}")

Multiple Event Types

Register a handler for multiple events:

@on_event("twitch.follow")
@on_event("twitch.subscription")
async def celebrate_support(event: Event, twitch):
    """Thank followers and subscribers"""
    user = event.data['user']['username']

    if event.type == "twitch.follow":
        await twitch.send_message(f"Thanks for following @{user}!")
    else:
        await twitch.send_message(f"Thanks for subscribing @{user}!")

Event Wildcards

Use wildcards to handle groups of events:

@on_event("twitch.*")
async def log_all_twitch_events(event: Event, logger):
    """Log all Twitch events"""
    logger.info(f"Twitch event: {event.type}")

@on_event("obs.stream_*")
async def handle_stream_state(event: Event):
    """Handle stream start/stop"""
    if "started" in event.type:
        print("Stream is live!")
    else:
        print("Stream ended!")

Handler Execution

Concurrent Execution

Handlers run concurrently by default:

@on_event("twitch.follow")
async def handler_a(event):
    await asyncio.sleep(1)
    print("Handler A done")

@on_event("twitch.follow")
async def handler_b(event):
    print("Handler B immediate")

# When follow event fires:
# Output: "Handler B immediate" (immediately)
# Output: "Handler A done" (after 1 second)

Error Isolation

Handler errors don't affect other handlers:

@on_event("twitch.chat.message")
async def buggy_handler(event):
    raise ValueError("Oops!")  # This won't crash other handlers

@on_event("twitch.chat.message")
async def stable_handler(event):
    print("Still running!")  # This executes normally

Priority Control

Control handler execution order with priority:

from starstreamer import priority

@on_event("twitch.chat.message")
@priority(10)  # Higher priority, runs first
async def log_message(event, logger):
    logger.info(f"Chat: {event.data['message']}")

@on_event("twitch.chat.message")
@priority(1)   # Lower priority, runs after
async def process_commands(event):
    # Command processing happens after logging
    pass

Event Filtering

Pre-filter events before they reach handlers:

from starstreamer import filter
from starstreamer.events.filters import TwitchFilters

@on_event("twitch.chat.message")
@filter(TwitchFilters.not_bot())
async def ignore_bots(event):
    """Only processes messages from real users"""
    pass

@on_event("twitch.chat.message")
@filter(lambda e: e.data['user'].get('subscriber', False))
async def subscriber_only(event):
    """Only processes subscriber messages"""
    pass

Emitting Custom Events

Emit your own events for handler chaining:

from starstreamer.core.event_bus import get_event_bus

@on_event("twitch.follow")
async def new_follower(event):
    # Process follow
    user = event.data['user']['username']

    # Emit custom event
    bus = get_event_bus()
    await bus.emit("custom.celebration", {
        "type": "follow",
        "user": user
    })

@on_event("custom.celebration")
async def play_sound(event):
    """Triggered by custom event"""
    print(f"Playing celebration sound for {event.data['user']}")

Event Bus API

Getting the Event Bus

from starstreamer.core.event_bus import get_event_bus

bus = get_event_bus()

Emitting Events

await bus.emit("custom.alert", {
    "message": "Special event!",
    "priority": "high"
})

Registering Handlers Programmatically

async def my_handler(event):
    print(f"Handled: {event.type}")

bus.register("custom.*", my_handler)

Unregistering Handlers

bus.unregister("custom.*", my_handler)

Getting Statistics

stats = bus.get_stats()
print(f"Total events: {stats['total_events']}")
print(f"Active handlers: {stats['handler_count']}")

Best Practices

1. Use Specific Event Types

# Good - specific event type
@on_event("twitch.chat.message")

# Avoid - too broad unless needed
@on_event("twitch.*")

2. Handle Errors Gracefully

@on_event("twitch.chat.message")
async def safe_handler(event, logger):
    try:
        # Your code here
        pass
    except Exception as e:
        logger.error(f"Handler error: {e}")
        # Don't re-raise - let other handlers continue

3. Keep Handlers Focused

# Good - single responsibility
@on_event("twitch.follow")
async def thank_follower(event, twitch):
    user = event.data['user']['username']
    await twitch.send_message(f"Thanks @{user}!")

@on_event("twitch.follow")
async def log_follower(event, logger):
    logger.info(f"New follower: {event.data['user']['username']}")

# Avoid - doing too much in one handler
@on_event("twitch.follow")
async def do_everything(event):
    # Thank, log, update database, send webhook, etc.
    pass

4. Use Dependency Injection

# Good - explicit dependencies
@on_event("twitch.chat.message")
async def handler(event: Event, twitch: TwitchClient, logger: Logger):
    # Services are injected automatically
    pass

Performance

The event bus is designed for high throughput:

  • < 10ms dispatch latency
  • 1000+ events/second throughput
  • Concurrent handler execution
  • Error isolation
  • Zero polling - pure event-driven

Next Steps