Skip to content

Dependency Injection

StarStreamer uses a powerful dependency injection (DI) system that makes your code cleaner, more testable, and easier to maintain. Instead of importing services globally, they're automatically provided to your handlers.

Implementation Status: ✅ Fully Implemented - Dependency injection system is complete with explicit handler support.

Why Dependency Injection?

Traditional approach (without DI):

# ❌ Hard to test, tightly coupled
from starstreamer.plugins.twitch import twitch_client
import logging

logger = logging.getLogger(__name__)

@on_event("twitch.chat.message")
async def handler(event):
    # Services are global imports
    logger.info("Got message")
    await twitch_client.send_message("Response")

StarStreamer approach (with DI):

# ✅ Easy to test, loosely coupled
@on_event("twitch.chat.message")
async def handler(event: Event, twitch: TwitchClient, logger: Logger):
    # Services are injected automatically
    logger.info("Got message")
    await twitch.send_message("Response")

How It Works

  1. Registration: Services are registered in a container
  2. Declaration: Handlers declare what services they need
  3. Resolution: The DI system provides the services at runtime
  4. Injection: Services are passed as handler parameters

Available Services

StarStreamer automatically provides these services:

Service Type Description
event Event The triggering event (always first parameter)
twitch TwitchClient Twitch API client
logger logging.Logger Configured logger instance
database Database SQLite database connection
economy EconomyService Economy/points management
users UserService User management service

Basic Usage

Simple Handler

from starstreamer import on_event
from starstreamer.runtime.types import Event
from starstreamer.plugins.twitch import TwitchClient

@on_event("twitch.chat.message")
async def chat_handler(event: Event, twitch: TwitchClient):
    """Handler with injected Twitch client"""
    username = event.data['user']['username']
    await twitch.send_message(f"Hello @{username}!")

Multiple Services

import logging
from starstreamer.services.economy import EconomyService

@on_event("twitch.raid")
async def raid_handler(
    event: Event,
    twitch: TwitchClient,
    economy: EconomyService,
    logger: logging.Logger
):
    """Handler using multiple services"""
    raider = event.data['from_broadcaster']['username']
    viewers = event.data['viewers']

    # Log the raid
    logger.info(f"Raid from {raider} with {viewers} viewers")

    # Thank in chat
    await twitch.send_message(f"Thanks for the raid @{raider}! Welcome raiders!")

    # Give bonus points to raider
    await economy.add_balance(raider, viewers * 10)

Service Resolution

Automatic Type Matching

The DI system uses type hints to determine what to inject:

@on_event("twitch.chat.message")
async def typed_handler(
    event: Event,                    # Resolved by type
    twitch: TwitchClient,            # Resolved by type
    logger: logging.Logger           # Resolved by type
):
    """Types determine what gets injected"""
    pass

Parameter Names as Hints

If types aren't specified, parameter names are used:

@on_event("twitch.chat.message")
async def named_handler(event, twitch, database, logger):
    """Parameter names match service names"""
    # Works but less explicit than using types
    pass

Custom Services

Register your own services for injection:

from starstreamer.core.decorators import setup_dependency_injection

# Create a custom service
class DatabaseService:
    async def save_message(self, user: str, message: str):
        # Database logic here
        pass

# Set up dependency injection and get container
container, registry = setup_dependency_injection()
container.register_singleton(DatabaseService, DatabaseService())

# Use in handlers
@on_event("twitch.chat.message")
async def save_chat(event: Event, db: DatabaseService):
    """Custom service injection"""
    await db.save_message(
        event.data['user']['username'],
        event.data['message']
    )

Service Lifetimes

Singleton Services

One instance shared across all handlers:

# Register singleton
container.register_singleton(
    MyService,
    MyService(config="shared")
)

# Same instance in all handlers
@on_event("event.a")
async def handler_a(service: MyService):
    # Gets the singleton instance
    pass

@on_event("event.b")
async def handler_b(service: MyService):
    # Gets the SAME singleton instance
    pass

Factory Services

New instance created for each injection:

# Register factory
container.register_factory(
    RequestContext,
    lambda: RequestContext(timestamp=time.time())
)

# Fresh instance each time
@on_event("event.a")
async def handler_a(ctx: RequestContext):
    # Gets a new instance
    pass

@on_event("event.b")
async def handler_b(ctx: RequestContext):
    # Gets a DIFFERENT new instance
    pass

Testing with DI

Dependency injection makes testing much easier:

# test_handler.py
import pytest
from unittest.mock import AsyncMock, MagicMock

@pytest.mark.asyncio
async def test_chat_handler():
    """Test handler with mock services"""
    # Create mocks
    mock_event = MagicMock()
    mock_event.data = {'user': {'username': 'testuser'}}

    mock_twitch = AsyncMock()
    mock_logger = MagicMock()

    # Call handler directly with mocks
    await chat_handler(mock_event, mock_twitch, mock_logger)

    # Verify behavior
    mock_twitch.send_message.assert_called_once_with(
        "Hello @testuser!"
    )
    mock_logger.info.assert_called()

Advanced Patterns

Optional Services

Make services optional with default values:

from typing import Optional

@on_event("twitch.chat.message")
async def flexible_handler(
    event: Event,
    twitch: TwitchClient,
    db: Optional[DatabaseService] = None
):
    """Database service is optional"""
    await twitch.send_message("Got message")

    if db:
        await db.save_message(...)

Service Composition

Build complex services from simpler ones:

class AlertService:
    def __init__(self, twitch: TwitchClient, economy: EconomyService):
        self.twitch = twitch
        self.economy = economy

    async def show_follow_alert(self, username: str, user_id: str):
        await self.twitch.send_message(f"Thanks for following @{username}!")
        await self.economy.add_balance(user_id, 100)  # Bonus for following

# Register composite service
container.register_factory(
    AlertService,
    lambda c: AlertService(
        c.resolve(TwitchClient),
        c.resolve(EconomyService)
    )
)

# Use in handler
@on_event("twitch.follow")
async def follow_alert(event: Event, alerts: AlertService):
    user = event.data['user']
    await alerts.show_follow_alert(user['username'], user['id'])

Context-Aware Services

Services that need event context:

class UserContext:
    def __init__(self, event: Event):
        self.username = event.data['user']['username']
        self.is_mod = 'moderator' in event.data.get('badges', {})
        self.is_sub = event.data['user'].get('subscriber', False)

# Register with event dependency
container.register_factory(
    UserContext,
    lambda c: UserContext(c.resolve(Event))
)

@on_event("twitch.chat.message")
async def context_handler(event: Event, user: UserContext):
    if user.is_mod:
        # Mod-specific logic
        pass

Service Discovery

Inspect available services:

from starstreamer.core.decorators import setup_dependency_injection

container, registry = setup_dependency_injection()

# List all registered services
# Note: Service discovery methods may vary based on implementation
print(f"Container configured with registry: {registry}")

# To check if service is available, you would register it first
container.register_singleton(DatabaseService, DatabaseService())
print("Database service is registered")

Best Practices

1. Use Type Hints

Always specify types for clarity:

# ✅ Good - explicit types
async def handler(event: Event, twitch: TwitchClient, logger: Logger):
    pass

# ❌ Avoid - ambiguous types
async def handler(event, twitch, logger):
    pass

2. Keep Services Focused

Each service should have a single responsibility:

# ✅ Good - focused services
class ChatLogger:
    async def log_message(self, user, message): ...

class ChatModerator:
    async def check_spam(self, message): ...

# ❌ Avoid - doing too much
class ChatEverything:
    async def log_message(self, ...): ...
    async def check_spam(self, ...): ...
    async def send_response(self, ...): ...

3. Prefer Injection Over Import

# ✅ Good - injected service
@on_event("twitch.chat.message")
async def handler(event: Event, twitch: TwitchClient):
    await twitch.send_message("Hello")

# ❌ Avoid - global import
from somewhere import global_twitch_client

@on_event("twitch.chat.message")
async def handler(event: Event):
    await global_twitch_client.send_message("Hello")

4. Document Service Requirements

@on_event("twitch.chat.message")
async def complex_handler(
    event: Event,
    twitch: TwitchClient,
    database: Database,
    economy: EconomyService,
    users: UserService
):
    """
    Handle chat messages with multiple integrations.

    Requires:
    - TwitchClient: For sending responses
    - Database: For data persistence
    - EconomyService: For points management
    - UserService: For user data management
    """
    pass

Troubleshooting

Service Not Found

# Error: No service registered for type 'CustomService'

# Solution: Register the service
container.register_singleton(CustomService, CustomService())

Circular Dependencies

# Error: Circular dependency detected

# Avoid circular references
class ServiceA:
    def __init__(self, b: ServiceB): ...  # A needs B

class ServiceB:
    def __init__(self, a: ServiceA): ...  # B needs A - circular!

# Solution: Use factory or redesign

Type Mismatch

# Error: Cannot resolve parameter 'logger'

# Solution: Use correct type
import logging  # Make sure to import

async def handler(event: Event, logger: logging.Logger):
    pass

Next Steps