
Testing Guide

StarStreamer uses a comprehensive testing strategy to ensure reliability and maintainability. This guide covers testing patterns, tools, and best practices for both core development and plugin creation.

Testing Philosophy

StarStreamer follows these testing principles:

  • Test-Driven Development (TDD) - Write tests before implementation when possible
  • Comprehensive Coverage - Aim for high test coverage without sacrificing quality
  • Fast Feedback - Tests should run quickly to enable rapid development
  • Isolated Tests - Each test should be independent and predictable
  • Real-world Scenarios - Tests should reflect actual usage patterns

Testing Stack

Core Testing Tools

  • pytest - Primary testing framework with powerful fixtures and plugins
  • pytest-asyncio - Async/await support for testing async handlers
  • pytest-cov - Code coverage reporting and analysis
  • unittest.mock - Mocking external dependencies and services

Additional Tools

  • black - Code formatting for test files
  • ruff - Linting test code for quality
  • mypy - Type checking (optional in tests but recommended)

Project Structure

tests/
├── __init__.py
├── data/                        # Test data files
├── unit/                        # Fast, isolated unit tests
│   ├── core/                    # Core framework tests
│   ├── modules/                 # Module-specific tests
│   ├── services/                # Service layer tests
│   └── db/                      # Database tests
└── conftest.py                  # Global pytest configuration

Running Tests

Basic Test Execution

# Run all tests
uv run pytest

# Run specific test file
uv run pytest tests/unit/modules/chat/test_basic_commands.py

# Run specific test function
uv run pytest tests/unit/modules/chat/test_basic_commands.py::TestBasicCommands::test_hello_command

# Run tests with coverage
uv run pytest --cov=starstreamer

# Run only unit tests
uv run pytest tests/unit/

Test Configuration

pyproject.toml (a pytest.ini file would carry the same options in INI syntax under a [pytest] section):

[tool.pytest.ini_options]
minversion = "7.0"
testpaths = ["tests"]
pythonpath = ["."]
addopts = [
    "-ra",                      # Show test result summary
    "--strict-markers",         # Enforce marker registration
    "--cov=starstreamer",       # Coverage for main package
    "--cov-report=term-missing", # Show missing lines
    "--cov-report=html",        # Generate HTML coverage report
    "--cov-report=xml",         # Generate XML for CI
]
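Because `--strict-markers` rejects any marker that has not been registered, custom markers have to be declared before they are used. One way to do that is a `pytest_configure` hook in `conftest.py`. The sketch below assumes two marker names, `slow` and `integration`, purely for illustration; they are not markers StarStreamer is known to define.

```python
# conftest.py (sketch): register custom markers so --strict-markers accepts them.
# The marker names below are hypothetical examples.
CUSTOM_MARKERS = [
    "slow: tests that take noticeably longer than a unit test",
    "integration: tests that exercise several components together",
]


def pytest_configure(config):
    """pytest hook, called once after command-line options are parsed."""
    for marker in CUSTOM_MARKERS:
        config.addinivalue_line("markers", marker)
```

With this in place, a test decorated with `@pytest.mark.slow` is collected normally, while a misspelled marker still fails collection.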

Unit Testing Patterns

Testing Event Handlers

Basic Handler Test:

import pytest
from unittest.mock import AsyncMock
from starstreamer.runtime.types import Event
from modules.chat.actions.basic_commands import hello_command

class TestChatCommands:
    @pytest.mark.asyncio
    async def test_hello_command(self):
        """Test hello command sends greeting with username"""
        # Arrange
        event = Event(
            type="twitch.chat.message",
            data={"user": {"display_name": "TestUser"}, "message": "!hello"},
            timestamp=1234567890.0,
            source="test"
        )
        twitch = AsyncMock()

        # Act
        await hello_command(event, twitch)

        # Assert
        twitch.send_message.assert_called_once_with(
            "Hello TestUser! 👋 Welcome to the stream!"
        )

Testing with Dependency Injection:

import pytest
import logging
from unittest.mock import AsyncMock, Mock
from starstreamer.runtime.types import Event
from modules.rpg.actions.economy_commands import balance_command

@pytest.mark.asyncio
async def test_balance_command_with_dependencies():
    """Test balance command with all injected dependencies"""
    # Arrange
    event = Event(
        type="twitch.chat.message",
        data={"user": {"id": "123", "display_name": "TestUser"}},
        timestamp=1234567890.0,
        source="test"
    )

    # Mock services
    twitch = AsyncMock()
    economy = AsyncMock()
    economy.get_balance.return_value = 500
    users = AsyncMock()
    logger = Mock(spec=logging.Logger)

    # Act
    await balance_command(event, twitch, economy, users, logger)

    # Assert
    economy.get_balance.assert_called_once_with("123")
    twitch.send_message.assert_called_once_with("@TestUser Your balance: 500 coins")
    logger.info.assert_called_once()

Testing with Edge Cases

@pytest.mark.asyncio
async def test_hello_command_missing_display_name():
    """Test hello command handles missing display_name gracefully"""
    # Arrange - no display_name in user data
    event = Event(
        type="twitch.chat.message",
        data={"user": {"username": "testuser"}, "message": "!hello"},
        timestamp=1234567890.0,
        source="test"
    )
    twitch = AsyncMock()

    # Act
    await hello_command(event, twitch)

    # Assert - should fallback to username
    twitch.send_message.assert_called_once_with(
        "Hello testuser! 👋 Welcome to the stream!"
    )

@pytest.mark.asyncio
async def test_hello_command_empty_user_data():
    """Test hello command handles completely missing user data"""
    # Arrange
    event = Event(
        type="twitch.chat.message",
        data={"message": "!hello"},  # No user data at all
        timestamp=1234567890.0,
        source="test"
    )
    twitch = AsyncMock()

    # Act
    await hello_command(event, twitch)

    # Assert - should use fallback
    twitch.send_message.assert_called_once_with(
        "Hello friend! 👋 Welcome to the stream!"
    )
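The three hello-command tests imply a fallback chain: `display_name`, then `username`, then a generic "friend". The following is a minimal sketch of a handler that would satisfy them. The `Event` dataclass is a stand-in with assumed field names, and this `hello_command` is a hypothetical reconstruction, not the actual StarStreamer source.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any
from unittest.mock import AsyncMock


@dataclass
class Event:
    """Stand-in for starstreamer.runtime.types.Event (assumed field names)."""
    type: str
    data: dict[str, Any] = field(default_factory=dict)
    timestamp: float = 0.0
    source: str = "test"


async def hello_command(event: Event, twitch) -> None:
    """Hypothetical handler: greet by display_name, then username, then 'friend'."""
    user = event.data.get("user") or {}
    name = user.get("display_name") or user.get("username") or "friend"
    await twitch.send_message(f"Hello {name}! 👋 Welcome to the stream!")


async def greet(data: dict[str, Any]) -> str:
    """Run the handler against a mock client and return the message it sent."""
    twitch = AsyncMock()
    await hello_command(Event("twitch.chat.message", data), twitch)
    twitch.send_message.assert_called_once()
    return twitch.send_message.call_args.args[0]
```

Using `or` rather than a default argument to `dict.get` means empty strings and `None` values also fall through to the next candidate.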

Testing Error Handling

@pytest.mark.asyncio
async def test_command_handles_api_failure():
    """Test command gracefully handles external API failures"""
    # Arrange
    event = Event(
        type="twitch.chat.message",
        data={"user": {"display_name": "TestUser"}},
        timestamp=1234567890.0,
        source="test"
    )
    twitch = AsyncMock()
    api_service = AsyncMock()
    api_service.get_data.side_effect = Exception("API is down")
    logger = Mock(spec=logging.Logger)

    # Act
    await api_dependent_command(event, twitch, api_service, logger)

    # Assert
    logger.error.assert_called_once()
    twitch.send_message.assert_called_with("Sorry, service is temporarily unavailable!")
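For reference, a handler that passes the test above might look like the sketch below. `api_dependent_command` is not shown in this guide, so the body, the `get_data` call shape, and the success message are assumptions made only to illustrate the log-then-apologize pattern.

```python
import asyncio
import logging
from unittest.mock import AsyncMock, Mock


async def api_dependent_command(event, twitch, api_service, logger) -> None:
    """Hypothetical handler: send a friendly message if the API call fails."""
    try:
        data = await api_service.get_data()
    except Exception:
        # Log with traceback for operators; keep the raw error out of chat.
        logger.error("API call failed", exc_info=True)
        await twitch.send_message("Sorry, service is temporarily unavailable!")
        return
    await twitch.send_message(f"Result: {data}")  # success message is illustrative


async def run_failure_case():
    """Drive the handler with a failing API mock and return the doubles."""
    twitch, api_service = AsyncMock(), AsyncMock()
    api_service.get_data.side_effect = Exception("API is down")
    logger = Mock(spec=logging.Logger)
    await api_dependent_command(None, twitch, api_service, logger)
    return twitch, logger
```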

Integration Testing

Testing Complete Event Flow

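import pytest
import asyncio
from unittest.mock import AsyncMock
from starstreamer.core.decorators import create_di_event_bus
from starstreamer.runtime.types import Event
# TwitchClient must also be imported from wherever your project defines it.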

@pytest.mark.asyncio
async def test_complete_chat_command_flow():
    """Test end-to-end chat command processing"""
    # Arrange
    event_bus = create_di_event_bus()
    container = event_bus.handler_registry.container

    # Setup mock services
    mock_twitch = AsyncMock()
    container.register_singleton(TwitchClient, mock_twitch)

    # Import modules to register handlers
    import modules.chat.actions.basic_commands  # noqa: F401

    await event_bus.start()

    # Act - Emit a chat message event
    await event_bus.emit("twitch.chat.message", {
        "user": {"display_name": "TestUser"},
        "message": "!hello"
    })

    # Allow processing
    await asyncio.sleep(0.1)

    # Assert
    mock_twitch.send_message.assert_called_with(
        "Hello TestUser! 👋 Welcome to the stream!"
    )

    await event_bus.stop()
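A fixed `asyncio.sleep(0.1)` can be too short on a loaded CI machine and needlessly long everywhere else. One alternative is to poll for the expected condition with a timeout. `wait_until` below is a hypothetical helper, not part of StarStreamer or pytest.

```python
import asyncio
import time
from typing import Callable


async def wait_until(predicate: Callable[[], bool], timeout: float = 1.0,
                     interval: float = 0.01) -> None:
    """Poll predicate until it is true; raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout}s")
        await asyncio.sleep(interval)


async def demo() -> bool:
    """A background task flips a flag; the waiter returns once it sees the change."""
    done = []

    async def worker() -> None:
        await asyncio.sleep(0.05)
        done.append(True)

    task = asyncio.create_task(worker())
    await wait_until(lambda: bool(done), timeout=1.0)
    await task
    return bool(done)
```

In the integration test above, `await wait_until(lambda: mock_twitch.send_message.called)` could stand in for the fixed sleep.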

Testing Module Loading

@pytest.mark.asyncio
async def test_chat_module_registration():
    """Test that chat module registers all handlers correctly"""
    # Arrange
    from modules.chat.module import ChatModule
    from modules.registry import ModuleRegistry
    from starstreamer.db.database import Database

    db = Database(":memory:")  # In-memory database for testing
    registry = ModuleRegistry(db)
    module = ChatModule()

    # Act
    await module.on_load()
    await module.register_actions()
    await module.on_enable()

    # Assert
    assert module.module_name == "chat"
    assert module.enabled is True

    # Cleanup
    await module.on_disable()
    await module.on_unload()

Testing Dependency Injection

Testing Service Container

import pytest
from starstreamer.core.di.container import ServiceContainer

class TestServiceContainer:
    def test_register_and_resolve_singleton(self):
        """Test singleton service registration and resolution"""
        # Arrange
        container = ServiceContainer()
        service_instance = object()

        # Act
        container.register_singleton(object, service_instance)
        resolved = container.resolve(object)

        # Assert
        assert resolved is service_instance

    def test_register_and_resolve_factory(self):
        """Test factory service registration and resolution"""
        # Arrange
        container = ServiceContainer()
        created_instances = []

        def factory():
            instance = object()
            created_instances.append(instance)
            return instance

        # Act
        container.register_factory(object, factory)
        resolved1 = container.resolve(object)
        resolved2 = container.resolve(object)

        # Assert
        assert len(created_instances) == 2
        assert resolved1 is not resolved2  # Factory creates new instances

    def test_resolve_unregistered_service_raises_error(self):
        """Test that resolving unregistered service raises KeyError"""
        # Arrange
        container = ServiceContainer()

        # Act & Assert
        with pytest.raises(KeyError, match="Service type.*is not registered"):
            container.resolve(object)
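To make the contract behind these three tests concrete, here is a deliberately small container that would pass them. It is an illustrative sketch only; the real `ServiceContainer` in `starstreamer.core.di.container` may differ in structure and features.

```python
from typing import Any, Callable, Dict, Type


class ServiceContainer:
    """Minimal illustrative container; not StarStreamer's actual implementation."""

    def __init__(self) -> None:
        self._singletons: Dict[Type, Any] = {}
        self._factories: Dict[Type, Callable[[], Any]] = {}

    def register_singleton(self, service_type: Type, instance: Any) -> None:
        self._singletons[service_type] = instance

    def register_factory(self, service_type: Type, factory: Callable[[], Any]) -> None:
        self._factories[service_type] = factory

    def resolve(self, service_type: Type) -> Any:
        if service_type in self._singletons:
            return self._singletons[service_type]   # same object every time
        if service_type in self._factories:
            return self._factories[service_type]()  # fresh object per call
        raise KeyError(f"Service type {service_type.__name__} is not registered")
```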

Testing Handler Registry

import pytest
from unittest.mock import AsyncMock, Mock
from starstreamer.core.di.container import ServiceContainer
from starstreamer.core.di.registry import HandlerRegistry
from starstreamer.runtime.types import Event

@pytest.mark.asyncio
async def test_explicit_dependency_injection():
    """Test that explicit DI handlers get correct dependencies"""
    # Arrange
    container = ServiceContainer()
    registry = HandlerRegistry(container)

    # Register mock services
    mock_service = Mock()
    container.register_singleton(type(mock_service), mock_service)

    # Create handler that expects the service
    received_service = None

    async def test_handler(event: Event, service: type(mock_service)):
        nonlocal received_service
        received_service = service

    # Register handler
    registry.register("test.event", test_handler)

    # Act - Get and call the wrapped handler
    handlers = registry.get_handlers("test.event")
    wrapped_handler = handlers[0]["handler"]

    test_event = Event("test.event", {}, 1234567890.0, "test")
    await wrapped_handler(test_event, None)  # Context not used in explicit style

    # Assert
    assert received_service is mock_service
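The test above relies on the registry reading the handler's type annotations and supplying matching services. A rough sketch of that idea follows, using only the standard library. `wrap_with_di` and `Greeter` are hypothetical names, and the real `HandlerRegistry` may resolve dependencies differently.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, get_type_hints


def wrap_with_di(handler: Callable[..., Awaitable[Any]],
                 services: Dict[type, Any]) -> Callable[..., Awaitable[Any]]:
    """Return a wrapper that fills every parameter after the first from `services`."""
    hints = get_type_hints(handler)
    params = list(inspect.signature(handler).parameters)[1:]  # skip the event

    async def wrapped(event: Any, ctx: Any = None) -> Any:
        # Look up each remaining parameter by its annotated type.
        kwargs = {name: services[hints[name]] for name in params}
        return await handler(event, **kwargs)

    return wrapped


class Greeter:
    """Hypothetical service used only for this demonstration."""

    def greet(self, name: str) -> str:
        return f"hi {name}"


async def handler(event: dict, greeter: Greeter) -> str:
    return greeter.greet(event["user"])


wrapped = wrap_with_di(handler, {Greeter: Greeter()})
result = asyncio.run(wrapped({"user": "TestUser"}, None))
```

Resolving by annotation is what lets a test swap in a mock simply by registering it under the same type.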

Testing Database Operations

Testing with In-Memory Database

import pytest
import pytest_asyncio
from starstreamer.db.database import Database
from starstreamer.services.users import UserService

# Async fixtures need pytest_asyncio.fixture unless asyncio_mode = "auto" is configured
@pytest_asyncio.fixture
async def test_db():
    """Create in-memory database for testing"""
    db = Database(":memory:")
    await db.initialize()
    yield db
    await db.close()

@pytest_asyncio.fixture
async def user_service(test_db):
    """Create user service with test database"""
    service = UserService(test_db)
    await service.initialize()
    return service

@pytest.mark.asyncio
async def test_user_creation(user_service):
    """Test creating a new user"""
    # Act
    user = await user_service.get_or_create_user("123", "TestUser")

    # Assert
    assert user.user_id == "123"
    assert user.username == "TestUser"
    assert user.balance == 0  # Default balance

@pytest.mark.asyncio
async def test_user_balance_operations(user_service):
    """Test user balance operations"""
    # Arrange
    user = await user_service.get_or_create_user("123", "TestUser")

    # Act
    await user_service.add_balance(user.user_id, 100)
    updated_user = await user_service.get_user(user.user_id)

    # Assert
    assert updated_user.balance == 100

Testing Migrations

@pytest.mark.asyncio
async def test_database_migration():
    """Test that database migrations work correctly"""
    # Arrange
    db = Database(":memory:")

    # Act
    await db.initialize()
    await db.run_migrations()

    # Assert - Check that tables exist
    tables = await db.fetchall("SELECT name FROM sqlite_master WHERE type='table'")
    table_names = [row[0] for row in tables]

    assert "users" in table_names
    assert "events" in table_names
    assert "modules" in table_names
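The `sqlite_master` query in the assertion can be tried in isolation with the standard-library `sqlite3` module. This sketch assumes nothing about StarStreamer's `Database` wrapper. The `users` schema is made up for illustration and stands in for a real migration step.

```python
import sqlite3


def table_names(conn: sqlite3.Connection) -> set[str]:
    """Return the names of all tables in the connected SQLite database."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
    return {row[0] for row in rows}


# Each ":memory:" connection is a private, initially empty database.
conn = sqlite3.connect(":memory:")
before = table_names(conn)

# Stand-in for a migration; this schema is illustrative only.
conn.execute("CREATE TABLE users (user_id TEXT PRIMARY KEY, balance INTEGER DEFAULT 0)")
after = table_names(conn)
conn.close()
```

Comparing the table set before and after makes it clear that the tables come from the migration rather than from leftover state.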

Testing Triggers and Filters

Testing Command Triggers

import pytest
from starstreamer.triggers import CommandTrigger
from starstreamer.runtime.types import Event

class TestCommandTrigger:
    def test_command_trigger_matches(self):
        """Test that command trigger matches correct commands"""
        # Arrange
        trigger = CommandTrigger("!hello")
        event = Event(
            "twitch.chat.message",
            {"message": "!hello world"},
            1234567890.0,
            "test"
        )

        # Act
        result = trigger.should_trigger(event)

        # Assert
        assert result.should_trigger is True

    def test_command_trigger_case_insensitive(self):
        """Test case insensitive command matching"""
        # Arrange
        trigger = CommandTrigger("!hello", case_sensitive=False)
        event = Event(
            "twitch.chat.message",
            {"message": "!HELLO world"},
            1234567890.0,
            "test"
        )

        # Act
        result = trigger.should_trigger(event)

        # Assert
        assert result.should_trigger is True

    def test_command_trigger_no_match(self):
        """Test that command trigger doesn't match incorrect commands"""
        # Arrange
        trigger = CommandTrigger("!hello")
        event = Event(
            "twitch.chat.message",
            {"message": "!goodbye world"},
            1234567890.0,
            "test"
        )

        # Act
        result = trigger.should_trigger(event)

        # Assert
        assert result.should_trigger is False
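The matching rule these tests exercise can be sketched as "the first whitespace-separated token equals the command". The class below is a simplified illustration that takes the message string directly instead of an `Event`. Its default for `case_sensitive` is an assumption, and the real `CommandTrigger` may behave differently.

```python
from dataclasses import dataclass


@dataclass
class TriggerResult:
    """Mirrors the `.should_trigger` attribute used in the tests above."""
    should_trigger: bool


class CommandTrigger:
    """Illustrative matcher: the first token of the message must equal the command."""

    def __init__(self, command: str, case_sensitive: bool = False) -> None:
        self.command = command
        self.case_sensitive = case_sensitive  # default is an assumption

    def should_trigger(self, message: str) -> TriggerResult:
        tokens = message.split()
        first = tokens[0] if tokens else ""
        if self.case_sensitive:
            return TriggerResult(first == self.command)
        return TriggerResult(first.lower() == self.command.lower())
```

Comparing whole tokens rather than prefixes keeps `!hello` from firing on `!helloworld`, a case worth covering in the real test suite as well.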

Testing Cooldown Triggers

import pytest
import time
from starstreamer.triggers import CooldownTrigger
from starstreamer.runtime.types import Event

@pytest.mark.asyncio
async def test_cooldown_trigger_blocks_rapid_execution():
    """Test that cooldown trigger prevents rapid re-execution"""
    # Arrange
    trigger = CooldownTrigger(cooldown_seconds=60, per_user=True)
    event = Event(
        "twitch.chat.message",
        {"user": {"id": "123"}},
        time.time(),
        "test"
    )

    # Act & Assert
    first_result = trigger.should_trigger(event)
    assert first_result.should_trigger is True

    # Immediate second attempt should be blocked
    second_result = trigger.should_trigger(event)
    assert second_result.should_trigger is False

@pytest.mark.asyncio
async def test_cooldown_trigger_different_users():
    """Test that per-user cooldowns don't affect other users"""
    # Arrange
    trigger = CooldownTrigger(cooldown_seconds=60, per_user=True)

    event1 = Event("test", {"user": {"id": "123"}}, time.time(), "test")
    event2 = Event("test", {"user": {"id": "456"}}, time.time(), "test")

    # Act
    result1 = trigger.should_trigger(event1)
    result2 = trigger.should_trigger(event2)

    # Assert
    assert result1.should_trigger is True
    assert result2.should_trigger is True  # Different user, should work
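The tests above cover the blocked case but not cooldown expiry, which would otherwise require sleeping for 60 seconds. A common approach is to make the clock injectable. The sketch below is a simplified stand-in keyed by user ID. Whether the real `CooldownTrigger` accepts a clock parameter is not documented here, so treat `clock` and `FakeClock` as assumptions.

```python
import time
from typing import Callable, Dict


class CooldownTrigger:
    """Illustrative per-user cooldown with an injectable clock (not the real class)."""

    def __init__(self, cooldown_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._last_fired: Dict[str, float] = {}

    def should_trigger(self, user_id: str) -> bool:
        now = self._clock()
        last = self._last_fired.get(user_id)
        if last is not None and now - last < self.cooldown_seconds:
            return False  # still cooling down
        self._last_fired[user_id] = now
        return True


class FakeClock:
    """Manually advanced clock so tests never sleep."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds
```

A test can then call `clock.advance(60)` and assert that the same user triggers again, without any real waiting.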

Testing Async Operations

Testing Event Bus Operations

@pytest.mark.asyncio
async def test_event_bus_concurrent_handlers():
    """Test that multiple handlers run concurrently"""
    # Arrange
    event_bus = create_test_event_bus()
    execution_order = []

    async def slow_handler(event, ctx):
        execution_order.append("slow_start")
        await asyncio.sleep(0.1)
        execution_order.append("slow_end")

    async def fast_handler(event, ctx):
        execution_order.append("fast")

    event_bus.register_handler("test.event", slow_handler)
    event_bus.register_handler("test.event", fast_handler)

    await event_bus.start()

    # Act
    await event_bus.emit("test.event", {})
    await asyncio.sleep(0.2)  # Wait for completion

    # Assert - Fast handler should complete before slow handler
    assert "fast" in execution_order
    assert "slow_start" in execution_order
    assert "slow_end" in execution_order
    assert execution_order.index("fast") < execution_order.index("slow_end")

    await event_bus.stop()
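The interleaving this test checks for can be reproduced with plain `asyncio`, independent of the event bus. The sketch below uses `asyncio.gather`, which waits for both handlers and so needs no fixed sleep before asserting. It illustrates the concurrency behavior only and says nothing about how StarStreamer's event bus schedules handlers internally.

```python
import asyncio


async def run_handlers_concurrently() -> list[str]:
    """Run a slow and a fast handler together and record the interleaving."""
    order: list[str] = []

    async def slow_handler() -> None:
        order.append("slow_start")
        await asyncio.sleep(0.05)  # yields control to other tasks
        order.append("slow_end")

    async def fast_handler() -> None:
        order.append("fast")

    # gather schedules both coroutines as tasks and waits for both to finish.
    await asyncio.gather(slow_handler(), fast_handler())
    return order
```

The fast handler finishes while the slow one is suspended in `sleep`, which is exactly the ordering the index comparison in the test asserts.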

Testing External API Calls

import pytest
import aiohttp
from unittest.mock import AsyncMock, MagicMock, patch

@pytest.mark.asyncio
async def test_external_api_success():
    """Test successful external API call"""
    # Arrange
    mock_response = AsyncMock()
    mock_response.status = 200
    mock_response.json.return_value = {"result": "success"}

    mock_session = AsyncMock()
    # Assuming the client uses `async with session.get(...)`: get() must return
    # an async context manager synchronously, so it is a MagicMock, not an AsyncMock
    mock_session.get = MagicMock()
    mock_session.get.return_value.__aenter__.return_value = mock_response

    # Act
    with patch('aiohttp.ClientSession', return_value=mock_session):
        from my_plugin.services.api_client import ExternalAPIClient
        client = ExternalAPIClient("test-key")
        result = await client.fetch_data()

    # Assert
    assert result == {"result": "success"}
    mock_session.get.assert_called_once()

@pytest.mark.asyncio
async def test_external_api_failure():
    """Test external API failure handling"""
    # Arrange
    mock_session = AsyncMock()
    # Raise as soon as get() is called, before any `async with` is entered
    mock_session.get = MagicMock(side_effect=aiohttp.ClientError("Connection failed"))

    # Act & Assert
    with patch('aiohttp.ClientSession', return_value=mock_session):
        from my_plugin.services.api_client import ExternalAPIClient
        client = ExternalAPIClient("test-key")

        with pytest.raises(aiohttp.ClientError):
            await client.fetch_data()
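Mocking `async with session.get(...)` is a frequent source of confusing failures, so it helps to see the pattern in isolation. The sketch below uses only `unittest.mock`. `fetch_json` is a hypothetical stand-in for code under test and the URL is a placeholder.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def fetch_json(session, url: str) -> dict:
    """Code under test (hypothetical): the usual aiohttp request pattern."""
    async with session.get(url) as response:
        return await response.json()


def make_mock_session(payload: dict) -> MagicMock:
    """Build a session double whose get() works with `async with`."""
    response = AsyncMock()
    response.json.return_value = payload  # awaited by the code under test

    session = MagicMock()
    # MagicMock supports __aenter__/__aexit__ (Python 3.8+), so the object
    # returned by get() can be used directly as an async context manager.
    session.get.return_value.__aenter__.return_value = response
    return session


session = make_mock_session({"result": "success"})
result = asyncio.run(fetch_json(session, "https://example.invalid/data"))
```

If `session.get` were an `AsyncMock`, calling it would return a coroutine, and `async with` would fail because a coroutine is not an async context manager.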

Test Fixtures and Utilities

Common Test Fixtures

conftest.py:

import asyncio
import logging
import pytest
import pytest_asyncio
from unittest.mock import AsyncMock, Mock
from starstreamer.core.di.container import ServiceContainer
from starstreamer.core.di.registry import HandlerRegistry
from starstreamer.core.event_bus import EventBus
# TwitchClient must also be imported from wherever your project defines it.

# Note: recent pytest-asyncio releases deprecate overriding event_loop;
# drop this fixture if your version warns about it.
@pytest.fixture
def event_loop():
    """Create event loop for async tests"""
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

@pytest.fixture
def mock_twitch_client():
    """Create mock Twitch client"""
    client = AsyncMock()
    client.send_message = AsyncMock()
    client.send_announcement = AsyncMock()
    return client

@pytest.fixture
def mock_logger():
    """Create mock logger"""
    logger = Mock()
    logger.info = Mock()
    logger.error = Mock()
    logger.warning = Mock()
    return logger

@pytest.fixture
def test_container(mock_twitch_client, mock_logger):
    """Create service container with mock services"""
    container = ServiceContainer()
    container.register_singleton(TwitchClient, mock_twitch_client)
    container.register_singleton(logging.Logger, mock_logger)
    return container

@pytest.fixture
def test_event_bus(test_container):
    """Create event bus with test services"""
    registry = HandlerRegistry(test_container)
    event_bus = EventBus(registry)
    return event_bus

# Async fixtures need pytest_asyncio.fixture unless asyncio_mode = "auto" is configured
@pytest_asyncio.fixture
async def test_database():
    """Create in-memory test database"""
    from starstreamer.db.database import Database
    db = Database(":memory:")
    await db.initialize()
    yield db
    await db.close()

Event Factory Utilities

import time
from starstreamer.runtime.types import Event

def create_chat_event(username="TestUser", message="test message", user_id="123"):
    """Factory function for creating chat events"""
    return Event(
        type="twitch.chat.message",
        data={
            "user": {
                "id": user_id,
                "display_name": username,
                "username": username.lower()
            },
            "message": message
        },
        timestamp=time.time(),
        source="test"
    )

def create_follow_event(username="TestFollower", user_id="456"):
    """Factory function for creating follow events"""
    return Event(
        type="twitch.follow",
        data={
            "user_id": user_id,
            "user_name": username,
            "followed_at": "2023-01-01T00:00:00Z"
        },
        timestamp=time.time(),
        source="test"
    )

def create_subscription_event(username="TestSub", tier="1000", user_id="789"):
    """Factory function for creating subscription events"""
    return Event(
        type="twitch.subscription",
        data={
            "user_id": user_id,
            "user_name": username,
            "tier": tier,
            "is_gift": False
        },
        timestamp=time.time(),
        source="test"
    )

Testing Best Practices

Test Organization

  1. One test class per module/feature
  2. Descriptive test names that explain what is being tested
  3. Arrange-Act-Assert pattern for test structure
  4. Group related tests using test classes
  5. Use fixtures for common setup and teardown

Mocking Guidelines

# ✅ Good: Mock external dependencies
@patch('external_api.fetch_data')
async def test_handler_with_api(mock_fetch):
    mock_fetch.return_value = {"status": "ok"}
    # Test logic here

# ✅ Good: Mock at the boundary
def test_service_with_database():
    mock_db = Mock()
    service = MyService(database=mock_db)
    # Test service logic without database

# ❌ Avoid: Over-mocking internal logic
def test_calculation():
    # Call pure functions directly; patching calculate_value itself
    # would leave nothing real under test
    result = calculate_value(10, 20)
    assert result == 30

Assertion Best Practices

# ✅ Good: Specific assertions
assert twitch.send_message.call_count == 1
assert "Hello TestUser" in twitch.send_message.call_args[0][0]

# ✅ Good: Multiple specific assertions
twitch.send_message.assert_called_once_with("Expected message")
logger.info.assert_called_once()

# ❌ Avoid: Vague assertions
assert twitch.send_message.called  # Too general
assert len(result) > 0  # Not specific enough

Error Testing

# ✅ Test expected exceptions
with pytest.raises(ValueError, match="Invalid user ID"):
    await user_service.get_user("invalid")

# ✅ Test error handling behavior
@pytest.mark.asyncio
async def test_graceful_error_handling():
    # Arrange error condition
    api_service.get_data.side_effect = ConnectionError()

    # Act - should not raise
    await resilient_handler(event, twitch, api_service)

    # Assert error was handled gracefully
    twitch.send_message.assert_called_with("Service temporarily unavailable")

Continuous Integration

GitHub Actions Example

.github/workflows/test.yml:

name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]

    steps:
    - uses: actions/checkout@v4

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v4
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install uv
      run: curl -LsSf https://astral.sh/uv/install.sh | sh

    - name: Install dependencies
      run: |
        uv venv
        uv pip install -e ".[dev]"

    - name: Run tests
      run: uv run pytest --cov=starstreamer --cov-report=xml

    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

Pre-commit Hooks

.pre-commit-config.yaml:

repos:
  - repo: local
    hooks:
      - id: tests
        name: Run tests
        entry: uv run pytest tests/unit/
        language: system
        pass_filenames: false
        always_run: true

Performance Testing

Testing Handler Performance

import time
import pytest

@pytest.mark.asyncio
async def test_handler_performance():
    """Test that handler completes within acceptable time"""
    # Arrange
    start_time = time.perf_counter()  # monotonic timer, unaffected by clock changes

    # Act
    await my_handler(event, services)

    # Assert
    execution_time = time.perf_counter() - start_time
    assert execution_time < 0.1  # Should complete within 100ms
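A single timing sample is sensitive to scheduler noise, so a threshold test like the one above can fail intermittently. One mitigation is to take several samples and compare the median against the budget. `median_runtime` and `sample_handler` below are hypothetical helpers for illustration.

```python
import asyncio
import statistics
import time
from typing import Awaitable, Callable


async def median_runtime(func: Callable[[], Awaitable[object]], runs: int = 5) -> float:
    """Return the median wall-clock seconds over `runs` awaited calls to func."""
    samples = []
    for _ in range(runs):
        start = time.perf_counter()  # monotonic, high-resolution timer
        await func()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


async def sample_handler() -> None:
    """Placeholder for the handler being measured."""
    await asyncio.sleep(0.01)


elapsed = asyncio.run(median_runtime(sample_handler))
```

A test would then assert `elapsed < 0.1`, or whatever budget fits the handler, with less risk of a single slow run failing the build.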

Load Testing

@pytest.mark.asyncio
async def test_event_bus_load():
    """Test event bus under load"""
    event_bus = create_test_event_bus()
    received_count = 0

    async def counter_handler(event, ctx):
        nonlocal received_count
        received_count += 1

    event_bus.register_handler("test.event", counter_handler)
    await event_bus.start()

    # Send many events rapidly
    for i in range(1000):
        await event_bus.emit("test.event", {"id": i})

    # Wait for processing
    await asyncio.sleep(1.0)

    # Assert all events were processed
    assert received_count == 1000

    await event_bus.stop()

Plugin Testing

Testing Plugin Lifecycle

@pytest.mark.asyncio
async def test_plugin_lifecycle():
    """Test complete plugin lifecycle"""
    from my_plugin import MyPlugin

    plugin = MyPlugin(config={"api_key": "test"})

    # Test loading
    await plugin.on_load()
    assert plugin.api_client is not None

    # Test action registration
    await plugin.register_actions()

    # Test enabling
    await plugin.on_enable()
    assert plugin.enabled is True

    # Test disabling
    await plugin.on_disable()
    assert plugin.enabled is False

    # Test unloading
    await plugin.on_unload()

Testing Plugin Commands

@pytest.mark.asyncio
async def test_plugin_command():
    """Test plugin-specific command"""
    from my_plugin.actions.commands import awesome_command

    event = create_chat_event(message="!awesome")
    twitch = AsyncMock()

    await awesome_command(event, twitch)

    assert "awesome" in twitch.send_message.call_args[0][0].lower()

Debugging Tests

Pytest Debugging

# Run with verbose output
uv run pytest -v

# Stop on first failure
uv run pytest -x

# Run specific test with output
uv run pytest -s tests/unit/test_specific.py::test_function

# Debug with pdb
uv run pytest --pdb tests/unit/test_specific.py::test_function

# Show local variables on failure
uv run pytest -l

Logging in Tests

import logging

def test_with_logging(caplog):
    """Test that captures log output"""
    with caplog.at_level(logging.INFO):
        my_function_that_logs()

    assert "Expected log message" in caplog.text
    assert len(caplog.records) == 1

Test Maintenance

Keeping Tests Updated

  1. Update tests when requirements change
  2. Remove obsolete tests for removed features
  3. Refactor tests to match code refactoring
  4. Add tests for bug fixes to prevent regression
  5. Review test coverage regularly and add missing tests

Test Documentation

class TestChatCommands:
    """
    Test suite for chat command handlers.

    Tests cover:
    - Basic command functionality
    - Error handling and edge cases
    - User permission validation
    - Service integration
    """

    def test_hello_command(self):
        """
        Test that hello command sends personalized greeting.

        Verifies:
        - Username extraction from event data
        - Fallback to generic greeting if no username
        - Correct message formatting
        """
        pass

Following these patterns helps ensure that your modules and plugins work correctly and keep working as StarStreamer evolves.

See Also