Skip to content

Dependency Injection API Reference

StarStreamer's dependency injection system provides clean, testable code architecture by automatically resolving service dependencies for event handlers. The system supports both singleton instances and factory functions for maximum flexibility.

Core Components

ServiceContainer

ServiceContainer

ServiceContainer()

A lightweight dependency injection container that supports both singleton instances and factory functions for creating services.

Source code in src/starstreamer/core/di/container.py
def __init__(self) -> None:
    self._singletons: dict[type, Any] = {}
    self._factories: dict[type, Callable[[], Any]] = {}

clear

clear() -> None

Clear all registered services.

Source code in src/starstreamer/core/di/container.py
def clear(self) -> None:
    """Clear all registered services."""
    self._singletons.clear()
    self._factories.clear()

get_registered_types

get_registered_types() -> set[type]

Get all registered service types.

Returns:

Type Description
set[type]

Set of all registered service types

Source code in src/starstreamer/core/di/container.py
def get_registered_types(self) -> set[type]:
    """
    Get all registered service types.

    Returns:
        Set of all registered service types
    """
    return set(self._singletons.keys()) | set(self._factories.keys())

is_registered

is_registered(service_type: type) -> bool

Check if a service type is registered.

Parameters:

Name Type Description Default
service_type type

The type to check

required

Returns:

Type Description
bool

True if the service is registered, False otherwise

Source code in src/starstreamer/core/di/container.py
def is_registered(self, service_type: type) -> bool:
    """
    Check if a service type is registered.

    Args:
        service_type: The type to check

    Returns:
        True if the service is registered, False otherwise
    """
    return service_type in self._singletons or service_type in self._factories

register_factory

register_factory(service_type: type[T], factory: Callable[[], T]) -> None

Register a factory function for creating service instances.

Parameters:

Name Type Description Default
service_type type[T]

The type/interface that this service implements

required
factory Callable[[], T]

A callable that returns a new service instance

required
Example

container.register_factory(Logger, lambda: logging.getLogger("starstreamer"))

Source code in src/starstreamer/core/di/container.py
def register_factory(self, service_type: type[T], factory: Callable[[], T]) -> None:
    """
    Register a factory function for creating service instances.

    Args:
        service_type: The type/interface that this service implements
        factory: A callable that returns a new service instance

    Example:
        container.register_factory(Logger, lambda: logging.getLogger("starstreamer"))
    """
    self._factories[service_type] = factory

register_singleton

register_singleton(service_type: type[T], instance: T) -> None

Register a singleton service instance.

Parameters:

Name Type Description Default
service_type type[T]

The type/interface that this service implements

required
instance T

The actual service instance

required
Example

container.register_singleton(TwitchClient, twitch_client_instance)

Source code in src/starstreamer/core/di/container.py
def register_singleton(self, service_type: type[T], instance: T) -> None:
    """
    Register a singleton service instance.

    Args:
        service_type: The type/interface that this service implements
        instance: The actual service instance

    Example:
        container.register_singleton(TwitchClient, twitch_client_instance)
    """
    self._singletons[service_type] = instance

resolve

resolve(service_type: type[T]) -> T

Resolve a service by its type.

Parameters:

Name Type Description Default
service_type type[T]

The type of service to resolve

required

Returns:

Type Description
T

The service instance

Raises:

Type Description
KeyError

If the service type is not registered

Example

twitch_client = container.resolve(TwitchClient)

Source code in src/starstreamer/core/di/container.py
def resolve(self, service_type: type[T]) -> T:
    """
    Resolve a service by its type.

    Args:
        service_type: The type of service to resolve

    Returns:
        The service instance

    Raises:
        KeyError: If the service type is not registered

    Example:
        twitch_client = container.resolve(TwitchClient)
    """
    # Check singletons first
    if service_type in self._singletons:
        return cast("T", self._singletons[service_type])

    # Check factories
    if service_type in self._factories:
        factory = self._factories[service_type]
        return cast("T", factory())

    # Service not found
    raise KeyError(f"Service type {service_type.__name__} is not registered")

The ServiceContainer is the heart of the dependency injection system, managing service registration and resolution.

Constructor

def __init__(self) -> None:
    """Initialize an empty service container"""

Creates a new container with no registered services.

Service Registration

register_singleton(service_type: type[T], instance: T) -> None

Register a singleton service instance that will be reused for all requests.

Parameters: - service_type: The type/interface that this service implements - instance: The actual service instance

Usage:

from starstreamer.core.di.container import ServiceContainer
from starstreamer.plugins.twitch import TwitchClient
from starstreamer.plugins.obs import OBSClient
from starstreamer.plugins.elevenlabs import ElevenLabsClient
import logging

container = ServiceContainer()

# Register singleton services
twitch_client = TwitchClient()  # Uses config.yaml settings
container.register_singleton(TwitchClient, twitch_client)

obs_client = OBSClient()
container.register_singleton(OBSClient, obs_client)

elevenlabs_client = ElevenLabsClient()
container.register_singleton(ElevenLabsClient, elevenlabs_client)

# Register other services
logger = logging.getLogger("starstreamer")
container.register_singleton(logging.Logger, logger)

register_factory(service_type: type[T], factory: Callable[[], T]) -> None

Register a factory function that creates new service instances on each request.

Parameters: - service_type: The type/interface that this service implements
- factory: A callable that returns a new service instance

Usage:

# Register factory for per-request instances
container.register_factory(
    DatabaseConnection,
    lambda: DatabaseConnection("sqlite:///data.db")
)

# Register with configuration
def create_logger():
    logger = logging.getLogger(f"handler-{uuid4()}")
    logger.setLevel(logging.INFO)
    return logger

container.register_factory(logging.Logger, create_logger)

Service Resolution

resolve(service_type: type[T]) -> T

Resolve a service by its type.

Parameters: - service_type: The type of service to resolve

Returns: The service instance

Raises: KeyError if the service type is not registered

Usage:

# Resolve services
twitch = container.resolve(TwitchClient)
logger = container.resolve(logging.Logger)

# Use in event handlers (done automatically)
async def my_handler(event: Event, twitch: TwitchClient, logger: logging.Logger):
    await twitch.send_message("Hello!")
    logger.info("Message sent")

Container Management

is_registered(service_type: type) -> bool

Check if a service type is registered.

if container.is_registered(TwitchClient):
    twitch = container.resolve(TwitchClient)
else:
    print("TwitchClient not registered")
get_registered_types() -> set[type]

Get all registered service types.

registered_types = container.get_registered_types()
print(f"Registered services: {[t.__name__ for t in registered_types]}")
clear() -> None

Clear all registered services.

container.clear()  # Remove all registrations

HandlerRegistry

HandlerRegistry

HandlerRegistry(container: ServiceContainer)

Registry that manages handler registration and dependency injection.

Supports both legacy (event, ctx) and explicit dependency injection styles.

Source code in src/starstreamer/core/di/registry.py
def __init__(self, container: ServiceContainer) -> None:
    self.container = container
    self.handlers: dict[str, list[dict[str, Any]]] = defaultdict(list)
    self.inspector = HandlerInspector()
    # Track handlers by module for hot reloading
    self.handlers_by_module: dict[str, list[tuple[str, Callable[..., Any]]]] = defaultdict(list)

clear_handlers

clear_handlers(event_type: str | None = None) -> None

Clear handlers for a specific event type or all event types.

Parameters:

Name Type Description Default
event_type str | None

Event type to clear, or None to clear all

None
Source code in src/starstreamer/core/di/registry.py
def clear_handlers(self, event_type: str | None = None) -> None:
    """
    Clear handlers for a specific event type or all event types.

    Args:
        event_type: Event type to clear, or None to clear all
    """
    if event_type is None:
        self.handlers.clear()
        logger.info("Cleared all registered handlers")
    else:
        self.handlers[event_type].clear()
        logger.info(f"Cleared handlers for event type {event_type}")

create_active_trigger_wrapper

create_active_trigger_wrapper(handler: Callable[..., Any]) -> Callable[[Any, Any], Awaitable[None]]

Create a wrapper for active trigger handlers with dependency injection.

This allows active triggers to use the same DI system as event handlers.

Parameters:

Name Type Description Default
handler Callable[..., Any]

The handler function to wrap

required

Returns:

Type Description
Callable[[Any, Any], Awaitable[None]]

Wrapped handler compatible with active trigger execution

Source code in src/starstreamer/core/di/registry.py
def create_active_trigger_wrapper(self, handler: Callable[..., Any]) -> Callable[[Any, Any], Awaitable[None]]:
    """
    Create a wrapper for active trigger handlers with dependency injection.

    This allows active triggers to use the same DI system as event handlers.

    Args:
        handler: The handler function to wrap

    Returns:
        Wrapped handler compatible with active trigger execution
    """
    handler_info = self.inspector.inspect_handler(handler)

    if not handler_info["is_valid"]:
        raise ValueError(f"Invalid handler: {handler_info['validation_message']}")

    if handler_info["style"] == HandlerStyle.LEGACY:
        return self._create_legacy_wrapper(handler, handler_info)
    return self._create_explicit_wrapper(handler, handler_info)

get_handlers

get_handlers(event_type: str) -> list[dict[str, Any]]

Get all registered handlers for an event type.

Parameters:

Name Type Description Default
event_type str

The event type to get handlers for

required

Returns:

Type Description
list[dict[str, Any]]

List of handler records sorted by priority

Source code in src/starstreamer/core/di/registry.py
def get_handlers(self, event_type: str) -> list[dict[str, Any]]:
    """
    Get all registered handlers for an event type.

    Args:
        event_type: The event type to get handlers for

    Returns:
        List of handler records sorted by priority
    """
    return self.handlers.get(event_type, [])

get_stats

get_stats() -> dict[str, Any]

Get statistics about registered handlers.

Returns:

Type Description
dict[str, Any]

Dictionary with handler statistics

Source code in src/starstreamer/core/di/registry.py
def get_stats(self) -> dict[str, Any]:
    """
    Get statistics about registered handlers.

    Returns:
        Dictionary with handler statistics
    """
    total_handlers = sum(len(handlers) for handlers in self.handlers.values())

    style_counts = {"legacy": 0, "explicit": 0}
    for handlers in self.handlers.values():
        for handler_record in handlers:
            style = handler_record["style"].value
            style_counts[style] += 1

    return {
        "total_handlers": total_handlers,
        "event_types": list(self.handlers.keys()),
        "handlers_per_event": {event: len(handlers) for event, handlers in self.handlers.items()},
        "style_distribution": style_counts,
        "registered_services": len(self.container.get_registered_types()),
    }

register

register(event_type: str, handler: Callable[..., Any], filters: list[Callable[..., bool]] | None = None, priority: int = 5) -> None

Register a handler for an event type with automatic dependency injection.

Parameters:

Name Type Description Default
event_type str

The event type to handle (e.g., "twitch.chat.message")

required
handler Callable[..., Any]

The handler function (legacy or explicit style)

required
filters list[Callable[..., bool]] | None

Optional filter functions

None
priority int

Handler priority (lower = higher priority)

5
Source code in src/starstreamer/core/di/registry.py
def register(
    self,
    event_type: str,
    handler: Callable[..., Any],
    filters: list[Callable[..., bool]] | None = None,
    priority: int = 5,
) -> None:
    """
    Register a handler for an event type with automatic dependency injection.

    Args:
        event_type: The event type to handle (e.g., "twitch.chat.message")
        handler: The handler function (legacy or explicit style)
        filters: Optional filter functions
        priority: Handler priority (lower = higher priority)
    """
    # Inspect the handler to determine its style and dependencies
    handler_info = self.inspector.inspect_handler(handler)

    if not handler_info["is_valid"]:
        logger.warning(
            f"Handler {handler_info['handler_name']} failed validation: {handler_info['validation_message']}"
        )
        return

    # Create the appropriate wrapper based on handler style
    if handler_info["style"] == HandlerStyle.LEGACY:
        wrapped_handler = self._create_legacy_wrapper(handler, handler_info)
    else:
        wrapped_handler = self._create_explicit_wrapper(handler, handler_info)

    # Detect calling module for tracking
    calling_module = self._get_calling_module()

    # Store handler registration info
    handler_record = {
        "handler": wrapped_handler,
        "original_handler": handler,
        "filters": filters or [],
        "priority": priority,
        "style": handler_info["style"],
        "dependencies": handler_info.get("dependencies", []),
        "handler_name": handler_info["handler_name"],
        "is_async": handler_info["is_async"],
        "module": calling_module,
    }

    self.handlers[event_type].append(handler_record)

    # Track handler by module for hot reloading
    if calling_module:
        self.handlers_by_module[calling_module].append((event_type, handler))

    # Sort by priority after adding
    self.handlers[event_type].sort(key=lambda h: h["priority"])

    logger.info(
        f"Registered {handler_info['style'].value} style handler "
        f"{handler_info['handler_name']} for event {event_type}"
        f"{f' (module: {calling_module})' if calling_module else ''}"
    )

unregister

unregister(event_type: str, handler: Callable[..., Any]) -> bool

Unregister a handler for an event type.

Parameters:

Name Type Description Default
event_type str

The event type

required
handler Callable[..., Any]

The original handler function

required

Returns:

Type Description
bool

True if handler was found and removed, False otherwise

Source code in src/starstreamer/core/di/registry.py
def unregister(self, event_type: str, handler: Callable[..., Any]) -> bool:
    """
    Unregister a handler for an event type.

    Args:
        event_type: The event type
        handler: The original handler function

    Returns:
        True if handler was found and removed, False otherwise
    """
    handlers = self.handlers.get(event_type, [])

    for i, handler_record in enumerate(handlers):
        if handler_record["original_handler"] is handler:
            del handlers[i]
            logger.info(f"Unregistered handler {handler.__name__} for event {event_type}")
            return True

    return False

unregister_module_handlers

unregister_module_handlers(module_name: str) -> int

Unregister all handlers from a specific module.

Parameters:

Name Type Description Default
module_name str

The module name (e.g., "chat", "alerts")

required

Returns:

Type Description
int

Number of handlers removed

Source code in src/starstreamer/core/di/registry.py
def unregister_module_handlers(self, module_name: str) -> int:
    """
    Unregister all handlers from a specific module.

    Args:
        module_name: The module name (e.g., "chat", "alerts")

    Returns:
        Number of handlers removed
    """
    handlers_to_remove = self.handlers_by_module.get(module_name, [])
    removed_count = 0

    for event_type, handler in handlers_to_remove:
        if self.unregister(event_type, handler):
            removed_count += 1

    # Clear the module tracking
    if module_name in self.handlers_by_module:
        del self.handlers_by_module[module_name]

    logger.info(f"Unregistered {removed_count} handlers from module '{module_name}'")
    return removed_count

validate_dependencies

validate_dependencies() -> list[str]

Validate that all handler dependencies can be resolved.

Returns:

Type Description
list[str]

List of error messages for missing dependencies

Source code in src/starstreamer/core/di/registry.py
def validate_dependencies(self) -> list[str]:
    """
    Validate that all handler dependencies can be resolved.

    Returns:
        List of error messages for missing dependencies
    """
    errors = []

    for event_type, handlers in self.handlers.items():
        for handler_record in handlers:
            if handler_record["style"] == HandlerStyle.EXPLICIT:
                dependencies = handler_record.get("dependencies", [])
                for dep_type in dependencies:
                    if not self.container.is_registered(dep_type):
                        errors.append(
                            f"Handler {handler_record['handler_name']} for event "
                            f"{event_type} requires unregistered dependency "
                            f"{dep_type.__name__}"
                        )

    return errors

The HandlerRegistry manages event handler registration and creates dependency injection wrappers.

Constructor

def __init__(self, container: ServiceContainer) -> None:
    """Initialize registry with a service container"""

Handler Registration

register(event_type: str, handler: Callable, filters: list = None, priority: int = 5) -> None

Register a handler with automatic dependency injection.

Parameters: - event_type: The event type to handle (e.g., "twitch.chat.message") - handler: The handler function (legacy or explicit style) - filters: Optional filter functions - priority: Handler priority (lower = higher priority)

Handler Styles:

Legacy Style (maintained for compatibility):

async def legacy_handler(event, ctx):
    """Old style handler with context object"""
    await ctx.twitch.send_message("Hello from legacy handler!")

Explicit Style (recommended):

async def explicit_handler(event: Event, twitch: TwitchClient, logger: logging.Logger):
    """New style with explicit dependency injection"""
    user = event.data.get("user", {})
    username = user.get("display_name", "Unknown")

    await twitch.send_message(f"Hello {username}!")
    logger.info(f"Greeted {username}")

Handler Management

get_handlers(event_type: str) -> list[dict]

Get all registered handlers for an event type.

handlers = registry.get_handlers("twitch.chat.message")
for handler_info in handlers:
    print(f"Handler: {handler_info['handler_name']}")
    print(f"Style: {handler_info['style']}")
    print(f"Dependencies: {handler_info['dependencies']}")
unregister(event_type: str, handler: Callable) -> bool

Unregister a handler.

success = registry.unregister("twitch.chat.message", my_handler)
if success:
    print("Handler unregistered")
clear_handlers(event_type: str = None) -> None

Clear handlers for a specific event type or all event types.

registry.clear_handlers("twitch.chat.message")  # Clear specific event
registry.clear_handlers()  # Clear all handlers

Statistics and Validation

get_stats() -> dict

Get statistics about registered handlers.

stats = registry.get_stats()
print(f"Total handlers: {stats['total_handlers']}")
print(f"Event types: {stats['event_types']}")
print(f"Style distribution: {stats['style_distribution']}")
print(f"Registered services: {stats['registered_services']}")
validate_dependencies() -> list[str]

Validate that all handler dependencies can be resolved.

errors = registry.validate_dependencies()
if errors:
    for error in errors:
        print(f"Dependency error: {error}")

HandlerInspector

HandlerInspector

Utility class for inspecting handler function signatures to determine their style and extract dependency information.

get_dependencies staticmethod

get_dependencies(handler: Callable[..., Any]) -> list[type]

Extract dependency types from an explicit-style handler.

Parameters:

Name Type Description Default
handler Callable[..., Any]

The handler function to inspect

required

Returns:

Type Description
list[type]

List of parameter types (excluding the first 'event' parameter)

Example

async def handler(event: Event, twitch: TwitchClient, logger: Logger): pass get_dependencies(handler) -> [TwitchClient, Logger]

Source code in src/starstreamer/core/di/inspector.py
@staticmethod
def get_dependencies(handler: Callable[..., Any]) -> list[type]:
    """
    Extract dependency types from an explicit-style handler.

    Args:
        handler: The handler function to inspect

    Returns:
        List of parameter types (excluding the first 'event' parameter)

    Example:
        async def handler(event: Event, twitch: TwitchClient, logger: Logger):
            pass
        get_dependencies(handler) -> [TwitchClient, Logger]
    """
    dependencies = []

    try:
        # Use get_type_hints to resolve string annotations from __future__ import annotations
        hints = get_type_hints(handler)

        # Get parameter names (skip first 'event' parameter)
        sig = inspect.signature(handler)
        param_names = list(sig.parameters.keys())[1:]  # Skip 'event'

        # Extract types for non-event parameters
        for param_name in param_names:
            if param_name in hints:
                param_type = hints[param_name]

                # Skip Any types
                if param_type is Any:
                    continue

                # Handle generic types (e.g., Optional[TwitchClient] or X | None)
                origin = get_origin(param_type)
                # Check for Union types (both old style Optional[X] and new style X | None)
                if origin is Union or (
                    hasattr(param_type, "__class__") and param_type.__class__.__name__ == "UnionType"
                ):
                    # For Union types (like Optional), get the first non-None type
                    args = get_args(param_type)
                    if args:
                        for arg in args:
                            if arg is not type(None):
                                dependencies.append(arg)
                                break
                else:
                    # Regular type annotation
                    dependencies.append(param_type)

    except Exception:
        # If get_type_hints fails, fallback to original method
        try:
            sig = inspect.signature(handler)
            params = list(sig.parameters.values())

            # Skip first parameter (event) and extract types from remaining parameters
            for param in params[1:]:
                param_type = param.annotation

                # Skip parameters without type annotations or with Any
                if param_type in (inspect.Parameter.empty, Any):
                    continue

                # Handle forward references and string annotations
                if isinstance(param_type, str):
                    # Skip string annotations if get_type_hints failed
                    continue

                # Handle generic types (e.g., Optional[TwitchClient])
                origin = get_origin(param_type)
                if origin is not None:
                    # For Union types (like Optional), get the first non-None type
                    args = get_args(param_type)
                    if args:
                        for arg in args:
                            if arg is not type(None):
                                dependencies.append(arg)
                                break
                else:
                    # Regular type annotation
                    dependencies.append(param_type)

        except Exception:
            # If all inspection fails, return empty list
            # Silently ignore inspection errors to avoid breaking handler registration
            return []

    return dependencies

get_parameter_names staticmethod

get_parameter_names(handler: Callable[..., Any]) -> list[str]

Get the parameter names of a handler function.

Parameters:

Name Type Description Default
handler Callable[..., Any]

The handler function to inspect

required

Returns:

Type Description
list[str]

List of parameter names

Example

async def handler(event, twitch, logger): pass get_parameter_names(handler) -> ['event', 'twitch', 'logger']

Source code in src/starstreamer/core/di/inspector.py
@staticmethod
def get_parameter_names(handler: Callable[..., Any]) -> list[str]:
    """
    Get the parameter names of a handler function.

    Args:
        handler: The handler function to inspect

    Returns:
        List of parameter names

    Example:
        async def handler(event, twitch, logger): pass
        get_parameter_names(handler) -> ['event', 'twitch', 'logger']
    """
    try:
        # Handle mock objects that shouldn't be treated as real handlers
        if hasattr(handler, "_mock_name") or str(type(handler).__name__) == "MagicMock":
            return []

        sig = inspect.signature(handler)
        return list(sig.parameters.keys())
    except Exception:
        return []

get_style staticmethod

get_style(handler: Callable[..., Any]) -> HandlerStyle

Determine the style of a handler function.

Parameters:

Name Type Description Default
handler Callable[..., Any]

The handler function to inspect

required

Returns:

Type Description
HandlerStyle

HandlerStyle.LEGACY if it's (event, ctx) style

HandlerStyle

HandlerStyle.EXPLICIT if it's explicit dependency style

Example

Legacy style

async def old_handler(event, ctx): ... get_style(old_handler) -> HandlerStyle.LEGACY

Explicit style

async def new_handler(event, twitch: TwitchClient): ... get_style(new_handler) -> HandlerStyle.EXPLICIT

Source code in src/starstreamer/core/di/inspector.py
@staticmethod
def get_style(handler: Callable[..., Any]) -> HandlerStyle:
    """
    Determine the style of a handler function.

    Args:
        handler: The handler function to inspect

    Returns:
        HandlerStyle.LEGACY if it's (event, ctx) style
        HandlerStyle.EXPLICIT if it's explicit dependency style

    Example:
        # Legacy style
        async def old_handler(event, ctx): ...
        get_style(old_handler) -> HandlerStyle.LEGACY

        # Explicit style
        async def new_handler(event, twitch: TwitchClient): ...
        get_style(new_handler) -> HandlerStyle.EXPLICIT
    """
    try:
        sig = inspect.signature(handler)
        params = list(sig.parameters.values())

        # Must have at least event parameter
        if len(params) < 1:
            return HandlerStyle.EXPLICIT

        # Check for legacy pattern: exactly 2 params where second is 'ctx' or 'context'
        if len(params) == 2:
            second_param = params[1]
            if second_param.name.lower() in ("ctx", "context"):
                return HandlerStyle.LEGACY

        # Everything else is considered explicit style
        return HandlerStyle.EXPLICIT

    except Exception:
        # If we can't inspect, assume explicit style
        return HandlerStyle.EXPLICIT

inspect_handler classmethod

inspect_handler(handler: Callable[..., Any]) -> dict[str, Any]

Perform a complete inspection of a handler function.

Parameters:

Name Type Description Default
handler Callable[..., Any]

The handler function to inspect

required

Returns:

Type Description
dict[str, Any]

Dictionary containing all inspection results

Example

info = HandlerInspector.inspect_handler(my_handler) print(f"Style: {info['style']}") print(f"Dependencies: {info['dependencies']}")

Source code in src/starstreamer/core/di/inspector.py
@classmethod
def inspect_handler(cls, handler: Callable[..., Any]) -> dict[str, Any]:
    """
    Perform a complete inspection of a handler function.

    Args:
        handler: The handler function to inspect

    Returns:
        Dictionary containing all inspection results

    Example:
        info = HandlerInspector.inspect_handler(my_handler)
        print(f"Style: {info['style']}")
        print(f"Dependencies: {info['dependencies']}")
    """
    style = cls.get_style(handler)
    dependencies = cls.get_dependencies(handler) if style == HandlerStyle.EXPLICIT else []
    param_names = cls.get_parameter_names(handler)
    is_async = cls.is_async_handler(handler)
    is_valid, validation_message = cls.validate_handler(handler)

    return {
        "style": style,
        "dependencies": dependencies,
        "parameter_names": param_names,
        "is_async": is_async,
        "is_valid": is_valid,
        "validation_message": validation_message,
        "handler_name": getattr(handler, "__name__", "unknown"),
    }

is_async_handler staticmethod

is_async_handler(handler: Callable[..., Any]) -> bool

Check if a handler is an async function.

Parameters:

Name Type Description Default
handler Callable[..., Any]

The handler function to inspect

required

Returns:

Type Description
bool

True if the handler is async, False otherwise

Source code in src/starstreamer/core/di/inspector.py
@staticmethod
def is_async_handler(handler: Callable[..., Any]) -> bool:
    """
    Check if a handler is an async function.

    Args:
        handler: The handler function to inspect

    Returns:
        True if the handler is async, False otherwise
    """
    return inspect.iscoroutinefunction(handler)

validate_handler staticmethod

validate_handler(handler: Callable[..., Any]) -> tuple[bool, str]

Validate that a handler has a valid signature.

Parameters:

Name Type Description Default
handler Callable[..., Any]

The handler function to validate

required

Returns:

Type Description
tuple[bool, str]

Tuple of (is_valid, error_message)

Example

is_valid, error = validate_handler(my_handler) if not is_valid: print(f"Invalid handler: {error}")

Source code in src/starstreamer/core/di/inspector.py
@staticmethod
def validate_handler(handler: Callable[..., Any]) -> tuple[bool, str]:
    """
    Validate that a handler has a valid signature.

    Args:
        handler: The handler function to validate

    Returns:
        Tuple of (is_valid, error_message)

    Example:
        is_valid, error = validate_handler(my_handler)
        if not is_valid:
            print(f"Invalid handler: {error}")
    """
    try:
        # Must be callable
        if not callable(handler):
            return False, "Handler must be callable"

        # Get signature
        sig = inspect.signature(handler)
        params = list(sig.parameters.values())

        # Must have at least one parameter (event)
        if len(params) < 1:
            return False, "Handler must accept at least one parameter (event)"

        # Check if it's async (recommended but not required)
        if not inspect.iscoroutinefunction(handler):
            return True, "Warning: Handler is not async (will be wrapped)"

        return True, ""

    except Exception as e:
        return False, f"Failed to inspect handler signature: {e}"

The HandlerInspector analyzes handler function signatures to determine their style and dependencies.

Handler Style Detection

get_style(handler: Callable) -> HandlerStyle

Determine if a handler uses legacy or explicit style.

from starstreamer.core.di.inspector import HandlerInspector, HandlerStyle

async def legacy_handler(event, ctx): pass
async def explicit_handler(event: Event, twitch: TwitchClient): pass

assert HandlerInspector.get_style(legacy_handler) == HandlerStyle.LEGACY
assert HandlerInspector.get_style(explicit_handler) == HandlerStyle.EXPLICIT

Dependency Analysis

get_dependencies(handler: Callable) -> list[type]

Extract dependency types from explicit-style handlers.

async def handler(event: Event, twitch: TwitchClient, logger: logging.Logger):
    pass

dependencies = HandlerInspector.get_dependencies(handler)
# Returns: [TwitchClient, logging.Logger]

Advanced Type Handling:

from typing import Optional

async def advanced_handler(
    event: Event,
    twitch: TwitchClient,
    optional_service: Optional[MyService],  # Handles Optional types
    logger: logging.Logger
):
    pass

dependencies = HandlerInspector.get_dependencies(advanced_handler)
# Returns: [TwitchClient, MyService, logging.Logger]

Handler Validation

validate_handler(handler: Callable) -> tuple[bool, str]

Validate that a handler has a correct signature.

is_valid, error_message = HandlerInspector.validate_handler(my_handler)
if not is_valid:
    print(f"Invalid handler: {error_message}")
inspect_handler(handler: Callable) -> dict

Perform complete handler inspection.

info = HandlerInspector.inspect_handler(my_handler)
print(f"Style: {info['style']}")
print(f"Dependencies: {info['dependencies']}")
print(f"Parameter names: {info['parameter_names']}")
print(f"Is async: {info['is_async']}")
print(f"Is valid: {info['is_valid']}")

Setup Functions

StarStreamer provides helper functions for easy dependency injection setup.

setup_dependency_injection() -> tuple[ServiceContainer, HandlerRegistry]

setup_dependency_injection

setup_dependency_injection() -> tuple[Any, Any]

Set up dependency injection for the event system.

Returns:

Type Description
tuple[Any, Any]

Tuple of (ServiceContainer, HandlerRegistry) configured for DI

Example

container, registry = setup_dependency_injection()

Register services

container.register_singleton(TwitchClient, twitch_client) container.register_singleton(Logger, logger)

Set up EventBus with DI

event_bus = get_event_bus() event_bus.set_handler_registry(registry)

Source code in src/starstreamer/core/decorators.py
def setup_dependency_injection() -> tuple[Any, Any]:
    """
    Set up dependency injection for the event system.

    Returns:
        Tuple of (ServiceContainer, HandlerRegistry) configured for DI

    Example:
        container, registry = setup_dependency_injection()

        # Register services
        container.register_singleton(TwitchClient, twitch_client)
        container.register_singleton(Logger, logger)

        # Set up EventBus with DI
        event_bus = get_event_bus()
        event_bus.set_handler_registry(registry)
    """
    from starstreamer.core.di.container import ServiceContainer
    from starstreamer.core.di.registry import HandlerRegistry

    container = ServiceContainer()
    registry = HandlerRegistry(container)

    return container, registry

Set up a complete dependency injection system.

from starstreamer.core.decorators import setup_dependency_injection

# Create DI system
container, registry = setup_dependency_injection()

# Register services
container.register_singleton(TwitchClient, twitch_client)
container.register_singleton(logging.Logger, logger)

# Use with event bus
from starstreamer.core.event_bus import EventBus
event_bus = EventBus(registry)

configure_event_bus_with_di(container: ServiceContainer, registry: HandlerRegistry) -> None

configure_event_bus_with_di

configure_event_bus_with_di(_container: Any, registry: Any) -> None

Configure the global event bus to use dependency injection.

Parameters:

Name Type Description Default
_container Any

ServiceContainer instance (unused, kept for API compatibility)

required
registry Any

HandlerRegistry instance

required

This updates the global event bus to use the dependency injection system for all future handler registrations.

Source code in src/starstreamer/core/decorators.py
def configure_event_bus_with_di(_container: Any, registry: Any) -> None:
    """
    Configure the global event bus to use dependency injection.

    Args:
        _container: ServiceContainer instance (unused, kept for API compatibility)
        registry: HandlerRegistry instance

    This updates the global event bus to use the dependency injection
    system for all future handler registrations.
    """
    event_bus = get_event_bus()
    if event_bus:
        event_bus.set_handler_registry(registry)
        logger.info("EventBus configured with dependency injection support")
    else:
        logger.warning("No EventBus available to configure with DI")

Configure the global event bus to use dependency injection.

from starstreamer.core.decorators import (
    setup_dependency_injection, 
    configure_event_bus_with_di
)

container, registry = setup_dependency_injection()
configure_event_bus_with_di(container, registry)

# Now all @on_event handlers will use DI automatically

register_common_services(container: ServiceContainer) -> None

register_common_services

register_common_services(container: Any) -> None

Register commonly used services in the container.

Parameters:

Name Type Description Default
container Any

ServiceContainer to register services in

required

This is a convenience function that registers standard services that most handlers will need. Call this after setting up DI.

Source code in src/starstreamer/core/decorators.py
def register_common_services(container: Any) -> None:
    """
    Register commonly used services in the container.

    Args:
        container: ServiceContainer to register services in

    This is a convenience function that registers standard services
    that most handlers will need. Call this after setting up DI.
    """
    import logging

    # Register a default logger
    default_logger = logging.getLogger("starstreamer")
    container.register_singleton(logging.Logger, default_logger)

Register commonly used services.

from starstreamer.core.decorators import register_common_services

container, registry = setup_dependency_injection()
register_common_services(container)  # Registers default logger

# Add your platform services
container.register_singleton(TwitchClient, twitch_client)

create_di_event_bus() -> EventBus

create_di_event_bus

create_di_event_bus() -> Any

Create a new EventBus instance with dependency injection configured.

Returns:

Type Description
Any

EventBus instance with DI support enabled

Example

Create DI-enabled event bus

event_bus = create_di_event_bus()

Get the container to register services

container = event_bus.handler_registry.container container.register_singleton(MyService, service_instance)

Now handlers can use explicit dependency injection

Source code in src/starstreamer/core/decorators.py
def create_di_event_bus() -> Any:
    """
    Create a new EventBus instance with dependency injection configured.

    Returns:
        EventBus instance with DI support enabled

    Example:
        # Create DI-enabled event bus
        event_bus = create_di_event_bus()

        # Get the container to register services
        container = event_bus.handler_registry.container
        container.register_singleton(MyService, service_instance)

        # Now handlers can use explicit dependency injection
    """
    from starstreamer.core.event_bus import EventBus

    container, registry = setup_dependency_injection()
    register_common_services(container)

    return EventBus(registry)

Create an EventBus with dependency injection pre-configured.

from starstreamer.core.decorators import create_di_event_bus

# Get fully configured event bus
event_bus = create_di_event_bus()

# Access container to register services
container = event_bus.handler_registry.container
container.register_singleton(TwitchClient, twitch_client)

Complete Setup Example

Here's a complete example of setting up dependency injection in a StarStreamer application:

Main Application Setup

import asyncio
import logging
from starstreamer.core.decorators import setup_dependency_injection, configure_event_bus_with_di
from starstreamer.plugins.twitch import TwitchClient
from starstreamer.services.economy import EconomyService
from starstreamer.services.users import UserService

async def main():
    # Setup dependency injection
    container, registry = setup_dependency_injection()

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("starstreamer")

    # Create and register services
    twitch_client = TwitchClient()  # Uses config.yaml and database settings

    economy_service = EconomyService()
    user_service = UserService()

    # Register services in container
    container.register_singleton(TwitchClient, twitch_client)
    container.register_singleton(EconomyService, economy_service)
    container.register_singleton(UserService, user_service)
    container.register_singleton(logging.Logger, logger)

    # Configure global event bus
    configure_event_bus_with_di(container, registry)

    # Validate all dependencies
    errors = registry.validate_dependencies()
    if errors:
        for error in errors:
            logger.error(f"Dependency error: {error}")
        return

    # Import modules to register handlers
    import modules.chat.actions.basic_commands
    import modules.rpg.actions.economy_commands

    # Start the application
    await twitch_client.connect()
    logger.info("StarStreamer started with dependency injection")

if __name__ == "__main__":
    asyncio.run(main())

Service Implementation

from starstreamer.core.di.container import ServiceContainer

class EconomyService:
    """Business logic service for economy features"""

    def __init__(self, db_connection=None):
        self.db = db_connection or get_default_db()

    async def get_balance(self, user_id: str) -> int:
        """Get user's currency balance"""
        # Database logic
        return await self.db.fetchval(
            "SELECT balance FROM users WHERE user_id = ?", 
            (user_id,)
        ) or 0

    async def add_currency(self, user_id: str, amount: int) -> int:
        """Add currency to user's balance"""
        # Transaction logic
        pass

# Register with factory for database connection
def create_economy_service():
    db = DatabaseConnection("sqlite:///economy.db")
    return EconomyService(db)

container.register_factory(EconomyService, create_economy_service)

Handler Implementation

from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger, CooldownTrigger
from starstreamer.plugins.twitch import TwitchClient
from starstreamer.plugins.elevenlabs import ElevenLabsClient
from starstreamer.services.economy import EconomyService
from starstreamer.runtime.types import Event
import logging

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!balance"))
async def balance_command(
    event: Event,
    twitch: TwitchClient,           # Injected service
    economy: EconomyService,        # Injected service  
    logger: logging.Logger          # Injected service
) -> None:
    """Check user's currency balance"""
    user = event.data.get("user", {})
    user_id = user.get("id", "")
    username = user.get("display_name", "Unknown")

    if not user_id:
        await twitch.send_message(f"@{username} Unable to get your user ID")
        return

    # Use injected services
    balance = await economy.get_balance(user_id)
    await twitch.send_message(f"@{username} Your balance: {balance} coins")
    logger.info(f"Balance check: {username} has {balance} coins")

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!tts"))
async def tts_command(
    event: Event,
    elevenlabs: ElevenLabsClient,    # ElevenLabs TTS service
    twitch: TwitchClient,           # Twitch messaging service
    logger: logging.Logger          # Logging service
) -> None:
    """Text-to-speech command with multiple service injection"""
    message = event.data.get("message", "")
    parts = message.split(maxsplit=1)

    user = event.data.get("user", {})
    username = user.get("display_name", "Someone")

    if len(parts) < 2:
        await twitch.send_message(f"@{username} Usage: !tts <text to speak>")
        return

    text_to_speak = parts[1].strip()

    # Use injected ElevenLabs service
    voices = await elevenlabs.get_voices_as_objects()
    if not voices:
        await twitch.send_message(f"@{username} No voices available")
        return

    # Generate speech with first available voice
    audio_bytes = await elevenlabs.text_to_speech(text_to_speak, voice=voices[0])

    # Use injected Twitch and logging services
    await twitch.send_message(f"🔊 Generated TTS for {username}")
    logger.info(f"TTS generated: {text_to_speak[:50]}...")

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(300, per_user=True))  # 5 minute cooldown
async def work_command(
    event: Event,
    twitch: TwitchClient,
    economy: EconomyService,
    logger: logging.Logger
) -> None:
    """Work to earn currency with dependency injection"""
    user = event.data.get("user", {})
    user_id = user.get("id", "")
    username = user.get("display_name", "Unknown")

    # Generate work reward
    import random
    earned = random.randint(50, 150)

    # Use injected economy service
    new_balance = await economy.add_currency(user_id, earned)

    await twitch.send_message(
        f"@{username} You worked hard and earned {earned} coins! "
        f"Your new balance: {new_balance} coins"
    )
    logger.info(f"Work command: {username} earned {earned} coins")

Advanced Patterns

Custom Service Factory

class ConfigurableService:
    def __init__(self, config: dict):
        self.config = config

    def get_setting(self, key: str):
        return self.config.get(key)

# Register with custom factory
def create_configurable_service():
    config = load_config_from_file("settings.json")
    return ConfigurableService(config)

container.register_factory(ConfigurableService, create_configurable_service)

Service Composition

class CompositeService:
    def __init__(self, twitch: TwitchClient, economy: EconomyService):
        self.twitch = twitch
        self.economy = economy

    async def reward_user(self, user_id: str, amount: int):
        """Composite operation using multiple services"""
        new_balance = await self.economy.add_currency(user_id, amount)
        await self.twitch.send_message(f"Rewarded! New balance: {new_balance}")

# Register composite service (resolved dependencies automatically)
def create_composite_service():
    twitch = container.resolve(TwitchClient)
    economy = container.resolve(EconomyService)
    return CompositeService(twitch, economy)

container.register_factory(CompositeService, create_composite_service)

Testing with Dependency Injection

import pytest
from unittest.mock import Mock, AsyncMock

@pytest.fixture
def test_container():
    """Create container with mock services for testing"""
    container = ServiceContainer()

    # Mock services
    mock_twitch = Mock(spec=TwitchClient)
    mock_twitch.send_message = AsyncMock()

    mock_economy = Mock(spec=EconomyService)
    mock_economy.get_balance = AsyncMock(return_value=100)

    # Register mocks
    container.register_singleton(TwitchClient, mock_twitch)
    container.register_singleton(EconomyService, mock_economy)

    return container

@pytest.mark.asyncio
async def test_balance_command(test_container):
    """Test balance command with mocked dependencies"""
    from modules.rpg.actions.economy_commands import balance_command

    # Create test event
    event = Event(
        type="twitch.chat.message",
        data={"user": {"id": "123", "display_name": "TestUser"}},
        timestamp=time.time(),
        source="test"
    )

    # Resolve mocked services
    twitch = test_container.resolve(TwitchClient)
    economy = test_container.resolve(EconomyService)
    logger = Mock()

    # Call handler with injected dependencies
    await balance_command(event, twitch, economy, logger)

    # Verify interactions
    economy.get_balance.assert_called_once_with("123")
    twitch.send_message.assert_called_once_with("@TestUser Your balance: 100 coins")

See Also