Services

Services form the business logic layer in StarStreamer, providing clean separation between data access and event handling. They encapsulate core functionality like user management and economy operations, making them reusable across different modules and handlers.

Architecture Overview

graph TB
    A[Event Handlers] --> B[Services Layer]
    B --> C[Database Layer]
    D[Web Interface] --> B
    E[Modules] --> B
    F[Dependency Injection] --> B

    subgraph "Services Layer"
        G[UserService]
        H[EconomyService]
        I[Custom Services]
    end

    subgraph "Database Layer"
        J[Database]
        K[Repositories]
        L[Models]
    end

Services act as the business logic layer between your event handlers and the database, ensuring:

  • Separation of Concerns: Business logic is separated from event handling
  • Reusability: Services can be used across multiple handlers and modules
  • Testability: Services can be tested independently with mocked dependencies
  • Consistency: Business rules are enforced in one place
  • Transaction Management: Complex operations are handled atomically

Service Pattern

Core Principles

Services in StarStreamer follow these principles:

  1. Single Responsibility: Each service handles one domain (users, economy, etc.)
  2. Dependency Injection: Services are injected into handlers automatically
  3. Database Abstraction: Services handle data persistence logic
  4. Error Handling: Services provide consistent error handling and logging
  5. Business Logic: Services contain the "what" and "why" of operations

Service vs Repository

  • Services: Contain business logic, validation, and complex operations
  • Repositories: Handle simple CRUD operations and data access patterns

# Repository - Simple data access
await user_repo.create_user(user_data)
await identity_repo.link_platform(user_id, platform, platform_user_id, platform_username)

# Service - Business logic + data access
await user_service.get_or_create_user_by_platform("twitch", twitch_id, username)
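
To make the split concrete, here is a minimal, self-contained sketch. The classes `SketchUser`, `InMemoryUserRepo`, and `SketchUserService` are hypothetical stand-ins written for this illustration, not StarStreamer's actual implementation; only the method name `get_or_create_user_by_platform` is taken from this page. The repository only stores and looks up data, while the service adds validation and the multi-step get-or-create rule.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchUser:
    id: int
    username: str


class InMemoryUserRepo:
    """Hypothetical repository: stores and looks up rows, no business rules."""

    def __init__(self) -> None:
        self._users: dict[int, SketchUser] = {}
        self._links: dict[tuple[str, str], int] = {}

    async def create_user(self, username: str) -> SketchUser:
        user = SketchUser(id=len(self._users) + 1, username=username)
        self._users[user.id] = user
        return user

    async def find_by_platform(self, platform: str, platform_user_id: str) -> Optional[SketchUser]:
        user_id = self._links.get((platform, platform_user_id))
        return None if user_id is None else self._users[user_id]

    async def link_platform(self, user_id: int, platform: str, platform_user_id: str) -> None:
        self._links[(platform, platform_user_id)] = user_id


class SketchUserService:
    """Hypothetical service: validation plus a multi-step get-or-create rule."""

    def __init__(self, repo: InMemoryUserRepo) -> None:
        self.repo = repo

    async def get_or_create_user_by_platform(
        self, platform: str, platform_user_id: str, username: str
    ) -> Optional[SketchUser]:
        # Validation lives in the service, not in the repository
        if not platform or not platform_user_id:
            return None
        existing = await self.repo.find_by_platform(platform, platform_user_id)
        if existing is not None:
            return existing
        # Two repository calls combined into one business operation
        user = await self.repo.create_user(username)
        await self.repo.link_platform(user.id, platform, platform_user_id)
        return user


async def demo() -> None:
    service = SketchUserService(InMemoryUserRepo())
    first = await service.get_or_create_user_by_platform("twitch", "42", "alice")
    again = await service.get_or_create_user_by_platform("twitch", "42", "alice")
    print(first is again)


if __name__ == "__main__":
    asyncio.run(demo())
```

Callers only see one method; the decision about when to create and when to reuse a user stays in a single place.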

Available Services

UserService

Handles user management and profile operations with platform identity support.

from starstreamer.services.users import UserService

@on_event("twitch.chat.message")
async def track_user(event: Event, user_service: UserService) -> None:
    user_data = event.data.get("user", {})
    platform_user_id = user_data.get("id")
    username = user_data.get("login")

    # Get or create user automatically with platform identity
    user = await user_service.get_or_create_user_by_platform("twitch", platform_user_id, username)
    if user:
        await user_service.update_watch_time_by_platform("twitch", platform_user_id, 30)  # 30 seconds active

Key Methods:

| Method | Description | Returns |
|--------|-------------|---------|
| get_or_create_user_by_platform(platform, platform_user_id, username) | Get existing user or create new one with platform identity | User \| None |
| update_watch_time_by_platform(platform, platform_user_id, seconds) | Add watch time using platform identity | bool |
| get_user_stats_by_platform(platform, platform_user_id) | Get user's points and watch time using platform identity | dict[str, Any] |
| link_platform_identity(user_id, platform, platform_user_id, username) | Link additional platform to existing user | UserIdentity \| None |
| get_user_identities(user_id) | Get all platform identities for a user | list[UserIdentity] |

EconomyService

Manages currency, points, and economic transactions with platform identity support.

from starstreamer.services.economy import EconomyService

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!balance"))
async def balance_command(event: Event, economy: EconomyService, twitch: TwitchClient) -> None:
    user_data = event.data.get("user", {})
    platform_user_id = user_data.get("id")

    balance = await economy.get_balance_by_platform("twitch", platform_user_id)
    await twitch.send_message(f"💰 You have {balance} points!")

Key Methods:

| Method | Description | Returns |
|--------|-------------|---------|
| get_balance_by_platform(platform, platform_user_id) | Get user's balance using platform identity | int |
| add_money_by_platform(platform, platform_user_id, amount, reason) | Add points using platform identity | int (new balance) |
| spend_money_by_platform(platform, platform_user_id, amount, reason) | Spend points using platform identity | bool (success) |
| transfer_money_by_platform(from_platform, from_id, to_platform, to_id, amount, reason) | Transfer between platform users | bool (success) |
| set_balance_by_platform(platform, platform_user_id, amount, reason) | Set exact balance using platform identity | int (new balance) |
| get_leaderboard(limit) | Get top users by balance | list[tuple[str, int]] |
| get_economy_stats() | Get overall statistics | dict[str, int] |
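
As an illustration of consuming the `list[tuple[str, int]]` shape returned by `get_leaderboard`, the sketch below formats it into a single chat line. `format_leaderboard` is a hypothetical helper, not part of StarStreamer, and it assumes each tuple is ordered as (username, balance), which this page does not state explicitly.

```python
def format_leaderboard(entries: list[tuple[str, int]]) -> str:
    """Turn leaderboard rows into one chat-friendly line.

    Assumes each entry is (username, balance), already sorted by balance.
    """
    if not entries:
        return "No one has any points yet!"
    return " | ".join(
        f"#{rank} {name}: {points}"
        for rank, (name, points) in enumerate(entries, start=1)
    )
```

A handler could pass the result of `await economy.get_leaderboard(5)` straight into this helper and send the returned string to chat.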

Using Services in Handlers

Services are automatically injected into your event handlers through dependency injection:

from starstreamer.core.decorators import on_event
from starstreamer.triggers import CommandTrigger
from starstreamer.services.economy import EconomyService
from starstreamer.services.users import UserService
from starstreamer.plugins.twitch import TwitchClient

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!gamble"))
async def gamble_command(
    event: Event, 
    economy: EconomyService,  # Injected automatically
    user_service: UserService,  # Injected automatically
    twitch: TwitchClient  # Injected automatically
) -> None:
    user_data = event.data.get("user", {})
    user_id = user_data.get("id")
    username = user_data.get("display_name", "User")

    # Parse bet amount
    message = event.data.get("message", "")
    args = message.split()[1:]  # Skip command

    if not args:
        await twitch.send_message(f"{username}, specify an amount! Usage: !gamble <amount>")
        return

    try:
        bet_amount = int(args[0])
    except ValueError:
        await twitch.send_message(f"{username}, please enter a valid number!")
        return

    if bet_amount <= 0:
        await twitch.send_message(f"{username}, bet amount must be positive!")
        return

    # Check if user has enough points (user_id is the Twitch platform user ID)
    balance = await economy.get_balance_by_platform("twitch", user_id)
    if balance < bet_amount:
        await twitch.send_message(f"{username}, you only have {balance} points!")
        return

    # Business logic - 50% chance to win
    import random
    won = random.choice([True, False])

    if won:
        new_balance = await economy.add_money_by_platform("twitch", user_id, bet_amount, "Gambling win")
        await twitch.send_message(f"🎉 {username} won {bet_amount} points! New balance: {new_balance}")
    else:
        success = await economy.spend_money_by_platform("twitch", user_id, bet_amount, "Gambling loss")
        if success:
            new_balance = await economy.get_balance_by_platform("twitch", user_id)
            await twitch.send_message(f"💸 {username} lost {bet_amount} points! New balance: {new_balance}")
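
One way to keep a handler like this easy to test is to move the argument parsing into a pure function. The sketch below is an assumption about how that could look; `parse_bet` is a hypothetical helper, not an existing StarStreamer function. It collapses the "missing", "not a number", and "not positive" cases into a single `None` result, so a handler that wants distinct error messages would need a richer return type.

```python
from typing import Optional


def parse_bet(message: str) -> Optional[int]:
    """Extract a positive bet amount from a message like '!gamble 50'.

    Returns None when the amount is missing, not an integer, or not positive.
    """
    parts = message.split()
    if len(parts) < 2:  # Only the command was given
        return None
    try:
        amount = int(parts[1])
    except ValueError:
        return None
    return amount if amount > 0 else None
```

Because the helper touches neither the event nor any service, it can be unit tested without mocks.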

Creating Custom Services

Basic Service Structure

Create custom services by following the established pattern:

# src/starstreamer/services/quests.py
"""
Quest Service - Manages user quests and achievements
"""

import logging
from datetime import datetime
from typing import Dict, List, Optional

from starstreamer.db.database import Database


class QuestService:
    """Manages user quests and achievements"""

    def __init__(self, db: Database) -> None:
        self.db = db
        self.logger = logging.getLogger(__name__)

    async def create_quest(self, title: str, description: str, reward_points: int) -> int:
        """Create a new quest. Returns quest ID."""
        try:
            quest_id = await self.db.execute(
                "INSERT INTO quests (title, description, reward_points, created_at) VALUES (?, ?, ?, ?)",
                (title, description, reward_points, datetime.now().isoformat())
            )
            await self.db.commit()
            self.logger.info(f"Created quest '{title}' with ID {quest_id}")
            return quest_id
        except Exception as e:
            self.logger.error(f"Failed to create quest: {e}")
            raise

    async def assign_quest(self, user_id: str, quest_id: int) -> bool:
        """Assign quest to user. Returns True if successful."""
        try:
            # Check if user already has this quest
            existing = await self.db.fetchone(
                "SELECT id FROM user_quests WHERE user_id = ? AND quest_id = ?",
                (user_id, quest_id)
            )

            if existing:
                return False  # User already has this quest

            await self.db.execute(
                "INSERT INTO user_quests (user_id, quest_id, assigned_at, status) VALUES (?, ?, ?, 'active')",
                (user_id, quest_id, datetime.now().isoformat())
            )
            await self.db.commit()
            self.logger.info(f"Assigned quest {quest_id} to user {user_id}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to assign quest: {e}")
            return False

    async def complete_quest(self, user_id: str, quest_id: int) -> Optional[int]:
        """Complete quest and award points. Returns points awarded or None if failed."""
        try:
            # Get quest details
            quest = await self.db.fetchone(
                "SELECT reward_points FROM quests WHERE id = ?",
                (quest_id,)
            )

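            if not quest:
                return None

            # Only an active assignment can be completed; this prevents repeat payouts
            assignment = await self.db.fetchone(
                "SELECT id FROM user_quests WHERE user_id = ? AND quest_id = ? AND status = 'active'",
                (user_id, quest_id)
            )

            if not assignment:
                return None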

            # Update quest status
            await self.db.execute(
                "UPDATE user_quests SET status = 'completed', completed_at = ? WHERE user_id = ? AND quest_id = ?",
                (datetime.now().isoformat(), user_id, quest_id)
            )

            # Award points (would use EconomyService in real implementation)
            reward_points = quest[0]
            await self.db.execute(
                "UPDATE users SET points = points + ? WHERE twitch_id = ?",
                (reward_points, user_id)
            )

            await self.db.commit()
            self.logger.info(f"User {user_id} completed quest {quest_id} for {reward_points} points")
            return reward_points

        except Exception as e:
            self.logger.error(f"Failed to complete quest: {e}")
            return None

    async def get_user_quests(self, user_id: str, status: str = "active") -> List[Dict]:
        """Get user's quests by status"""
        try:
            rows = await self.db.fetchall("""
                SELECT q.id, q.title, q.description, q.reward_points, uq.assigned_at, uq.status
                FROM quests q
                JOIN user_quests uq ON q.id = uq.quest_id
                WHERE uq.user_id = ? AND uq.status = ?
            """, (user_id, status))

            return [
                {
                    "id": row[0],
                    "title": row[1],
                    "description": row[2],
                    "reward_points": row[3],
                    "assigned_at": row[4],
                    "status": row[5]
                }
                for row in rows
            ]

        except Exception as e:
            self.logger.error(f"Failed to get user quests: {e}")
            return []

Service Registration

Register your custom service in the dependency injection container:

# In src/main.py or your module's initialization
from starstreamer.services.quests import QuestService

async def initialize_services(self) -> None:
    """Initialize core services and register them in DI container"""
    try:
        # ... existing services ...

        # Register custom service
        quest_service = QuestService(self.database)
        self.container.register_singleton(QuestService, quest_service)

        self.logger.info("Core services initialized successfully")
    except Exception as e:
        self.logger.error(f"Failed to initialize services: {e}")
        raise
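
To show what registration and injection could look like under the hood, here is a minimal sketch of a type-keyed container. `SketchContainer`, `resolve`, and `call` are hypothetical names invented for this illustration; only `register_singleton` mirrors the call used above, and StarStreamer's real container may work differently. The idea is that a handler's type hints are used as lookup keys for registered instances.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, get_type_hints


class SketchContainer:
    """Hypothetical DI container: one instance per registered type."""

    def __init__(self) -> None:
        self._singletons: dict[type, Any] = {}

    def register_singleton(self, service_type: type, instance: Any) -> None:
        self._singletons[service_type] = instance

    def resolve(self, service_type: type) -> Any:
        try:
            return self._singletons[service_type]
        except KeyError:
            raise LookupError(f"No service registered for {service_type.__name__}") from None

    async def call(self, handler: Callable[..., Awaitable[Any]], **explicit: Any) -> Any:
        """Call an async handler, filling unsupplied parameters by type hint."""
        hints = get_type_hints(handler)
        kwargs = dict(explicit)
        for name in inspect.signature(handler).parameters:
            if name in kwargs:
                continue  # Caller supplied this argument directly (e.g. the event)
            if name not in hints:
                raise TypeError(f"Parameter '{name}' has no type hint to resolve")
            kwargs[name] = self.resolve(hints[name])
        return await handler(**kwargs)
```

With a container like this, the event would be passed explicitly (`await container.call(handler, event=event)`) and every other annotated parameter would be looked up by its type.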

Using Custom Services

Once registered, your custom service can be injected into handlers:

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!quests"))
async def list_quests_command(
    event: Event, 
    quest_service: QuestService,  # Custom service injection
    twitch: TwitchClient
) -> None:
    user_data = event.data.get("user", {})
    user_id = user_data.get("id")
    username = user_data.get("display_name", "User")

    quests = await quest_service.get_user_quests(user_id, "active")

    if not quests:
        await twitch.send_message(f"{username}, you have no active quests!")
        return

    quest_list = ", ".join([f"{q['title']} ({q['reward_points']} pts)" for q in quests])
    await twitch.send_message(f"{username}'s quests: {quest_list}")

Service Composition

Services can use other services to build complex functionality:

class RewardService:
    """Manages rewards and payouts"""

    def __init__(self, db: Database, economy: EconomyService, user_service: UserService) -> None:
        self.db = db
        self.economy = economy
        self.user_service = user_service
        self.logger = logging.getLogger(__name__)

    async def reward_follow(self, user_id: str, username: str) -> int:
        """Reward user for following"""
        # Ensure user exists
        user = await self.user_service.get_or_create_user(user_id, username)
        if not user:
            return 0

        # Award follow bonus
        follow_bonus = 100
        new_balance = await self.economy.add_money(user_id, follow_bonus, "New follower bonus")

        self.logger.info(f"Rewarded {username} {follow_bonus} points for following")
        return new_balance

    async def reward_watch_time(self, user_id: str, minutes_watched: int) -> int:
        """Reward user for watch time"""
        if minutes_watched < 5:  # Minimum 5 minutes
            return 0

        # 1 point per minute watched
        points_earned = minutes_watched
        new_balance = await self.economy.add_money(user_id, points_earned, f"Watch time: {minutes_watched}m")

        return new_balance

Testing Services

Services should be thoroughly tested with mocked dependencies:

# tests/unit/services/test_quest_service.py
import pytest
from unittest.mock import AsyncMock, Mock

from starstreamer.services.quests import QuestService


class TestQuestService:
    """Test QuestService functionality"""

    @pytest.fixture
    def mock_db(self):
        """Mock database for testing"""
        db = Mock()
        db.execute = AsyncMock()
        db.fetchone = AsyncMock()
        db.fetchall = AsyncMock()
        db.commit = AsyncMock()
        return db

    @pytest.fixture
    def quest_service(self, mock_db):
        """Create QuestService with mocked database"""
        return QuestService(mock_db)

    @pytest.mark.asyncio
    async def test_create_quest(self, quest_service, mock_db):
        """Test quest creation"""
        mock_db.execute.return_value = 123  # Mock quest ID

        quest_id = await quest_service.create_quest(
            "Test Quest", 
            "A test quest", 
            50
        )

        assert quest_id == 123
        mock_db.execute.assert_called_once()
        mock_db.commit.assert_called_once()

    @pytest.mark.asyncio
    async def test_assign_quest_success(self, quest_service, mock_db):
        """Test successful quest assignment"""
        mock_db.fetchone.return_value = None  # No existing quest

        result = await quest_service.assign_quest("user123", 456)

        assert result is True
        mock_db.execute.assert_called_once()
        mock_db.commit.assert_called_once()

    @pytest.mark.asyncio
    async def test_assign_quest_already_exists(self, quest_service, mock_db):
        """Test quest assignment when user already has quest"""
        mock_db.fetchone.return_value = (1,)  # Existing quest

        result = await quest_service.assign_quest("user123", 456)

        assert result is False
        mock_db.execute.assert_not_called()
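
The same approach extends to services that depend on other services: the collaborator is replaced with an `AsyncMock` and the test asserts on how it was awaited. The sketch below is self-contained, so it uses a hypothetical stand-in class, `WatchRewards`, that mirrors the `reward_watch_time` logic shown under Service Composition rather than importing the real service.

```python
import asyncio
from unittest.mock import AsyncMock


class WatchRewards:
    """Hypothetical stand-in mirroring RewardService.reward_watch_time."""

    def __init__(self, economy) -> None:
        self.economy = economy

    async def reward_watch_time(self, user_id: str, minutes_watched: int) -> int:
        if minutes_watched < 5:  # Minimum 5 minutes
            return 0
        return await self.economy.add_money(
            user_id, minutes_watched, f"Watch time: {minutes_watched}m"
        )


async def check_watch_rewards() -> None:
    economy = AsyncMock()  # Child attributes such as add_money are AsyncMocks too
    economy.add_money.return_value = 130
    service = WatchRewards(economy)

    # Below the threshold: no payout and the economy service is never touched
    assert await service.reward_watch_time("user123", 3) == 0
    economy.add_money.assert_not_awaited()

    # At or above the threshold: one point per minute, forwarded to the economy service
    assert await service.reward_watch_time("user123", 30) == 130
    economy.add_money.assert_awaited_once_with("user123", 30, "Watch time: 30m")


if __name__ == "__main__":
    asyncio.run(check_watch_rewards())
```

In a pytest suite the body of `check_watch_rewards` would become a test method marked with `@pytest.mark.asyncio`, as in the examples above.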

Best Practices

1. Service Responsibilities

Keep services focused on their domain:

# ✅ Good - Focused responsibility
class EconomyService:
    async def add_money(self, user_id: str, amount: int, reason: str) -> int:
        # Handle money operations only
        pass

# ❌ Bad - Mixed responsibilities  
class EconomyService:
    async def add_money_and_send_notification(self, user_id: str, amount: int) -> int:
        # Mixing economy logic with notifications
        pass

2. Error Handling

Provide consistent error handling patterns:

async def transfer_money(self, from_id: str, to_id: str, amount: int) -> bool:
    """Transfer money between users. Returns True if successful."""
    try:
        # Business logic here
        await self.db.commit()
        return True
    except Exception as e:
        self.logger.error(f"Failed to transfer money: {e}")
        # Rollback logic if needed
        return False

3. Validation

Validate inputs at the service layer:

async def add_money(self, user_id: str, amount: int, reason: str = "") -> int:
    """Add money to user's balance"""
    if amount <= 0:
        raise ValueError("Amount must be positive")

    if not user_id:
        raise ValueError("User ID is required")

    # Continue with business logic...

4. Logging

Include comprehensive logging for debugging:

async def complete_quest(self, user_id: str, quest_id: int) -> Optional[int]:
    """Complete quest and award points"""
    try:
        self.logger.debug(f"Attempting to complete quest {quest_id} for user {user_id}")

        # Business logic...

        self.logger.info(f"User {user_id} completed quest {quest_id} for {reward_points} points")
        return reward_points
    except Exception as e:
        self.logger.error(f"Failed to complete quest {quest_id} for user {user_id}: {e}")
        return None

5. Database Transactions

Handle database transactions properly:

async def complex_operation(self, user_id: str) -> bool:
    """Perform complex multi-step operation"""
    try:
        # Start transaction (if your DB supports it)
        async with self.db.transaction():
            await self.db.execute("UPDATE table1 SET ...")
            await self.db.execute("INSERT INTO table2 ...")
            await self.db.execute("DELETE FROM table3 ...")
            # Automatic commit on success, rollback on exception

        return True
    except Exception as e:
        self.logger.error(f"Complex operation failed: {e}")
        return False
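
For a runnable picture of the commit-or-rollback behaviour, the sketch below uses Python's standard `sqlite3` module synchronously instead of StarStreamer's async `Database`; the `users` table layout and the `transfer_points` function are assumptions made for the example. A `sqlite3` connection used as a context manager commits when the block succeeds and rolls back when it raises.

```python
import sqlite3


def transfer_points(conn: sqlite3.Connection, from_id: str, to_id: str, amount: int) -> bool:
    """Move points between two users atomically. Returns True on success."""
    try:
        with conn:  # Commits on success, rolls back if the block raises
            debit = conn.execute(
                "UPDATE users SET points = points - ? WHERE id = ? AND points >= ?",
                (amount, from_id, amount),
            )
            if debit.rowcount != 1:
                raise ValueError("Unknown sender or insufficient points")
            credit = conn.execute(
                "UPDATE users SET points = points + ? WHERE id = ?",
                (amount, to_id),
            )
            if credit.rowcount != 1:
                raise ValueError("Unknown recipient")
        return True
    except (sqlite3.Error, ValueError):
        return False


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with two example users."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, points INTEGER NOT NULL)")
    conn.executemany("INSERT INTO users VALUES (?, ?)", [("alice", 100), ("bob", 0)])
    conn.commit()
    return conn
```

If the credit step fails, the debit that already ran is undone, so the sender's balance is never reduced without the recipient being paid.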

Service Lifecycle

Services follow this lifecycle in StarStreamer:

  1. Registration - Services are registered in the DI container during startup
  2. Injection - Services are automatically injected into handlers as needed
  3. Execution - Services perform business logic operations
  4. Cleanup - Services are cleaned up during application shutdown

# src/main.py - Service lifecycle example
class StarStreamer:
    async def initialize_services(self) -> None:
        """Register all services in DI container"""
        economy_service = EconomyService(self.database)
        user_service = UserService(self.database)

        self.container.register_singleton(EconomyService, economy_service)
        self.container.register_singleton(UserService, user_service)

    async def shutdown(self) -> None:
        """Cleanup services if needed"""
        # Services are automatically cleaned up by DI container
        pass
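
If a custom service holds resources of its own (open files, HTTP sessions, background tasks), explicit cleanup may still be needed. The sketch below shows one possible convention and is purely an assumption: `shutdown_services` and the optional `close()` method are hypothetical and not part of StarStreamer's documented API. Services are closed in reverse registration order so that dependants shut down before the services they rely on.

```python
import asyncio
import inspect
from typing import Any, Iterable


async def shutdown_services(services: Iterable[Any]) -> None:
    """Call close() on each service that defines it, last registered first."""
    for service in reversed(list(services)):
        close = getattr(service, "close", None)
        if not callable(close):
            continue  # Service has nothing to clean up
        result = close()
        if inspect.isawaitable(result):  # Support both sync and async close()
            await result
```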

See Also