Services¶
Services form the business logic layer in StarStreamer, providing clean separation between data access and event handling. They encapsulate core functionality like user management and economy operations, making them reusable across different modules and handlers.
Architecture Overview¶
graph TB
A[Event Handlers] --> B[Services Layer]
B --> C[Database Layer]
D[Web Interface] --> B
E[Modules] --> B
F[Dependency Injection] --> B
subgraph "Services Layer"
G[UserService]
H[EconomyService]
I[Custom Services]
end
subgraph "Database Layer"
J[Database]
K[Repositories]
L[Models]
end
Services act as the business logic layer between your event handlers and the database, ensuring:
- Separation of Concerns: Business logic is separated from event handling
- Reusability: Services can be used across multiple handlers and modules
- Testability: Services can be tested independently with mocked dependencies
- Consistency: Business rules are enforced in one place
- Transaction Management: Complex operations are handled atomically
Service Pattern¶
Core Principles¶
Services in StarStreamer follow these principles:
- Single Responsibility: Each service handles one domain (users, economy, etc.)
- Dependency Injection: Services are injected into handlers automatically
- Database Abstraction: Services handle data persistence logic
- Error Handling: Services provide consistent error handling and logging
- Business Logic: Services contain the "what" and "why" of operations
Service vs Repository¶
- Services: Contain business logic, validation, and complex operations
- Repositories: Handle simple CRUD operations and data access patterns
# Repository - Simple data access
await user_repo.create_user(user_data)
await identity_repo.link_platform(user_id, platform, platform_user_id, platform_username)
# Service - Business logic + data access
await user_service.get_or_create_user_by_platform("twitch", twitch_id, username)
Available Services¶
UserService¶
Handles user management and profile operations with platform identity support.
from starstreamer.services.users import UserService
@on_event("twitch.chat.message")
async def track_user(event: Event, user_service: UserService) -> None:
user_data = event.data.get("user", {})
platform_user_id = user_data.get("id")
username = user_data.get("login")
# Get or create user automatically with platform identity
user = await user_service.get_or_create_user_by_platform("twitch", platform_user_id, username)
if user:
await user_service.update_watch_time_by_platform("twitch", platform_user_id, 30) # 30 seconds active
Key Methods:
| Method | Description | Returns |
|---|---|---|
get_or_create_user_by_platform(platform, platform_user_id, username) |
Get existing user or create new one with platform identity | User \| None |
update_watch_time_by_platform(platform, platform_user_id, seconds) |
Add watch time using platform identity | bool |
get_user_stats_by_platform(platform, platform_user_id) |
Get user's points and watch time using platform identity | dict[str, Any] |
link_platform_identity(user_id, platform, platform_user_id, username) |
Link additional platform to existing user | UserIdentity \| None |
get_user_identities(user_id) |
Get all platform identities for a user | list[UserIdentity] |
EconomyService¶
Manages currency, points, and economic transactions with platform identity support.
from starstreamer.services.economy import EconomyService
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!balance"))
async def balance_command(event: Event, economy: EconomyService, twitch: TwitchClient) -> None:
user_data = event.data.get("user", {})
platform_user_id = user_data.get("id")
balance = await economy.get_balance_by_platform("twitch", platform_user_id)
await twitch.send_message(f"💰 You have {balance} points!")
Key Methods:
| Method | Description | Returns |
|---|---|---|
get_balance_by_platform(platform, platform_user_id) |
Get user's balance using platform identity | int |
add_money_by_platform(platform, platform_user_id, amount, reason) |
Add points using platform identity | int (new balance) |
spend_money_by_platform(platform, platform_user_id, amount, reason) |
Spend points using platform identity | bool (success) |
transfer_money_by_platform(from_platform, from_id, to_platform, to_id, amount, reason) |
Transfer between platform users | bool (success) |
set_balance_by_platform(platform, platform_user_id, amount, reason) |
Set exact balance using platform identity | int (new balance) |
get_leaderboard(limit) |
Get top users by balance | list[tuple[str, int]] |
get_economy_stats() |
Get overall statistics | dict[str, int] |
Using Services in Handlers¶
Services are automatically injected into your event handlers through dependency injection:
from starstreamer.core.decorators import on_event
from starstreamer.triggers import CommandTrigger
from starstreamer.services.economy import EconomyService
from starstreamer.services.users import UserService
from starstreamer.plugins.twitch import TwitchClient
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!gamble"))
async def gamble_command(
event: Event,
economy: EconomyService, # Injected automatically
user_service: UserService, # Injected automatically
twitch: TwitchClient # Injected automatically
) -> None:
user_data = event.data.get("user", {})
user_id = user_data.get("id")
username = user_data.get("display_name", "User")
# Parse bet amount
message = event.data.get("message", "")
args = message.split()[1:] # Skip command
if not args:
await twitch.send_message(f"{username}, specify an amount! Usage: !gamble <amount>")
return
try:
bet_amount = int(args[0])
except ValueError:
await twitch.send_message(f"{username}, please enter a valid number!")
return
if bet_amount <= 0:
await twitch.send_message(f"{username}, bet amount must be positive!")
return
# Check if user has enough points
balance = await economy.get_balance(user_id)
if balance < bet_amount:
await twitch.send_message(f"{username}, you only have {balance} points!")
return
# Business logic - 50% chance to win
import random
won = random.choice([True, False])
if won:
new_balance = await economy.add_money(user_id, bet_amount, "Gambling win")
await twitch.send_message(f"🎉 {username} won {bet_amount} points! New balance: {new_balance}")
else:
success = await economy.spend_money(user_id, bet_amount, "Gambling loss")
if success:
new_balance = await economy.get_balance(user_id)
await twitch.send_message(f"💸 {username} lost {bet_amount} points! New balance: {new_balance}")
Creating Custom Services¶
Basic Service Structure¶
Create custom services by following the established pattern:
# src/starstreamer/services/quests.py
"""
Quest Service - Manages user quests and achievements
"""
import logging
from datetime import datetime
from typing import Dict, List, Optional
from starstreamer.db.database import Database
class QuestService:
"""Manages user quests and achievements"""
def __init__(self, db: Database) -> None:
self.db = db
self.logger = logging.getLogger(__name__)
async def create_quest(self, title: str, description: str, reward_points: int) -> int:
"""Create a new quest. Returns quest ID."""
try:
quest_id = await self.db.execute(
"INSERT INTO quests (title, description, reward_points, created_at) VALUES (?, ?, ?, ?)",
(title, description, reward_points, datetime.now().isoformat())
)
await self.db.commit()
self.logger.info(f"Created quest '{title}' with ID {quest_id}")
return quest_id
except Exception as e:
self.logger.error(f"Failed to create quest: {e}")
raise
async def assign_quest(self, user_id: str, quest_id: int) -> bool:
"""Assign quest to user. Returns True if successful."""
try:
# Check if user already has this quest
existing = await self.db.fetchone(
"SELECT id FROM user_quests WHERE user_id = ? AND quest_id = ?",
(user_id, quest_id)
)
if existing:
return False # User already has this quest
await self.db.execute(
"INSERT INTO user_quests (user_id, quest_id, assigned_at, status) VALUES (?, ?, ?, 'active')",
(user_id, quest_id, datetime.now().isoformat())
)
await self.db.commit()
self.logger.info(f"Assigned quest {quest_id} to user {user_id}")
return True
except Exception as e:
self.logger.error(f"Failed to assign quest: {e}")
return False
async def complete_quest(self, user_id: str, quest_id: int) -> Optional[int]:
"""Complete quest and award points. Returns points awarded or None if failed."""
try:
# Get quest details
quest = await self.db.fetchone(
"SELECT reward_points FROM quests WHERE id = ?",
(quest_id,)
)
if not quest:
return None
# Update quest status
await self.db.execute(
"UPDATE user_quests SET status = 'completed', completed_at = ? WHERE user_id = ? AND quest_id = ?",
(datetime.now().isoformat(), user_id, quest_id)
)
# Award points (would use EconomyService in real implementation)
reward_points = quest[0]
await self.db.execute(
"UPDATE users SET points = points + ? WHERE twitch_id = ?",
(reward_points, user_id)
)
await self.db.commit()
self.logger.info(f"User {user_id} completed quest {quest_id} for {reward_points} points")
return reward_points
except Exception as e:
self.logger.error(f"Failed to complete quest: {e}")
return None
async def get_user_quests(self, user_id: str, status: str = "active") -> List[Dict]:
"""Get user's quests by status"""
try:
rows = await self.db.fetchall("""
SELECT q.id, q.title, q.description, q.reward_points, uq.assigned_at, uq.status
FROM quests q
JOIN user_quests uq ON q.id = uq.quest_id
WHERE uq.user_id = ? AND uq.status = ?
""", (user_id, status))
return [
{
"id": row[0],
"title": row[1],
"description": row[2],
"reward_points": row[3],
"assigned_at": row[4],
"status": row[5]
}
for row in rows
]
except Exception as e:
self.logger.error(f"Failed to get user quests: {e}")
return []
Service Registration¶
Register your custom service in the dependency injection container:
# In src/main.py or your module's initialization
from starstreamer.services.quests import QuestService
async def initialize_services(self) -> None:
"""Initialize core services and register them in DI container"""
try:
# ... existing services ...
# Register custom service
quest_service = QuestService(self.database)
self.container.register_singleton(QuestService, quest_service)
self.logger.info("Core services initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize services: {e}")
raise
Using Custom Services¶
Once registered, your custom service can be injected into handlers:
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!quests"))
async def list_quests_command(
event: Event,
quest_service: QuestService, # Custom service injection
twitch: TwitchClient
) -> None:
user_data = event.data.get("user", {})
user_id = user_data.get("id")
username = user_data.get("display_name", "User")
quests = await quest_service.get_user_quests(user_id, "active")
if not quests:
await twitch.send_message(f"{username}, you have no active quests!")
return
quest_list = ", ".join([f"{q['title']} ({q['reward_points']} pts)" for q in quests])
await twitch.send_message(f"{username}'s quests: {quest_list}")
Service Composition¶
Services can use other services to build complex functionality:
class RewardService:
"""Manages rewards and payouts"""
def __init__(self, db: Database, economy: EconomyService, user_service: UserService) -> None:
self.db = db
self.economy = economy
self.user_service = user_service
self.logger = logging.getLogger(__name__)
async def reward_follow(self, user_id: str, username: str) -> int:
"""Reward user for following"""
# Ensure user exists
user = await self.user_service.get_or_create_user(user_id, username)
if not user:
return 0
# Award follow bonus
follow_bonus = 100
new_balance = await self.economy.add_money(user_id, follow_bonus, "New follower bonus")
self.logger.info(f"Rewarded {username} {follow_bonus} points for following")
return new_balance
async def reward_watch_time(self, user_id: str, minutes_watched: int) -> int:
"""Reward user for watch time"""
if minutes_watched < 5: # Minimum 5 minutes
return 0
# 1 point per minute watched
points_earned = minutes_watched
new_balance = await self.economy.add_money(user_id, points_earned, f"Watch time: {minutes_watched}m")
return new_balance
Testing Services¶
Services should be thoroughly tested with mocked dependencies:
# tests/unit/services/test_quest_service.py
import pytest
from unittest.mock import AsyncMock, Mock
from starstreamer.services.quests import QuestService
class TestQuestService:
"""Test QuestService functionality"""
@pytest.fixture
def mock_db(self):
"""Mock database for testing"""
db = Mock()
db.execute = AsyncMock()
db.fetchone = AsyncMock()
db.fetchall = AsyncMock()
db.commit = AsyncMock()
return db
@pytest.fixture
def quest_service(self, mock_db):
"""Create QuestService with mocked database"""
return QuestService(mock_db)
@pytest.mark.asyncio
async def test_create_quest(self, quest_service, mock_db):
"""Test quest creation"""
mock_db.execute.return_value = 123 # Mock quest ID
quest_id = await quest_service.create_quest(
"Test Quest",
"A test quest",
50
)
assert quest_id == 123
mock_db.execute.assert_called_once()
mock_db.commit.assert_called_once()
@pytest.mark.asyncio
async def test_assign_quest_success(self, quest_service, mock_db):
"""Test successful quest assignment"""
mock_db.fetchone.return_value = None # No existing quest
result = await quest_service.assign_quest("user123", 456)
assert result is True
mock_db.execute.assert_called_once()
mock_db.commit.assert_called_once()
@pytest.mark.asyncio
async def test_assign_quest_already_exists(self, quest_service, mock_db):
"""Test quest assignment when user already has quest"""
mock_db.fetchone.return_value = (1,) # Existing quest
result = await quest_service.assign_quest("user123", 456)
assert result is False
mock_db.execute.assert_not_called()
Best Practices¶
1. Service Responsibilities¶
Keep services focused on their domain:
# ✅ Good - Focused responsibility
class EconomyService:
async def add_money(self, user_id: str, amount: int, reason: str) -> int:
# Handle money operations only
pass
# ❌ Bad - Mixed responsibilities
class EconomyService:
async def add_money_and_send_notification(self, user_id: str, amount: int) -> int:
# Mixing economy logic with notifications
pass
2. Error Handling¶
Provide consistent error handling patterns:
async def transfer_money(self, from_id: str, to_id: str, amount: int) -> bool:
"""Transfer money between users. Returns True if successful."""
try:
# Business logic here
await self.db.commit()
return True
except Exception as e:
self.logger.error(f"Failed to transfer money: {e}")
# Rollback logic if needed
return False
3. Validation¶
Validate inputs at the service layer:
async def add_money(self, user_id: str, amount: int, reason: str = "") -> int:
"""Add money to user's balance"""
if amount <= 0:
raise ValueError("Amount must be positive")
if not user_id:
raise ValueError("User ID is required")
# Continue with business logic...
4. Logging¶
Include comprehensive logging for debugging:
async def complete_quest(self, user_id: str, quest_id: int) -> Optional[int]:
"""Complete quest and award points"""
try:
self.logger.debug(f"Attempting to complete quest {quest_id} for user {user_id}")
# Business logic...
self.logger.info(f"User {user_id} completed quest {quest_id} for {reward_points} points")
return reward_points
except Exception as e:
self.logger.error(f"Failed to complete quest {quest_id} for user {user_id}: {e}")
return None
5. Database Transactions¶
Handle database transactions properly:
async def complex_operation(self, user_id: str) -> bool:
"""Perform complex multi-step operation"""
try:
# Start transaction (if your DB supports it)
async with self.db.transaction():
await self.db.execute("UPDATE table1 SET ...")
await self.db.execute("INSERT INTO table2 ...")
await self.db.execute("DELETE FROM table3 ...")
# Automatic commit on success, rollback on exception
return True
except Exception as e:
self.logger.error(f"Complex operation failed: {e}")
return False
Service Lifecycle¶
Services follow this lifecycle in StarStreamer:
- Registration - Services are registered in the DI container during startup
- Injection - Services are automatically injected into handlers as needed
- Execution - Services perform business logic operations
- Cleanup - Services are cleaned up during application shutdown
# src/main.py - Service lifecycle example
class StarStreamer:
async def initialize_services(self) -> None:
"""Register all services in DI container"""
economy_service = EconomyService(self.database)
user_service = UserService(self.database)
self.container.register_singleton(EconomyService, economy_service)
self.container.register_singleton(UserService, user_service)
async def shutdown(self) -> None:
"""Cleanup services if needed"""
# Services are automatically cleaned up by DI container
pass
See Also¶
- Dependency Injection - How DI works with services
- Database - Database integration and repositories
- Event System - How services integrate with events
- Testing Guide - Testing services and business logic
- API Reference - Dependency Injection - DI container documentation