Testing Guide¶
StarStreamer uses a comprehensive testing strategy to ensure reliability and maintainability. This guide covers testing patterns, tools, and best practices for both core development and plugin creation.
Testing Philosophy¶
StarStreamer follows these testing principles:
- Test-Driven Development (TDD) - Write tests before implementation when possible
- Comprehensive Coverage - Aim for high test coverage without sacrificing quality
- Fast Feedback - Tests should run quickly to enable rapid development
- Isolated Tests - Each test should be independent and predictable
- Real-world Scenarios - Tests should reflect actual usage patterns
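In practice, the "Isolated Tests" principle usually means giving each test its own fixture instance instead of sharing module-level state. A minimal sketch (the `fresh_state` fixture and its shape are illustrative, not part of StarStreamer's API):

```python
import pytest


def make_state():
    # Builds fresh state on every call; nothing is shared between tests.
    return {"balance": 0}


@pytest.fixture
def fresh_state():
    # A new dict per test keeps tests isolated: no test observes another's mutations.
    return make_state()


def test_deposit(fresh_state):
    fresh_state["balance"] += 100
    assert fresh_state["balance"] == 100


def test_starts_clean(fresh_state):
    # Passes regardless of execution order because the fixture is rebuilt each time.
    assert fresh_state["balance"] == 0
```

Because the fixture is function-scoped (pytest's default), either test can run first, alone, or in parallel without affecting the other.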
Testing Stack¶
Core Testing Tools¶
- pytest - Primary testing framework with powerful fixtures and plugins
- pytest-asyncio - Async/await support for testing async handlers
- pytest-cov - Code coverage reporting and analysis
- unittest.mock - Mocking external dependencies and services
Additional Tools¶
- black - Code formatting for test files
- ruff - Linting test code for quality
- mypy - Type checking (optional in tests but recommended)
Project Structure¶
```
tests/
├── __init__.py
├── data/          # Test data files
├── unit/          # Fast, isolated unit tests
│   ├── core/      # Core framework tests
│   ├── modules/   # Module-specific tests
│   ├── services/  # Service layer tests
│   └── db/        # Database tests
└── conftest.py    # Global pytest configuration
```
Running Tests¶
Basic Test Execution¶
```bash
# Run all tests
uv run pytest

# Run specific test file
uv run pytest tests/unit/modules/chat/test_basic_commands.py

# Run specific test function
uv run pytest tests/unit/modules/chat/test_basic_commands.py::TestBasicCommands::test_hello_command

# Run tests with coverage
uv run pytest --cov=starstreamer

# Run only unit tests
uv run pytest tests/unit/
```
Test Configuration¶
pyproject.toml (the `[tool.pytest.ini_options]` table is TOML syntax; in pytest.ini the section would be `[pytest]` instead):
```toml
[tool.pytest.ini_options]
minversion = "7.0"
testpaths = ["tests"]
pythonpath = ["."]
addopts = [
    "-ra",                        # Show test result summary
    "--strict-markers",           # Enforce marker registration
    "--cov=starstreamer",         # Coverage for main package
    "--cov-report=term-missing",  # Show missing lines
    "--cov-report=html",          # Generate HTML coverage report
    "--cov-report=xml",           # Generate XML for CI
]
```
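Because `--strict-markers` is enabled, any custom marker must be registered before use or the run fails at collection time. A hedged sketch (the `slow` marker name is illustrative):

```python
import pytest

# With --strict-markers, "slow" must first be declared in pyproject.toml, e.g.:
#   [tool.pytest.ini_options]
#   markers = ["slow: marks a test as slow"]


@pytest.mark.slow
def test_full_economy_simulation():
    # Deselect slow tests with: uv run pytest -m "not slow"
    assert sum(range(1_000)) == 499_500
```

Strict marker registration catches typos like `@pytest.mark.asynco` that would otherwise silently do nothing.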
Unit Testing Patterns¶
Testing Event Handlers¶
Basic Handler Test:
```python
import pytest
from unittest.mock import AsyncMock

from starstreamer.runtime.types import Event
from modules.chat.actions.basic_commands import hello_command


class TestChatCommands:
    @pytest.mark.asyncio
    async def test_hello_command(self):
        """Test hello command sends greeting with username"""
        # Arrange
        event = Event(
            type="twitch.chat.message",
            data={"user": {"display_name": "TestUser"}, "message": "!hello"},
            timestamp=1234567890.0,
            source="test",
        )
        twitch = AsyncMock()

        # Act
        await hello_command(event, twitch)

        # Assert
        twitch.send_message.assert_called_once_with(
            "Hello TestUser! 👋 Welcome to the stream!"
        )
```
Testing with Dependency Injection:
```python
import logging

import pytest
from unittest.mock import AsyncMock, Mock

from starstreamer.runtime.types import Event
from modules.rpg.actions.economy_commands import balance_command


@pytest.mark.asyncio
async def test_balance_command_with_dependencies():
    """Test balance command with all injected dependencies"""
    # Arrange
    event = Event(
        type="twitch.chat.message",
        data={"user": {"id": "123", "display_name": "TestUser"}},
        timestamp=1234567890.0,
        source="test",
    )

    # Mock services
    twitch = AsyncMock()
    economy = AsyncMock()
    economy.get_balance.return_value = 500
    users = AsyncMock()
    logger = Mock(spec=logging.Logger)

    # Act
    await balance_command(event, twitch, economy, users, logger)

    # Assert
    economy.get_balance.assert_called_once_with("123")
    twitch.send_message.assert_called_once_with("@TestUser Your balance: 500 coins")
    logger.info.assert_called_once()
```
Testing with Edge Cases¶
```python
@pytest.mark.asyncio
async def test_hello_command_missing_display_name():
    """Test hello command handles missing display_name gracefully"""
    # Arrange - no display_name in user data
    event = Event(
        type="twitch.chat.message",
        data={"user": {"username": "testuser"}, "message": "!hello"},
        timestamp=1234567890.0,
        source="test",
    )
    twitch = AsyncMock()

    # Act
    await hello_command(event, twitch)

    # Assert - should fall back to username
    twitch.send_message.assert_called_once_with(
        "Hello testuser! 👋 Welcome to the stream!"
    )


@pytest.mark.asyncio
async def test_hello_command_empty_user_data():
    """Test hello command handles completely missing user data"""
    # Arrange
    event = Event(
        type="twitch.chat.message",
        data={"message": "!hello"},  # No user data at all
        timestamp=1234567890.0,
        source="test",
    )
    twitch = AsyncMock()

    # Act
    await hello_command(event, twitch)

    # Assert - should use fallback
    twitch.send_message.assert_called_once_with(
        "Hello friend! 👋 Welcome to the stream!"
    )
```
Testing Error Handling¶
```python
@pytest.mark.asyncio
async def test_command_handles_api_failure():
    """Test command gracefully handles external API failures"""
    # Arrange
    event = Event(
        type="twitch.chat.message",
        data={"user": {"display_name": "TestUser"}},
        timestamp=1234567890.0,
        source="test",
    )
    twitch = AsyncMock()
    api_service = AsyncMock()
    api_service.get_data.side_effect = Exception("API is down")
    logger = Mock(spec=logging.Logger)

    # Act
    await api_dependent_command(event, twitch, api_service, logger)

    # Assert
    logger.error.assert_called_once()
    twitch.send_message.assert_called_with("Sorry, service is temporarily unavailable!")
```
Integration Testing¶
Testing Complete Event Flow¶
```python
import asyncio

import pytest
from unittest.mock import AsyncMock

from starstreamer.core.decorators import create_di_event_bus
from starstreamer.runtime.types import Event


@pytest.mark.asyncio
async def test_complete_chat_command_flow():
    """Test end-to-end chat command processing"""
    # Arrange
    event_bus = create_di_event_bus()
    container = event_bus.handler_registry.container

    # Setup mock services (import TwitchClient from wherever your project defines it)
    mock_twitch = AsyncMock()
    container.register_singleton(TwitchClient, mock_twitch)

    # Import modules to register handlers
    import modules.chat.actions.basic_commands  # noqa: F401

    await event_bus.start()

    # Act - Emit a chat message event
    await event_bus.emit("twitch.chat.message", {
        "user": {"display_name": "TestUser"},
        "message": "!hello",
    })

    # Allow processing
    await asyncio.sleep(0.1)

    # Assert
    mock_twitch.send_message.assert_called_with(
        "Hello TestUser! 👋 Welcome to the stream!"
    )

    await event_bus.stop()
```
Testing Module Loading¶
```python
@pytest.mark.asyncio
async def test_chat_module_registration():
    """Test that chat module registers all handlers correctly"""
    # Arrange
    from modules.chat.module import ChatModule
    from modules.registry import ModuleRegistry
    from starstreamer.db.database import Database

    db = Database(":memory:")  # In-memory database for testing
    registry = ModuleRegistry(db)
    module = ChatModule()

    # Act
    await module.on_load()
    await module.register_actions()
    await module.on_enable()

    # Assert
    assert module.module_name == "chat"
    assert module.enabled is True

    # Cleanup
    await module.on_disable()
    await module.on_unload()
```
Testing Dependency Injection¶
Testing Service Container¶
```python
import pytest

from starstreamer.core.di.container import ServiceContainer


class TestServiceContainer:
    def test_register_and_resolve_singleton(self):
        """Test singleton service registration and resolution"""
        # Arrange
        container = ServiceContainer()
        service_instance = object()

        # Act
        container.register_singleton(object, service_instance)
        resolved = container.resolve(object)

        # Assert
        assert resolved is service_instance

    def test_register_and_resolve_factory(self):
        """Test factory service registration and resolution"""
        # Arrange
        container = ServiceContainer()
        created_instances = []

        def factory():
            instance = object()
            created_instances.append(instance)
            return instance

        # Act
        container.register_factory(object, factory)
        resolved1 = container.resolve(object)
        resolved2 = container.resolve(object)

        # Assert
        assert len(created_instances) == 2
        assert resolved1 is not resolved2  # Factory creates new instances

    def test_resolve_unregistered_service_raises_error(self):
        """Test that resolving an unregistered service raises KeyError"""
        # Arrange
        container = ServiceContainer()

        # Act & Assert
        with pytest.raises(KeyError, match="Service type.*is not registered"):
            container.resolve(object)
```
Testing Handler Registry¶
```python
import pytest
from unittest.mock import Mock

from starstreamer.core.di.container import ServiceContainer
from starstreamer.core.di.registry import HandlerRegistry
from starstreamer.runtime.types import Event


@pytest.mark.asyncio
async def test_explicit_dependency_injection():
    """Test that explicit DI handlers get correct dependencies"""
    # Arrange
    container = ServiceContainer()
    registry = HandlerRegistry(container)

    # Register a mock service under its own type
    mock_service = Mock()
    container.register_singleton(Mock, mock_service)

    # Create a handler that requests the service via its type annotation
    received_service = None

    async def test_handler(event: Event, service: Mock):
        nonlocal received_service
        received_service = service

    # Register handler
    registry.register("test.event", test_handler)

    # Act - Get and call the wrapped handler
    handlers = registry.get_handlers("test.event")
    wrapped_handler = handlers[0]["handler"]

    test_event = Event("test.event", {}, 1234567890.0, "test")
    await wrapped_handler(test_event, None)  # Context not used in explicit style

    # Assert
    assert received_service is mock_service
```
Testing Database Operations¶
Testing with In-Memory Database¶
```python
import pytest
import pytest_asyncio

from starstreamer.db.database import Database
from starstreamer.services.users import UserService


# Async fixtures need @pytest_asyncio.fixture (or asyncio_mode = "auto" in config);
# a plain @pytest.fixture on an async generator will not be awaited.
@pytest_asyncio.fixture
async def test_db():
    """Create in-memory database for testing"""
    db = Database(":memory:")
    await db.initialize()
    yield db
    await db.close()


@pytest_asyncio.fixture
async def user_service(test_db):
    """Create user service with test database"""
    service = UserService(test_db)
    await service.initialize()
    return service


@pytest.mark.asyncio
async def test_user_creation(user_service):
    """Test creating a new user"""
    # Act
    user = await user_service.get_or_create_user("123", "TestUser")

    # Assert
    assert user.user_id == "123"
    assert user.username == "TestUser"
    assert user.balance == 0  # Default balance


@pytest.mark.asyncio
async def test_user_balance_operations(user_service):
    """Test user balance operations"""
    # Arrange
    user = await user_service.get_or_create_user("123", "TestUser")

    # Act
    await user_service.add_balance(user.user_id, 100)
    updated_user = await user_service.get_user(user.user_id)

    # Assert
    assert updated_user.balance == 100
```
Testing Migrations¶
```python
@pytest.mark.asyncio
async def test_database_migration():
    """Test that database migrations work correctly"""
    # Arrange
    db = Database(":memory:")

    # Act
    await db.initialize()
    await db.run_migrations()

    # Assert - Check that tables exist
    tables = await db.fetchall("SELECT name FROM sqlite_master WHERE type='table'")
    table_names = [row[0] for row in tables]
    assert "users" in table_names
    assert "events" in table_names
    assert "modules" in table_names
```
Testing Triggers and Filters¶
Testing Command Triggers¶
```python
from starstreamer.triggers import CommandTrigger
from starstreamer.runtime.types import Event


class TestCommandTrigger:
    def test_command_trigger_matches(self):
        """Test that command trigger matches correct commands"""
        # Arrange
        trigger = CommandTrigger("!hello")
        event = Event(
            "twitch.chat.message",
            {"message": "!hello world"},
            1234567890.0,
            "test",
        )

        # Act
        result = trigger.should_trigger(event)

        # Assert
        assert result.should_trigger is True

    def test_command_trigger_case_insensitive(self):
        """Test case-insensitive command matching"""
        # Arrange
        trigger = CommandTrigger("!hello", case_sensitive=False)
        event = Event(
            "twitch.chat.message",
            {"message": "!HELLO world"},
            1234567890.0,
            "test",
        )

        # Act
        result = trigger.should_trigger(event)

        # Assert
        assert result.should_trigger is True

    def test_command_trigger_no_match(self):
        """Test that command trigger doesn't match other commands"""
        # Arrange
        trigger = CommandTrigger("!hello")
        event = Event(
            "twitch.chat.message",
            {"message": "!goodbye world"},
            1234567890.0,
            "test",
        )

        # Act
        result = trigger.should_trigger(event)

        # Assert
        assert result.should_trigger is False
```
Testing Cooldown Triggers¶
```python
import time

from starstreamer.triggers import CooldownTrigger
from starstreamer.runtime.types import Event


# These tests never await anything, so they can be plain sync tests;
# no @pytest.mark.asyncio needed.
def test_cooldown_trigger_blocks_rapid_execution():
    """Test that cooldown trigger prevents rapid re-execution"""
    # Arrange
    trigger = CooldownTrigger(cooldown_seconds=60, per_user=True)
    event = Event(
        "twitch.chat.message",
        {"user": {"id": "123"}},
        time.time(),
        "test",
    )

    # Act & Assert
    first_result = trigger.should_trigger(event)
    assert first_result.should_trigger is True

    # Immediate second attempt should be blocked
    second_result = trigger.should_trigger(event)
    assert second_result.should_trigger is False


def test_cooldown_trigger_different_users():
    """Test that per-user cooldowns don't affect other users"""
    # Arrange
    trigger = CooldownTrigger(cooldown_seconds=60, per_user=True)
    event1 = Event("test", {"user": {"id": "123"}}, time.time(), "test")
    event2 = Event("test", {"user": {"id": "456"}}, time.time(), "test")

    # Act
    result1 = trigger.should_trigger(event1)
    result2 = trigger.should_trigger(event2)

    # Assert
    assert result1.should_trigger is True
    assert result2.should_trigger is True  # Different user, not affected
```
Testing Async Operations¶
Testing Event Bus Operations¶
```python
@pytest.mark.asyncio
async def test_event_bus_concurrent_handlers():
    """Test that multiple handlers run concurrently"""
    # Arrange
    event_bus = create_test_event_bus()
    execution_order = []

    async def slow_handler(event, ctx):
        execution_order.append("slow_start")
        await asyncio.sleep(0.1)
        execution_order.append("slow_end")

    async def fast_handler(event, ctx):
        execution_order.append("fast")

    event_bus.register_handler("test.event", slow_handler)
    event_bus.register_handler("test.event", fast_handler)
    await event_bus.start()

    # Act
    await event_bus.emit("test.event", {})
    await asyncio.sleep(0.2)  # Wait for completion

    # Assert - Fast handler should finish before the slow handler does
    assert "fast" in execution_order
    assert "slow_start" in execution_order
    assert "slow_end" in execution_order
    assert execution_order.index("fast") < execution_order.index("slow_end")

    await event_bus.stop()
```
Testing External API Calls¶
```python
import aiohttp
import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_external_api_success():
    """Test successful external API call"""
    # Arrange
    mock_response = AsyncMock()
    mock_response.status = 200
    mock_response.json.return_value = {"result": "success"}

    mock_session = AsyncMock()
    mock_session.get.return_value.__aenter__.return_value = mock_response

    # Act
    with patch("aiohttp.ClientSession", return_value=mock_session):
        from my_plugin.services.api_client import ExternalAPIClient

        client = ExternalAPIClient("test-key")
        result = await client.fetch_data()

    # Assert
    assert result == {"result": "success"}
    mock_session.get.assert_called_once()


@pytest.mark.asyncio
async def test_external_api_failure():
    """Test external API failure handling"""
    # Arrange
    mock_session = AsyncMock()
    mock_session.get.side_effect = aiohttp.ClientError("Connection failed")

    # Act & Assert
    with patch("aiohttp.ClientSession", return_value=mock_session):
        from my_plugin.services.api_client import ExternalAPIClient

        client = ExternalAPIClient("test-key")
        with pytest.raises(aiohttp.ClientError):
            await client.fetch_data()
```
Test Fixtures and Utilities¶
Common Test Fixtures¶
conftest.py:
```python
import logging

import pytest
import pytest_asyncio
from unittest.mock import AsyncMock, Mock

from starstreamer.core.di.container import ServiceContainer
from starstreamer.core.di.registry import HandlerRegistry
from starstreamer.core.event_bus import EventBus

# Note: do not override the event_loop fixture yourself - recent pytest-asyncio
# releases deprecate that pattern. Configure loop behavior via asyncio_mode
# (or loop scope settings) in pyproject.toml instead.


@pytest.fixture
def mock_twitch_client():
    """Create mock Twitch client"""
    client = AsyncMock()
    client.send_message = AsyncMock()
    client.send_announcement = AsyncMock()
    return client


@pytest.fixture
def mock_logger():
    """Create mock logger constrained to the real Logger API"""
    return Mock(spec=logging.Logger)


@pytest.fixture
def test_container(mock_twitch_client, mock_logger):
    """Create service container with mock services"""
    container = ServiceContainer()
    container.register_singleton(TwitchClient, mock_twitch_client)
    container.register_singleton(logging.Logger, mock_logger)
    return container


@pytest.fixture
def test_event_bus(test_container):
    """Create event bus with test services"""
    registry = HandlerRegistry(test_container)
    return EventBus(registry)


@pytest_asyncio.fixture
async def test_database():
    """Create in-memory test database"""
    from starstreamer.db.database import Database

    db = Database(":memory:")
    await db.initialize()
    yield db
    await db.close()
```
Event Factory Utilities¶
```python
import time

from starstreamer.runtime.types import Event


def create_chat_event(username="TestUser", message="test message", user_id="123"):
    """Factory function for creating chat events"""
    return Event(
        type="twitch.chat.message",
        data={
            "user": {
                "id": user_id,
                "display_name": username,
                "username": username.lower(),
            },
            "message": message,
        },
        timestamp=time.time(),
        source="test",
    )


def create_follow_event(username="TestFollower", user_id="456"):
    """Factory function for creating follow events"""
    return Event(
        type="twitch.follow",
        data={
            "user_id": user_id,
            "user_name": username,
            "followed_at": "2023-01-01T00:00:00Z",
        },
        timestamp=time.time(),
        source="test",
    )


def create_subscription_event(username="TestSub", tier="1000", user_id="789"):
    """Factory function for creating subscription events"""
    return Event(
        type="twitch.subscription",
        data={
            "user_id": user_id,
            "user_name": username,
            "tier": tier,
            "is_gift": False,
        },
        timestamp=time.time(),
        source="test",
    )
```
Testing Best Practices¶
Test Organization¶
- One test class per module/feature
- Descriptive test names that explain what is being tested
- Arrange-Act-Assert pattern for test structure
- Group related tests using test classes
- Use fixtures for common setup and teardown
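One way to group related cases while keeping each assertion specific is `pytest.mark.parametrize`; each tuple becomes its own independently reported test. A sketch (the `normalize_command` helper is hypothetical, standing in for any pure function in your module):

```python
import pytest


def normalize_command(text: str) -> str:
    # Hypothetical helper: isolate the command word and lowercase it.
    return text.strip().split()[0].lower()


class TestNormalizeCommand:
    @pytest.mark.parametrize(
        "raw,expected",
        [
            ("!Hello world", "!hello"),
            ("  !HELLO  ", "!hello"),
            ("!hi", "!hi"),
        ],
    )
    def test_normalizes(self, raw, expected):
        # Arrange/Act/Assert collapse to one line for simple pure functions
        assert normalize_command(raw) == expected
```

Parametrized tests keep related cases together without hiding which input failed, since pytest reports each parameter set separately.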
Mocking Guidelines¶
```python
# ✅ Good: Mock external dependencies
@pytest.mark.asyncio
@patch("external_api.fetch_data")
async def test_handler_with_api(mock_fetch):
    mock_fetch.return_value = {"status": "ok"}
    # Test logic here


# ✅ Good: Mock at the boundary
def test_service_with_database():
    mock_db = Mock()
    service = MyService(database=mock_db)
    # Test service logic without the real database


# ❌ Avoid: Over-mocking internal logic
def test_calculation():
    # Don't mock the calculation itself
    result = calculate_value(10, 20)
    assert result == 30
```
Assertion Best Practices¶
```python
# ✅ Good: Specific assertions
assert twitch.send_message.call_count == 1
assert "Hello TestUser" in twitch.send_message.call_args[0][0]

# ✅ Good: Multiple specific assertions
twitch.send_message.assert_called_once_with("Expected message")
logger.info.assert_called_once()

# ❌ Avoid: Vague assertions
assert twitch.send_message.called  # Too general
assert len(result) > 0  # Not specific enough
```
Error Testing¶
```python
# ✅ Test expected exceptions
with pytest.raises(ValueError, match="Invalid user ID"):
    await user_service.get_user("invalid")


# ✅ Test error-handling behavior
@pytest.mark.asyncio
async def test_graceful_error_handling():
    # Arrange error condition
    api_service.get_data.side_effect = ConnectionError()

    # Act - should not raise
    await resilient_handler(event, twitch, api_service)

    # Assert error was handled gracefully
    twitch.send_message.assert_called_with("Service temporarily unavailable")
```
Continuous Integration¶
GitHub Actions Example¶
.github/workflows/test.yml:
```yaml
name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        run: curl -LsSf https://astral.sh/uv/install.sh | sh

      - name: Install dependencies
        run: uv pip install -e ".[dev]"

      - name: Run tests
        run: uv run pytest --cov=starstreamer --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
```
Pre-commit Hooks¶
.pre-commit-config.yaml:
```yaml
repos:
  - repo: local
    hooks:
      - id: tests
        name: Run tests
        entry: uv run pytest tests/unit/
        language: system
        pass_filenames: false
        always_run: true
```
Performance Testing¶
Testing Handler Performance¶
```python
import time

import pytest


@pytest.mark.asyncio
async def test_handler_performance():
    """Test that handler completes within acceptable time"""
    # Arrange - perf_counter is monotonic, so it's the right clock for durations
    start_time = time.perf_counter()

    # Act
    await my_handler(event, services)

    # Assert
    execution_time = time.perf_counter() - start_time
    assert execution_time < 0.1  # Should complete within 100ms
```
Load Testing¶
```python
@pytest.mark.asyncio
async def test_event_bus_load():
    """Test event bus under load"""
    event_bus = create_test_event_bus()
    received_count = 0

    async def counter_handler(event, ctx):
        nonlocal received_count
        received_count += 1

    event_bus.register_handler("test.event", counter_handler)
    await event_bus.start()

    # Send many events rapidly
    for i in range(1000):
        await event_bus.emit("test.event", {"id": i})

    # Wait for processing
    await asyncio.sleep(1.0)

    # Assert all events were processed
    assert received_count == 1000
    await event_bus.stop()
```
Plugin Testing¶
Testing Plugin Lifecycle¶
```python
@pytest.mark.asyncio
async def test_plugin_lifecycle():
    """Test complete plugin lifecycle"""
    from my_plugin import MyPlugin

    plugin = MyPlugin(config={"api_key": "test"})

    # Test loading
    await plugin.on_load()
    assert plugin.api_client is not None

    # Test action registration
    await plugin.register_actions()

    # Test enabling
    await plugin.on_enable()
    assert plugin.enabled is True

    # Test disabling
    await plugin.on_disable()
    assert plugin.enabled is False

    # Test unloading
    await plugin.on_unload()
```
Testing Plugin Commands¶
```python
@pytest.mark.asyncio
async def test_plugin_command():
    """Test plugin-specific command"""
    from my_plugin.actions.commands import awesome_command

    event = create_chat_event(message="!awesome")
    twitch = AsyncMock()

    await awesome_command(event, twitch)

    assert "awesome" in twitch.send_message.call_args[0][0].lower()
```
Debugging Tests¶
Pytest Debugging¶
```bash
# Run with verbose output
uv run pytest -v

# Stop on first failure
uv run pytest -x

# Run specific test with output
uv run pytest -s tests/unit/test_specific.py::test_function

# Debug with pdb
uv run pytest --pdb tests/unit/test_specific.py::test_function

# Show local variables on failure
uv run pytest -l
```
Logging in Tests¶
```python
import logging


def test_with_logging(caplog):
    """Test that captures log output"""
    with caplog.at_level(logging.INFO):
        my_function_that_logs()

    assert "Expected log message" in caplog.text
    assert len(caplog.records) == 1
```
Test Maintenance¶
Keeping Tests Updated¶
- Update tests when requirements change
- Remove obsolete tests for removed features
- Refactor tests to match code refactoring
- Add tests for bug fixes to prevent regression
- Review test coverage regularly and add missing tests
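The "tests for bug fixes" point is worth making concrete: each fix should gain a test that reproduces the original failure, so the bug cannot silently return. A hedged sketch (the `parse_tier` helper and the bug number are hypothetical):

```python
def parse_tier(raw: str) -> int:
    # Hypothetical fix for bug #142: empty tier strings used to crash
    # with ValueError; they now fall back to the default tier.
    return int(raw) if raw.strip() else 1000


def test_parse_tier_empty_string_regression():
    """Regression guard for (hypothetical) bug #142: empty input must not crash."""
    assert parse_tier("") == 1000
    assert parse_tier("  ") == 1000
    # Normal inputs still parse as before
    assert parse_tier("2000") == 2000
```

Naming the test after the bug (or linking the issue in the docstring) makes it obvious why the test exists when it fails years later.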
Test Documentation¶
```python
class TestChatCommands:
    """
    Test suite for chat command handlers.

    Tests cover:
    - Basic command functionality
    - Error handling and edge cases
    - User permission validation
    - Service integration
    """

    def test_hello_command(self):
        """
        Test that hello command sends personalized greeting.

        Verifies:
        - Username extraction from event data
        - Fallback to generic greeting if no username
        - Correct message formatting
        """
        pass
```
This comprehensive testing guide provides the foundation for maintaining high-quality, reliable code in StarStreamer. Following these patterns will help ensure your modules and plugins work correctly and continue to work as the system evolves.