Event System
StarStreamer's event system is the heart of the framework. It connects event sources (Twitch, OBS) to your action handlers through a central event bus.
Implementation Status: ✅ Fully Implemented - The core event system is complete and functional.
Architecture Overview
graph TD
    subgraph Sources
        T[Twitch EventSub]
        O[OBS WebSocket]
        W[Web Interface]
    end
    subgraph "Event Bus"
        EB[EventBus Core]
        R[Handler Registry]
        D[Dispatcher]
    end
    subgraph Handlers
        H1[Chat Commands]
        H2[Alerts]
        H3[Automations]
    end
    T --> EB
    O --> EB
    W --> EB
    EB --> R
    R --> D
    D --> H1
    D --> H2
    D --> H3
Event Flow
- Event Source emits an event (e.g., Twitch chat message)
- Event Bus receives and transforms the event
- Registry finds matching handlers
- Dispatcher executes handlers concurrently
- Handlers perform actions (send messages, trigger alerts, etc.)
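The five steps above can be sketched in a few lines of asyncio. This is a toy illustration under stated assumptions, not StarStreamer's implementation: `MiniBus`, its `on` and `emit` methods, and the simplified `Event` dataclass are hypothetical names chosen for the example.

```python
import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Event:
    type: str
    data: dict
    source: str
    timestamp: float = field(default_factory=time.time)


class MiniBus:
    """Hypothetical stand-in for an event bus: a registry plus a dispatcher."""

    def __init__(self):
        self._registry = defaultdict(list)  # event type -> list of handlers

    def on(self, event_type):
        def decorator(func):
            self._registry[event_type].append(func)
            return func
        return decorator

    async def emit(self, event_type, data, source="custom"):
        # Step 2: wrap the raw payload in a consistent Event object.
        event = Event(type=event_type, data=data, source=source)
        # Step 3: look up the handlers registered for this event type.
        handlers = self._registry.get(event_type, [])
        # Step 4: run all matching handlers concurrently.
        await asyncio.gather(*(h(event) for h in handlers))
        return len(handlers)


bus = MiniBus()
actions = []


@bus.on("twitch.chat.message")
async def echo(event):
    # Step 5: the handler performs its action.
    actions.append(f"{event.data['user']}: {event.data['message']}")


async def main():
    # Step 1: a source emits an event.
    return await bus.emit(
        "twitch.chat.message", {"user": "alice", "message": "hi"}, source="twitch"
    )


handled = asyncio.run(main())
print(handled, actions)
```

The sketch matches event types exactly and omits wildcards, priorities, and filters, which later sections describe.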
Event Structure
All events follow a consistent structure:
class Event:
    type: str         # Event type (e.g., "twitch.chat.message")
    data: dict        # Event payload
    timestamp: float  # When the event occurred
    source: str       # Event source ("twitch", "obs", etc.)
Event Types
Twitch Events
StarStreamer uses EventSub WebSocket for real-time Twitch events:
| Event Type | Description | Key Data |
|---|---|---|
| twitch.chat.message | Chat message received | user, message, badges |
| twitch.follow | New follower | user, followed_at |
| twitch.subscription | New/renewed subscription | user, tier, is_gift |
| twitch.raid | Channel raid | from_broadcaster, viewers |
| twitch.cheer | Bits cheer | user, bits, message |
| twitch.channel_points | Channel points redemption | user, reward, input |
OBS Events
OBS WebSocket reports scene, stream, recording, and source state changes as events:
| Event Type | Description | Key Data |
|---|---|---|
| obs.scene_changed | Scene switched | scene_name, previous_scene |
| obs.stream_started | Stream went live | output_active, output_state |
| obs.stream_stopped | Stream ended | output_active, output_state |
| obs.recording_started | Recording started | output_path |
| obs.source_visibility | Source shown/hidden | source_name, visible |
Registering Handlers
Use the @on_event decorator to register handlers:
from starstreamer import on_event
from starstreamer.runtime.types import Event

@on_event("twitch.chat.message")
async def handle_chat(event: Event):
    """Process all chat messages"""
    username = event.data['user']['username']
    message = event.data['message']
    print(f"{username}: {message}")
Multiple Event Types
Register a handler for multiple events:
@on_event("twitch.follow")
@on_event("twitch.subscription")
async def celebrate_support(event: Event, twitch):
    """Thank followers and subscribers"""
    user = event.data['user']['username']
    if event.type == "twitch.follow":
        await twitch.send_message(f"Thanks for following @{user}!")
    else:
        await twitch.send_message(f"Thanks for subscribing @{user}!")
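Stacking works when a registration decorator hands back the function it received, so the next decorator in the stack sees the same callable. The sketch below shows that pattern with a hypothetical `on_event` and a plain dictionary as the registry; it is an assumption about the mechanism, not StarStreamer's actual code.

```python
registry = {}


def on_event(event_type):
    """Hypothetical registration decorator that returns the function unchanged."""
    def decorator(func):
        registry.setdefault(event_type, []).append(func)
        return func  # returning func itself is what makes stacking possible
    return decorator


@on_event("twitch.follow")
@on_event("twitch.subscription")
async def celebrate_support(event):
    ...


# The same function object is registered under both event types.
print(sorted(registry))
print(registry["twitch.follow"][0] is registry["twitch.subscription"][0])
```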
Event Wildcards
Use wildcards to handle groups of events:
@on_event("twitch.*")
async def log_all_twitch_events(event: Event, logger):
    """Log all Twitch events"""
    logger.info(f"Twitch event: {event.type}")

@on_event("obs.stream_*")
async def handle_stream_state(event: Event):
    """Handle stream start/stop"""
    if "started" in event.type:
        print("Stream is live!")
    else:
        print("Stream ended!")
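The exact matching rules for wildcard patterns are not spelled out here. One plausible reading is shell-style globbing, which the standard library's `fnmatch` module implements. The sketch below assumes that semantics; `matching_patterns` is a hypothetical helper, and the real matcher may differ (for example, in whether `*` crosses dots).

```python
from fnmatch import fnmatchcase


def matching_patterns(event_type, patterns):
    """Return the registered patterns that match an event type.

    Assumes shell-style globbing, where * matches any run of characters,
    including dots.
    """
    return [p for p in patterns if fnmatchcase(event_type, p)]


patterns = ["twitch.*", "obs.stream_*", "twitch.chat.message"]

print(matching_patterns("twitch.chat.message", patterns))
print(matching_patterns("obs.stream_started", patterns))
print(matching_patterns("obs.scene_changed", patterns))
```

Under this assumption a chat message matches both the broad `twitch.*` pattern and the specific one, so a wildcard handler runs in addition to any specific handlers.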
Handler Execution
Concurrent Execution
Handlers run concurrently by default:
import asyncio

@on_event("twitch.follow")
async def handler_a(event):
    await asyncio.sleep(1)
    print("Handler A done")

@on_event("twitch.follow")
async def handler_b(event):
    print("Handler B immediate")

# When follow event fires:
# Output: "Handler B immediate" (immediately)
# Output: "Handler A done" (after 1 second)
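The ordering above is what `asyncio.gather` produces when every handler is scheduled at once. The following standalone sketch reproduces it without StarStreamer; the `dispatch` function is a hypothetical stand-in for the dispatcher, and the sleep is shortened to 0.05 seconds.

```python
import asyncio

order = []


async def handler_a(event):
    await asyncio.sleep(0.05)  # simulate slow work, e.g. a network call
    order.append("A done")


async def handler_b(event):
    order.append("B immediate")


async def dispatch(event, handlers):
    # gather() schedules every handler at once, so a slow handler
    # does not delay a fast one.
    await asyncio.gather(*(h(event) for h in handlers))


asyncio.run(dispatch({"type": "twitch.follow"}, [handler_a, handler_b]))
print(order)  # ['B immediate', 'A done']
```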
Error Isolation
Handler errors don't affect other handlers:
@on_event("twitch.chat.message")
async def buggy_handler(event):
    raise ValueError("Oops!")  # This won't crash other handlers

@on_event("twitch.chat.message")
async def stable_handler(event):
    print("Still running!")  # This executes normally
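One common way to get this isolation with asyncio is `gather(..., return_exceptions=True)`, which returns a raised exception as a result value instead of propagating it. The sketch below assumes that approach; whether StarStreamer uses it internally is not stated here, and `dispatch` is a hypothetical name.

```python
import asyncio
import logging

logger = logging.getLogger("isolation_sketch")


async def buggy_handler(event):
    raise ValueError("Oops!")


async def stable_handler(event):
    return "still running"


async def dispatch(event, handlers):
    # return_exceptions=True turns a raised exception into a result value,
    # so one failing handler cannot cancel or hide the others.
    results = await asyncio.gather(
        *(h(event) for h in handlers), return_exceptions=True
    )
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            logger.error("Handler %s failed: %r", handler.__name__, result)
    return results


results = asyncio.run(dispatch({}, [buggy_handler, stable_handler]))
print(results)
```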
Priority Control
Control handler execution order with priority:
from starstreamer import priority

@on_event("twitch.chat.message")
@priority(10)  # Higher priority, runs first
async def log_message(event, logger):
    logger.info(f"Chat: {event.data['message']}")

@on_event("twitch.chat.message")
@priority(1)  # Lower priority, runs after
async def process_commands(event):
    # Command processing happens after logging
    pass
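How priority interacts with concurrent dispatch is not described here. The sketch below takes the simplest reading: handlers are sorted by descending priority and awaited one at a time. The `priority` decorator, the `_priority` attribute, and `dispatch_in_order` are hypothetical and exist only for this example.

```python
import asyncio


def priority(level):
    """Hypothetical decorator: tag a handler with a numeric priority."""
    def decorator(func):
        func._priority = level
        return func
    return decorator


calls = []


@priority(1)
async def process_commands(event):
    calls.append("process_commands")


@priority(10)
async def log_message(event):
    calls.append("log_message")


async def dispatch_in_order(event, handlers):
    # Highest priority first; handlers without a tag default to 0.
    ordered = sorted(
        handlers, key=lambda h: getattr(h, "_priority", 0), reverse=True
    )
    for handler in ordered:
        await handler(event)


asyncio.run(dispatch_in_order({}, [process_commands, log_message]))
print(calls)  # ['log_message', 'process_commands']
```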
Event Filtering
Pre-filter events before they reach handlers:
from starstreamer import filter
from starstreamer.events.filters import TwitchFilters

@on_event("twitch.chat.message")
@filter(TwitchFilters.not_bot())
async def ignore_bots(event):
    """Only processes messages from real users"""
    pass

@on_event("twitch.chat.message")
@filter(lambda e: e.data['user'].get('subscriber', False))
async def subscriber_only(event):
    """Only processes subscriber messages"""
    pass
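A filter decorator of this kind can be pictured as a wrapper that evaluates a predicate and skips the handler body when it returns false. The sketch below is an assumption about the mechanism, not the library's code. It names the decorator `event_filter` to avoid shadowing Python's built-in `filter`, and uses plain dictionaries in place of `Event` objects.

```python
import asyncio
import functools


def event_filter(predicate):
    """Hypothetical filter decorator: run the handler only if predicate(event)."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(event, *args, **kwargs):
            if not predicate(event):
                return None  # filtered out, handler body never runs
            return await func(event, *args, **kwargs)
        return wrapper
    return decorator


def is_subscriber(event):
    return event["data"]["user"].get("subscriber", False)


@event_filter(is_subscriber)
async def subscriber_only(event):
    return f"hello {event['data']['user']['username']}"


sub = {"data": {"user": {"username": "alice", "subscriber": True}}}
non_sub = {"data": {"user": {"username": "bob"}}}

print(asyncio.run(subscriber_only(sub)))      # hello alice
print(asyncio.run(subscriber_only(non_sub)))  # None
```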
Emitting Custom Events
Emit your own events for handler chaining:
from starstreamer.core.event_bus import get_event_bus

@on_event("twitch.follow")
async def new_follower(event):
    # Process follow
    user = event.data['user']['username']
    # Emit custom event
    bus = get_event_bus()
    await bus.emit("custom.celebration", {
        "type": "follow",
        "user": user
    })

@on_event("custom.celebration")
async def play_sound(event):
    """Triggered by custom event"""
    print(f"Playing celebration sound for {event.data['user']}")
Event Bus API
Getting the Event Bus
Emitting Events
Registering Handlers Programmatically
Unregistering Handlers
Getting Statistics
stats = bus.get_stats()
print(f"Total events: {stats['total_events']}")
print(f"Active handlers: {stats['handler_count']}")
Best Practices
1. Use Specific Event Types
# Good - specific event type
@on_event("twitch.chat.message")
# Avoid - too broad unless needed
@on_event("twitch.*")
2. Handle Errors Gracefully
@on_event("twitch.chat.message")
async def safe_handler(event, logger):
    try:
        # Your code here
        pass
    except Exception as e:
        logger.error(f"Handler error: {e}")
        # Don't re-raise - let other handlers continue
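If the injected logger behaves like a standard-library `logging.Logger` (an assumption, since its type is not shown here), `logger.exception` is a useful variant of the pattern above: it logs at ERROR level and appends the traceback. The sketch uses a plain stdlib logger writing to an in-memory stream so the result can be inspected.

```python
import asyncio
import io
import logging

# Capture log output in memory so the example can inspect it.
stream = io.StringIO()
logger = logging.getLogger("handler_sketch")
logger.addHandler(logging.StreamHandler(stream))
logger.propagate = False


async def safe_handler(event):
    try:
        return event["data"]["message"].upper()
    except Exception:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Handler error for event %r", event.get("type"))
        return None  # swallow the error so other handlers can continue


result = asyncio.run(safe_handler({"type": "twitch.chat.message", "data": {}}))
print(result)
print("KeyError" in stream.getvalue())
```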
3. Keep Handlers Focused
# Good - single responsibility
@on_event("twitch.follow")
async def thank_follower(event, twitch):
    user = event.data['user']['username']
    await twitch.send_message(f"Thanks @{user}!")

@on_event("twitch.follow")
async def log_follower(event, logger):
    logger.info(f"New follower: {event.data['user']['username']}")

# Avoid - doing too much in one handler
@on_event("twitch.follow")
async def do_everything(event):
    # Thank, log, update database, send webhook, etc.
    pass
4. Use Dependency Injection
# Good - explicit dependencies
@on_event("twitch.chat.message")
async def handler(event: Event, twitch: TwitchClient, logger: Logger):
    # Services are injected automatically
    pass
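Automatic injection of this kind is commonly built on `inspect.signature`: the dispatcher reads the handler's parameter names and passes only the services it asks for. The sketch below resolves services by parameter name, which is an assumption; the real framework may resolve by type annotation instead. `FakeTwitch` and `call_with_services` are hypothetical.

```python
import asyncio
import inspect


class FakeTwitch:
    """Stand-in service that records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    async def send_message(self, text):
        self.sent.append(text)


async def call_with_services(handler, event, services):
    """Pass the event plus only the services the handler names as parameters."""
    params = inspect.signature(handler).parameters
    kwargs = {name: services[name] for name in params if name in services}
    return await handler(event, **kwargs)


async def thank_follower(event, twitch):
    await twitch.send_message(f"Thanks @{event['user']}!")


async def count_only(event):
    # Requests no services, so it receives only the event.
    return len(event)


services = {"twitch": FakeTwitch(), "logger": None}
asyncio.run(call_with_services(thank_follower, {"user": "alice"}, services))
print(services["twitch"].sent)  # ['Thanks @alice!']
```

Because each handler declares exactly what it needs, a test can pass a fake service such as `FakeTwitch` without touching the handler itself.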
Performance
The event bus is designed for high throughput:
- < 10ms dispatch latency
- 1000+ events/second throughput
- Concurrent handler execution
- Error isolation
- Zero polling - pure event-driven
Next Steps
- Learn about the Module System
- Explore Triggers & Filters
- Understand Dependency Injection