Dependency Injection¶
StarStreamer uses a powerful dependency injection (DI) system that makes your code cleaner, more testable, and easier to maintain. Instead of importing services globally, they're automatically provided to your handlers.
Implementation Status: ✅ Fully Implemented - Dependency injection system is complete with explicit handler support.
Why Dependency Injection?¶
Traditional approach (without DI):
# ❌ Hard to test, tightly coupled
from starstreamer.plugins.twitch import twitch_client
import logging
logger = logging.getLogger(__name__)
@on_event("twitch.chat.message")
async def handler(event):
# Services are global imports
logger.info("Got message")
await twitch_client.send_message("Response")
StarStreamer approach (with DI):
# ✅ Easy to test, loosely coupled
@on_event("twitch.chat.message")
async def handler(event: Event, twitch: TwitchClient, logger: Logger):
# Services are injected automatically
logger.info("Got message")
await twitch.send_message("Response")
How It Works¶
- Registration: Services are registered in a container
- Declaration: Handlers declare what services they need
- Resolution: The DI system provides the services at runtime
- Injection: Services are passed as handler parameters
Available Services¶
StarStreamer automatically provides these services:
| Service | Type | Description |
|---|---|---|
event |
Event |
The triggering event (always first parameter) |
twitch |
TwitchClient |
Twitch API client |
logger |
logging.Logger |
Configured logger instance |
database |
Database |
SQLite database connection |
economy |
EconomyService |
Economy/points management |
users |
UserService |
User management service |
Basic Usage¶
Simple Handler¶
from starstreamer import on_event
from starstreamer.runtime.types import Event
from starstreamer.plugins.twitch import TwitchClient
@on_event("twitch.chat.message")
async def chat_handler(event: Event, twitch: TwitchClient):
"""Handler with injected Twitch client"""
username = event.data['user']['username']
await twitch.send_message(f"Hello @{username}!")
Multiple Services¶
import logging
from starstreamer.services.economy import EconomyService
@on_event("twitch.raid")
async def raid_handler(
event: Event,
twitch: TwitchClient,
economy: EconomyService,
logger: logging.Logger
):
"""Handler using multiple services"""
raider = event.data['from_broadcaster']['username']
viewers = event.data['viewers']
# Log the raid
logger.info(f"Raid from {raider} with {viewers} viewers")
# Thank in chat
await twitch.send_message(f"Thanks for the raid @{raider}! Welcome raiders!")
# Give bonus points to raider
await economy.add_balance(raider, viewers * 10)
Service Resolution¶
Automatic Type Matching¶
The DI system uses type hints to determine what to inject:
@on_event("twitch.chat.message")
async def typed_handler(
event: Event, # Resolved by type
twitch: TwitchClient, # Resolved by type
logger: logging.Logger # Resolved by type
):
"""Types determine what gets injected"""
pass
Parameter Names as Hints¶
If types aren't specified, parameter names are used:
@on_event("twitch.chat.message")
async def named_handler(event, twitch, database, logger):
"""Parameter names match service names"""
# Works but less explicit than using types
pass
Custom Services¶
Register your own services for injection:
from starstreamer.core.decorators import setup_dependency_injection
# Create a custom service
class DatabaseService:
async def save_message(self, user: str, message: str):
# Database logic here
pass
# Set up dependency injection and get container
container, registry = setup_dependency_injection()
container.register_singleton(DatabaseService, DatabaseService())
# Use in handlers
@on_event("twitch.chat.message")
async def save_chat(event: Event, db: DatabaseService):
"""Custom service injection"""
await db.save_message(
event.data['user']['username'],
event.data['message']
)
Service Lifetimes¶
Singleton Services¶
One instance shared across all handlers:
# Register singleton
container.register_singleton(
MyService,
MyService(config="shared")
)
# Same instance in all handlers
@on_event("event.a")
async def handler_a(service: MyService):
# Gets the singleton instance
pass
@on_event("event.b")
async def handler_b(service: MyService):
# Gets the SAME singleton instance
pass
Factory Services¶
New instance created for each injection:
# Register factory
container.register_factory(
RequestContext,
lambda: RequestContext(timestamp=time.time())
)
# Fresh instance each time
@on_event("event.a")
async def handler_a(ctx: RequestContext):
# Gets a new instance
pass
@on_event("event.b")
async def handler_b(ctx: RequestContext):
# Gets a DIFFERENT new instance
pass
Testing with DI¶
Dependency injection makes testing much easier:
# test_handler.py
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_chat_handler():
"""Test handler with mock services"""
# Create mocks
mock_event = MagicMock()
mock_event.data = {'user': {'username': 'testuser'}}
mock_twitch = AsyncMock()
mock_logger = MagicMock()
# Call handler directly with mocks
await chat_handler(mock_event, mock_twitch, mock_logger)
# Verify behavior
mock_twitch.send_message.assert_called_once_with(
"Hello @testuser!"
)
mock_logger.info.assert_called()
Advanced Patterns¶
Optional Services¶
Make services optional with default values:
from typing import Optional
@on_event("twitch.chat.message")
async def flexible_handler(
event: Event,
twitch: TwitchClient,
db: Optional[DatabaseService] = None
):
"""Database service is optional"""
await twitch.send_message("Got message")
if db:
await db.save_message(...)
Service Composition¶
Build complex services from simpler ones:
class AlertService:
def __init__(self, twitch: TwitchClient, economy: EconomyService):
self.twitch = twitch
self.economy = economy
async def show_follow_alert(self, username: str, user_id: str):
await self.twitch.send_message(f"Thanks for following @{username}!")
await self.economy.add_balance(user_id, 100) # Bonus for following
# Register composite service
container.register_factory(
AlertService,
lambda c: AlertService(
c.resolve(TwitchClient),
c.resolve(EconomyService)
)
)
# Use in handler
@on_event("twitch.follow")
async def follow_alert(event: Event, alerts: AlertService):
user = event.data['user']
await alerts.show_follow_alert(user['username'], user['id'])
Context-Aware Services¶
Services that need event context:
class UserContext:
def __init__(self, event: Event):
self.username = event.data['user']['username']
self.is_mod = 'moderator' in event.data.get('badges', {})
self.is_sub = event.data['user'].get('subscriber', False)
# Register with event dependency
container.register_factory(
UserContext,
lambda c: UserContext(c.resolve(Event))
)
@on_event("twitch.chat.message")
async def context_handler(event: Event, user: UserContext):
if user.is_mod:
# Mod-specific logic
pass
Service Discovery¶
Inspect available services:
from starstreamer.core.decorators import setup_dependency_injection
container, registry = setup_dependency_injection()
# List all registered services
# Note: Service discovery methods may vary based on implementation
print(f"Container configured with registry: {registry}")
# To check if service is available, you would register it first
container.register_singleton(DatabaseService, DatabaseService())
print("Database service is registered")
Best Practices¶
1. Use Type Hints¶
Always specify types for clarity:
# ✅ Good - explicit types
async def handler(event: Event, twitch: TwitchClient, logger: Logger):
pass
# ❌ Avoid - ambiguous types
async def handler(event, twitch, logger):
pass
2. Keep Services Focused¶
Each service should have a single responsibility:
# ✅ Good - focused services
class ChatLogger:
async def log_message(self, user, message): ...
class ChatModerator:
async def check_spam(self, message): ...
# ❌ Avoid - doing too much
class ChatEverything:
async def log_message(self, ...): ...
async def check_spam(self, ...): ...
async def send_response(self, ...): ...
3. Prefer Injection Over Import¶
# ✅ Good - injected service
@on_event("twitch.chat.message")
async def handler(event: Event, twitch: TwitchClient):
await twitch.send_message("Hello")
# ❌ Avoid - global import
from somewhere import global_twitch_client
@on_event("twitch.chat.message")
async def handler(event: Event):
await global_twitch_client.send_message("Hello")
4. Document Service Requirements¶
@on_event("twitch.chat.message")
async def complex_handler(
event: Event,
twitch: TwitchClient,
database: Database,
economy: EconomyService,
users: UserService
):
"""
Handle chat messages with multiple integrations.
Requires:
- TwitchClient: For sending responses
- Database: For data persistence
- EconomyService: For points management
- UserService: For user data management
"""
pass
Troubleshooting¶
Service Not Found¶
# Error: No service registered for type 'CustomService'
# Solution: Register the service
container.register_singleton(CustomService, CustomService())
Circular Dependencies¶
# Error: Circular dependency detected
# Avoid circular references
class ServiceA:
def __init__(self, b: ServiceB): ... # A needs B
class ServiceB:
def __init__(self, a: ServiceA): ... # B needs A - circular!
# Solution: Use factory or redesign
Type Mismatch¶
# Error: Cannot resolve parameter 'logger'
# Solution: Use correct type
import logging # Make sure to import
async def handler(event: Event, logger: logging.Logger):
pass
Next Steps¶
- Explore Chat Commands using DI
- Learn about Testing with mock services
- See API Reference for container API