Skip to content

Platform Plugin Development Guide

StarStreamer uses a plugin architecture to integrate with external platforms like Twitch, OBS, Discord, and other streaming services. Plugins provide the connection layer that modules use to interact with these platforms.

Overview

In StarStreamer's architecture:

  • Plugins - Platform integrations that connect to external services (Twitch, OBS, Discord)
  • Modules - Feature implementations that use plugins to provide functionality (chat commands, alerts, economy)

This separation allows for clean, testable code where business logic (modules) is decoupled from platform-specific details (plugins).

Plugin Architecture

Plugin vs Module

  • Plugin = Platform integration layer (connects to Twitch, OBS, etc.)
  • Module = Feature layer (chat commands, alerts, etc. using plugins)

Plugin Structure

src/starstreamer/plugins/
├── __init__.py              # Plugin exports
├── twitch/                  # Twitch platform plugin
│   ├── __init__.py          # TwitchClient export
│   ├── client.py            # Main plugin orchestrator
│   ├── api.py               # Helix API integration
│   ├── eventsub.py          # EventSub WebSocket manager
│   └── models.py            # Twitch data models
├── obs/                     # OBS WebSocket plugin
│   ├── __init__.py          # OBSClient export
│   ├── client.py            # Main OBS client (110+ methods)
│   └── models.py            # OBS configuration models
├── litellm/                 # AI integration plugin
│   ├── __init__.py          # AIClient export
│   └── client.py            # LiteLLM/OpenRouter client
└── elevenlabs/              # Text-to-speech plugin
    ├── __init__.py          # ElevenLabsClient export
    ├── client.py            # ElevenLabs API client
    └── models.py            # Voice and TTS models

Understanding the Twitch Plugin

The Twitch plugin serves as the reference implementation for all platform plugins.

Components

client.py - Main Orchestrator

class TwitchClient:
    """Twitch API client with EventSub WebSocket support"""

    def __init__(self, config: TwitchConfig | None = None):
        self.config = config
        self.api = HelixAPI(self)           # REST API client
        self.eventsub = EventSubManager(self)  # WebSocket events

    async def connect(self) -> None:
        """Connect to Twitch services"""
        await self.eventsub.connect()

    # Convenience methods that delegate to API
    async def send_message(self, message: str) -> bool:
        return await self.api.send_chat_message(message)

api.py - REST API Integration

class HelixAPI:
    """Twitch Helix API client"""

    async def send_chat_message(self, message: str) -> bool:
        """Send chat message via Helix API"""
        # Implementation for POST /chat/messages

    async def get_user_info(self, user_id: str) -> dict:
        """Get user information"""
        # Implementation for GET /users

eventsub.py - Event Stream Manager

class EventSubManager:
    """Manages Twitch EventSub WebSocket connection"""

    async def connect(self) -> None:
        """Connect to EventSub WebSocket"""
        # WebSocket connection and subscription management

    async def _handle_event(self, event_data: dict) -> None:
        """Transform and emit events to StarStreamer event bus"""
        # Convert Twitch events to StarStreamer events

How Modules Use Plugins

Modules access plugins through dependency injection:

# In a module's action handler
from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger
from starstreamer.plugins.twitch import TwitchClient

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!hello"))
async def hello_command(event: Event, twitch: TwitchClient) -> None:
    """Example of module using Twitch plugin"""
    username = event.data['user']['display_name']
    await twitch.send_message(f"Hello {username}!")  # Uses plugin

The TwitchClient is injected by the dependency injection system, providing modules with a clean interface to platform functionality.

Understanding the LiteLLM Plugin

The LiteLLM plugin provides AI integration capabilities through OpenRouter, demonstrating how to integrate third-party APIs with StarStreamer.

Plugin Architecture

client.py - AI Client

class AIClient:
    """AI client using OpenRouter through LiteLLM for language model access"""

    _instance: "AIClient | None" = None  # Singleton pattern

    def __init__(self, config: AIConfig | None = None):
        self.config = config
        self.connected = False

    @classmethod
    def get_instance(cls) -> "AIClient":
        """Get singleton instance"""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    async def complete(self, prompt: str, **kwargs: Any) -> AIResponse:
        """Complete a single prompt"""
        messages = [{"role": "user", "content": prompt}]
        return await self.chat(messages, **kwargs)

    async def chat(self, messages: list[dict[str, str]], **kwargs: Any) -> AIResponse:
        """Chat with conversation history"""
        import litellm

        params = {
            "model": kwargs.get("model", "openrouter/anthropic/claude-3.5-sonnet"),  # Default model
            "messages": messages,
            "temperature": kwargs.get("temperature", 0.7),  # Default temperature
            "max_tokens": kwargs.get("max_tokens", 150),     # Default max_tokens
        }

        response = await litellm.acompletion(**params)

        return AIResponse(
            content=response["choices"][0]["message"]["content"],
            model=response["model"],
            usage=response.get("usage", {}),
            finish_reason=response["choices"][0].get("finish_reason"),
        )

Key Features

Singleton Pattern: Ensures efficient resource management with a single AI client instance.

LiteLLM Integration: Provides unified interface to multiple AI providers through OpenRouter.

Response Wrapper: AIResponse class provides structured access to AI responses and metadata.

Module Usage Example

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!ask"))
async def ask_command(event: Event, ai: AIClient, twitch: TwitchClient) -> None:
    """AI-powered question answering using LiteLLM plugin"""
    question = event.data.get("message", "").split(maxsplit=1)[1]
    # Use Claude 3.5 Sonnet for question answering
    response = await ai.complete(
        f"Answer this question concisely: {question}",
        model="openrouter/anthropic/claude-3.5-sonnet"
    )
    await twitch.send_message(f"AI: {response.content}")

Configuration Integration

The plugin integrates with StarStreamer's configuration system:

ai:
  openrouter:
    enabled: true
    api_key: "${OPENROUTER_API_KEY}"
    # Models are specified at the module level, not in configuration

Creating a New Platform Plugin

Step 1: Plugin Structure

Create a new plugin directory under src/starstreamer/plugins/:

mkdir -p src/starstreamer/plugins/discord
touch src/starstreamer/plugins/discord/__init__.py
touch src/starstreamer/plugins/discord/client.py
touch src/starstreamer/plugins/discord/gateway.py
touch src/starstreamer/plugins/discord/models.py

Step 2: Define Plugin Models

models.py:

"""Discord plugin data models"""

from pydantic import BaseModel

class DiscordConfig(BaseModel):
    """Discord bot configuration"""
    bot_token: str
    guild_id: str | None = None
    command_prefix: str = "!"

class DiscordUser(BaseModel):
    """Discord user representation"""
    id: str
    username: str
    display_name: str
    avatar_url: str | None = None

class DiscordMessage(BaseModel):
    """Discord message representation"""
    id: str
    content: str
    author: DiscordUser
    channel_id: str
    guild_id: str | None = None

Step 3: Implement Main Client

client.py:

"""Discord client that orchestrates Discord integrations"""

import logging
from typing import Any

from starstreamer.models import DiscordConfig
from .gateway import DiscordGateway

logger = logging.getLogger(__name__)

class DiscordClient:
    """Discord bot client with gateway support"""

    def __init__(self, config: DiscordConfig | None = None) -> None:
        if config is None:
            import os
            config = DiscordConfig(
                bot_token=os.getenv("DISCORD_BOT_TOKEN", ""),
                guild_id=os.getenv("DISCORD_GUILD_ID"),
                command_prefix=os.getenv("DISCORD_PREFIX", "!"),
            )

        self.config = config
        self.gateway = DiscordGateway(self)
        self.connected = False

    async def connect(self) -> None:
        """Connect to Discord gateway"""
        if not self.config.bot_token:
            raise ValueError("Discord bot token is required")

        await self.gateway.connect()
        self.connected = True
        logger.info("Connected to Discord")

    async def disconnect(self) -> None:
        """Disconnect from Discord gateway"""
        await self.gateway.disconnect()
        self.connected = False
        logger.info("Disconnected from Discord")

    async def send_message(self, channel_id: str, content: str) -> bool:
        """Send message to Discord channel"""
        return await self.gateway.send_message(channel_id, content)

    async def send_embed(self, channel_id: str, embed: dict[str, Any]) -> bool:
        """Send embed to Discord channel"""
        return await self.gateway.send_embed(channel_id, embed)

Step 4: Implement Event Gateway

gateway.py:

"""Discord gateway WebSocket manager"""

import asyncio
import json
import logging
from typing import Any

import websockets

from starstreamer.core.event_bus import get_event_bus

logger = logging.getLogger(__name__)

class DiscordGateway:
    """Manages Discord gateway WebSocket connection"""

    def __init__(self, client: "DiscordClient"):
        self.client = client
        self.websocket = None
        self.heartbeat_task = None

    async def connect(self) -> None:
        """Connect to Discord gateway"""
        # Implementation for Discord gateway connection
        gateway_url = "wss://gateway.discord.gg/?v=10&encoding=json"

        self.websocket = await websockets.connect(gateway_url)

        # Start heartbeat and event handling
        self.heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        asyncio.create_task(self._event_loop())

    async def disconnect(self) -> None:
        """Disconnect from gateway"""
        if self.heartbeat_task:
            self.heartbeat_task.cancel()

        if self.websocket:
            await self.websocket.close()

    async def send_message(self, channel_id: str, content: str) -> bool:
        """Send message via REST API"""
        # Implementation for Discord REST API message sending
        return True

    async def _event_loop(self) -> None:
        """Handle incoming gateway events"""
        async for message in self.websocket:
            try:
                data = json.loads(message)
                await self._handle_event(data)
            except Exception as e:
                logger.error(f"Error handling Discord event: {e}")

    async def _handle_event(self, event_data: dict[str, Any]) -> None:
        """Transform Discord events to StarStreamer events"""
        event_bus = get_event_bus()

        # Transform Discord events to StarStreamer format
        if event_data.get("t") == "MESSAGE_CREATE":
            message_data = event_data["d"]

            # Convert to StarStreamer event format
            starstreamer_event = {
                "type": "discord.message",
                "data": {
                    "message": {
                        "id": message_data["id"],
                        "content": message_data["content"],
                    },
                    "user": {
                        "id": message_data["author"]["id"],
                        "username": message_data["author"]["username"],
                        "display_name": message_data["author"].get("display_name") or message_data["author"]["username"],
                    },
                    "channel_id": message_data["channel_id"],
                },
                "source": "discord",
                "timestamp": message_data["timestamp"],
            }

            await event_bus.emit("discord.message", starstreamer_event["data"])

Step 5: Export Plugin

__init__.py:

"""Discord platform plugin for StarStreamer"""

from .client import DiscordClient

__all__ = ["DiscordClient"]

Step 6: Register with Dependency Injection

src/starstreamer/plugins/__init__.py:

"""Platform integrations for StarStreamer"""

from .twitch import TwitchClient
from .discord import DiscordClient

__all__ = ["TwitchClient", "DiscordClient"]

Step 7: Configure in Main Application

src/main.py:

# Add Discord to dependency injection
from starstreamer.plugins.discord import DiscordClient

# In setup_dependencies()
discord_config = DiscordConfig()  # From environment
discord_client = DiscordClient(discord_config)
container.register_singleton(DiscordClient, discord_client)

# In startup
await discord_client.connect()

Plugin Patterns and Best Practices

1. Configuration Management

Use Pydantic models for type-safe configuration:

class PlatformConfig(BaseModel):
    """Base configuration pattern"""
    api_key: str
    base_url: str = "https://api.platform.com"
    timeout: int = 30

    @classmethod
    def from_env(cls) -> "PlatformConfig":
        """Load from environment variables"""
        import os
        return cls(
            api_key=os.getenv("PLATFORM_API_KEY", ""),
            base_url=os.getenv("PLATFORM_BASE_URL", cls.__fields__["base_url"].default),
            timeout=int(os.getenv("PLATFORM_TIMEOUT", str(cls.__fields__["timeout"].default))),
        )

2. Event Transformation

Always transform platform events to StarStreamer's standard format:

async def _transform_platform_event(self, platform_event: dict) -> dict:
    """Transform platform event to StarStreamer format"""
    return {
        "type": f"platform.{platform_event['event_type']}",
        "data": {
            "user": {
                "id": platform_event["user"]["id"],
                "username": platform_event["user"]["login"],
                "display_name": platform_event["user"]["display_name"],
            },
            "message": platform_event.get("message", ""),
        },
        "source": "platform",
        "timestamp": platform_event["timestamp"],
    }

3. Connection Management

Implement proper lifecycle management:

class PlatformClient:
    async def __aenter__(self) -> "PlatformClient":
        """Async context manager entry"""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit"""
        await self.disconnect()

4. Error Handling

Handle connection failures gracefully:

async def connect(self) -> None:
    """Connect with retry logic"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            await self._establish_connection()
            self.connected = True
            return
        except Exception as e:
            logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

5. Dependency Injection Integration

Make plugins available to modules via DI:

# In module handlers
@on_event("platform.message")
async def handle_platform_message(
    event: Event,
    platform: PlatformClient,  # Injected automatically
    logger: logging.Logger     # Also injected
) -> None:
    """Module using platform plugin"""
    username = event.data["user"]["display_name"]
    await platform.send_message(f"Hello {username}!")

Testing Plugins

Unit Testing

Test plugin components in isolation:

import pytest
from unittest.mock import AsyncMock, Mock
from your_plugin.client import PlatformClient

@pytest.mark.asyncio
async def test_platform_client_connect():
    """Test platform client connection"""
    config = Mock()
    config.api_key = "test-key"

    client = PlatformClient(config)
    client._establish_connection = AsyncMock()

    await client.connect()

    assert client.connected is True
    client._establish_connection.assert_called_once()

Integration Testing

Test plugin with event bus integration:

@pytest.mark.asyncio
async def test_plugin_event_emission():
    """Test plugin emits events correctly"""
    from starstreamer.core.event_bus import EventBus

    event_bus = EventBus()
    events_received = []

    @event_bus.on("platform.message")
    async def capture_event(event_data):
        events_received.append(event_data)

    # Test platform event triggers StarStreamer event
    plugin = PlatformClient()
    await plugin._handle_platform_event({
        "type": "message",
        "user": {"id": "123", "username": "testuser"},
        "content": "Hello world"
    })

    assert len(events_received) == 1
    assert events_received[0]["user"]["username"] == "testuser"

Plugin Examples

Simple HTTP API Plugin

class HTTPAPIPlugin:
    """Plugin for HTTP-based APIs"""

    def __init__(self, config: HTTPConfig):
        self.config = config
        self.session = None

    async def connect(self) -> None:
        import aiohttp
        self.session = aiohttp.ClientSession(
            headers={"Authorization": f"Bearer {self.config.api_key}"}
        )

    async def disconnect(self) -> None:
        if self.session:
            await self.session.close()

    async def send_request(self, endpoint: str, data: dict) -> dict:
        """Send API request"""
        async with self.session.post(f"{self.config.base_url}/{endpoint}", json=data) as response:
            return await response.json()

WebSocket-Based Plugin

class WebSocketPlugin:
    """Plugin for WebSocket-based platforms"""

    async def connect(self) -> None:
        import websockets
        self.websocket = await websockets.connect(self.config.websocket_url)
        asyncio.create_task(self._event_loop())

    async def _event_loop(self) -> None:
        """Handle incoming WebSocket messages"""
        async for message in self.websocket:
            data = json.loads(message)
            await self._handle_event(data)

Platform-Specific Considerations

Twitch

  • Uses EventSub WebSocket for real-time events
  • Helix API for actions (sending messages, moderation)
  • OAuth2 with specific scopes required

Discord

  • Uses Gateway WebSocket for events
  • REST API for sending messages
  • Bot token authentication

OBS (Future)

  • Uses obs-websocket protocol
  • Scene management and source control
  • Local WebSocket connection

YouTube (Future)

  • YouTube Live Streaming API
  • Chat messages via Live Chat API
  • OAuth2 with YouTube scopes

Next Steps


Ready to connect new platforms! 🚀

Create platform plugins that provide clean, consistent interfaces for modules to interact with any streaming service or tool.