Platform Plugin Development Guide¶
StarStreamer uses a plugin architecture to integrate with external platforms like Twitch, OBS, Discord, and other streaming services. Plugins provide the connection layer that modules use to interact with these platforms.
Overview¶
In StarStreamer's architecture:
- Plugins - Platform integrations that connect to external services (Twitch, OBS, Discord)
- Modules - Feature implementations that use plugins to provide functionality (chat commands, alerts, economy)
This separation allows for clean, testable code where business logic (modules) is decoupled from platform-specific details (plugins).
Plugin Architecture¶
Plugin vs Module¶
- Plugin = Platform integration layer (connects to Twitch, OBS, etc.)
- Module = Feature layer (chat commands, alerts, etc. using plugins)
Plugin Structure¶
src/starstreamer/plugins/
├── __init__.py          # Plugin exports
├── twitch/              # Twitch platform plugin
│   ├── __init__.py      # TwitchClient export
│   ├── client.py        # Main plugin orchestrator
│   ├── api.py           # Helix API integration
│   ├── eventsub.py      # EventSub WebSocket manager
│   └── models.py        # Twitch data models
├── obs/                 # OBS WebSocket plugin
│   ├── __init__.py      # OBSClient export
│   ├── client.py        # Main OBS client (110+ methods)
│   └── models.py        # OBS configuration models
├── litellm/             # AI integration plugin
│   ├── __init__.py      # AIClient export
│   └── client.py        # LiteLLM/OpenRouter client
└── elevenlabs/          # Text-to-speech plugin
    ├── __init__.py      # ElevenLabsClient export
    ├── client.py        # ElevenLabs API client
    └── models.py        # Voice and TTS models
Understanding the Twitch Plugin¶
The Twitch plugin serves as the reference implementation for all platform plugins.
Components¶
client.py - Main Orchestrator
class TwitchClient:
    """Twitch API client with EventSub WebSocket support"""

    def __init__(self, config: TwitchConfig | None = None):
        self.config = config
        self.api = HelixAPI(self)  # REST API client
        self.eventsub = EventSubManager(self)  # WebSocket events

    async def connect(self) -> None:
        """Connect to Twitch services"""
        await self.eventsub.connect()

    # Convenience methods that delegate to API
    async def send_message(self, message: str) -> bool:
        return await self.api.send_chat_message(message)
api.py - REST API Integration
class HelixAPI:
    """Twitch Helix API client"""

    async def send_chat_message(self, message: str) -> bool:
        """Send chat message via Helix API"""
        # Implementation for POST /chat/messages

    async def get_user_info(self, user_id: str) -> dict:
        """Get user information"""
        # Implementation for GET /users
eventsub.py - Event Stream Manager
class EventSubManager:
    """Manages Twitch EventSub WebSocket connection"""

    async def connect(self) -> None:
        """Connect to EventSub WebSocket"""
        # WebSocket connection and subscription management

    async def _handle_event(self, event_data: dict) -> None:
        """Transform and emit events to StarStreamer event bus"""
        # Convert Twitch events to StarStreamer events
How Modules Use Plugins¶
Modules access plugins through dependency injection:
# In a module's action handler
from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger
from starstreamer.plugins.twitch import TwitchClient

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!hello"))
async def hello_command(event: Event, twitch: TwitchClient) -> None:
    """Example of module using Twitch plugin"""
    username = event.data['user']['display_name']
    await twitch.send_message(f"Hello {username}!")  # Uses plugin
The TwitchClient is injected by the dependency injection system, providing modules with a clean interface to platform functionality.
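To make the injection step concrete, here is a minimal sketch of how a container could resolve handler parameters from their type annotations. The `Container` class, its `call` method, and `FakeTwitchClient` are hypothetical stand-ins written for this illustration; StarStreamer's real dependency injection system may work differently.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, get_type_hints


class Container:
    """Hypothetical minimal container: maps a type to one shared instance."""

    def __init__(self) -> None:
        self._singletons: dict[type, Any] = {}

    def register_singleton(self, cls: type, instance: Any) -> None:
        self._singletons[cls] = instance

    async def call(self, handler: Callable[..., Awaitable[Any]], event: Any) -> Any:
        """Call handler(event, ...) and fill the other parameters by annotation."""
        hints = get_type_hints(handler)
        params = list(inspect.signature(handler).parameters)
        kwargs = {
            name: self._singletons[hints[name]]
            for name in params[1:]  # the first parameter is assumed to be the event
            if hints.get(name) in self._singletons
        }
        return await handler(event, **kwargs)


class FakeTwitchClient:
    """Stand-in for TwitchClient that records messages instead of sending them."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    async def send_message(self, message: str) -> bool:
        self.sent.append(message)
        return True


async def hello_command(event: dict, twitch: FakeTwitchClient) -> None:
    await twitch.send_message(f"Hello {event['user']['display_name']}!")


container = Container()
fake_twitch = FakeTwitchClient()
container.register_singleton(FakeTwitchClient, fake_twitch)
asyncio.run(container.call(hello_command, {"user": {"display_name": "Ada"}}))
print(fake_twitch.sent)
```

Because the handler only names the type it needs, a test can register a fake client under that type and exercise the module logic without any network connection.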
Understanding the LiteLLM Plugin¶
The LiteLLM plugin provides AI integration capabilities through OpenRouter, demonstrating how to integrate third-party APIs with StarStreamer.
Plugin Architecture¶
client.py - AI Client
class AIClient:
    """AI client using OpenRouter through LiteLLM for language model access"""

    _instance: "AIClient | None" = None  # Singleton pattern

    def __init__(self, config: AIConfig | None = None):
        self.config = config
        self.connected = False

    @classmethod
    def get_instance(cls) -> "AIClient":
        """Get singleton instance"""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    async def complete(self, prompt: str, **kwargs: Any) -> AIResponse:
        """Complete a single prompt"""
        messages = [{"role": "user", "content": prompt}]
        return await self.chat(messages, **kwargs)

    async def chat(self, messages: list[dict[str, str]], **kwargs: Any) -> AIResponse:
        """Chat with conversation history"""
        import litellm

        params = {
            "model": kwargs.get("model", "openrouter/anthropic/claude-3.5-sonnet"),  # Default model
            "messages": messages,
            "temperature": kwargs.get("temperature", 0.7),  # Default temperature
            "max_tokens": kwargs.get("max_tokens", 150),  # Default max_tokens
        }
        response = await litellm.acompletion(**params)
        return AIResponse(
            content=response["choices"][0]["message"]["content"],
            model=response["model"],
            usage=response.get("usage", {}),
            finish_reason=response["choices"][0].get("finish_reason"),
        )
Key Features¶
- Singleton Pattern: AIClient.get_instance() returns one shared client, so handlers reuse the same instance instead of creating a new one per call.
- LiteLLM Integration: Provides a unified interface to multiple AI providers through OpenRouter.
- Response Wrapper: The AIResponse class provides structured access to the response content, model name, token usage, and finish reason.
Module Usage Example¶
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!ask"))
async def ask_command(event: Event, ai: AIClient, twitch: TwitchClient) -> None:
    """AI-powered question answering using LiteLLM plugin"""
    # Split "!ask <question>"; a bare "!ask" has no second part
    parts = event.data.get("message", "").split(maxsplit=1)
    if len(parts) < 2:
        await twitch.send_message("Usage: !ask <question>")
        return
    question = parts[1]

    # Use Claude 3.5 Sonnet for question answering
    response = await ai.complete(
        f"Answer this question concisely: {question}",
        model="openrouter/anthropic/claude-3.5-sonnet",
    )
    await twitch.send_message(f"AI: {response.content}")
Configuration Integration¶
The plugin integrates with StarStreamer's configuration system:
ai:
  openrouter:
    enabled: true
    api_key: "${OPENROUTER_API_KEY}"
    # Models are specified at the module level, not in configuration
Creating a New Platform Plugin¶
Step 1: Plugin Structure¶
Create a new plugin directory under src/starstreamer/plugins/:
mkdir -p src/starstreamer/plugins/discord
touch src/starstreamer/plugins/discord/__init__.py
touch src/starstreamer/plugins/discord/client.py
touch src/starstreamer/plugins/discord/gateway.py
touch src/starstreamer/plugins/discord/models.py
Step 2: Define Plugin Models¶
models.py:
"""Discord plugin data models"""
from pydantic import BaseModel


class DiscordConfig(BaseModel):
    """Discord bot configuration"""

    bot_token: str
    guild_id: str | None = None
    command_prefix: str = "!"


class DiscordUser(BaseModel):
    """Discord user representation"""

    id: str
    username: str
    display_name: str
    avatar_url: str | None = None


class DiscordMessage(BaseModel):
    """Discord message representation"""

    id: str
    content: str
    author: DiscordUser
    channel_id: str
    guild_id: str | None = None
Step 3: Implement Main Client¶
client.py:
"""Discord client that orchestrates Discord integrations"""
import logging
import os
from typing import Any

from .gateway import DiscordGateway
from .models import DiscordConfig  # Defined in this plugin's models.py (Step 2)

logger = logging.getLogger(__name__)


class DiscordClient:
    """Discord bot client with gateway support"""

    def __init__(self, config: DiscordConfig | None = None) -> None:
        if config is None:
            # Fall back to environment variables when no config is passed
            config = DiscordConfig(
                bot_token=os.getenv("DISCORD_BOT_TOKEN", ""),
                guild_id=os.getenv("DISCORD_GUILD_ID"),
                command_prefix=os.getenv("DISCORD_PREFIX", "!"),
            )
        self.config = config
        self.gateway = DiscordGateway(self)
        self.connected = False

    async def connect(self) -> None:
        """Connect to Discord gateway"""
        if not self.config.bot_token:
            raise ValueError("Discord bot token is required")
        await self.gateway.connect()
        self.connected = True
        logger.info("Connected to Discord")

    async def disconnect(self) -> None:
        """Disconnect from Discord gateway"""
        await self.gateway.disconnect()
        self.connected = False
        logger.info("Disconnected from Discord")

    async def send_message(self, channel_id: str, content: str) -> bool:
        """Send message to Discord channel"""
        return await self.gateway.send_message(channel_id, content)

    async def send_embed(self, channel_id: str, embed: dict[str, Any]) -> bool:
        """Send embed to Discord channel"""
        return await self.gateway.send_embed(channel_id, embed)
Step 4: Implement Event Gateway¶
gateway.py:
"""Discord gateway WebSocket manager"""
import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any

import websockets

from starstreamer.core.event_bus import get_event_bus

if TYPE_CHECKING:
    from .client import DiscordClient  # Type-only import; avoids a circular import

logger = logging.getLogger(__name__)


class DiscordGateway:
    """Manages Discord gateway WebSocket connection"""

    def __init__(self, client: "DiscordClient"):
        self.client = client
        self.websocket = None
        self.heartbeat_task = None
        self.event_task = None

    async def connect(self) -> None:
        """Connect to Discord gateway"""
        # Implementation for Discord gateway connection
        gateway_url = "wss://gateway.discord.gg/?v=10&encoding=json"
        self.websocket = await websockets.connect(gateway_url)
        # Start heartbeat and event handling; keep references so the tasks
        # are not garbage-collected and can be cancelled on disconnect
        self.heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        self.event_task = asyncio.create_task(self._event_loop())

    async def disconnect(self) -> None:
        """Disconnect from gateway"""
        for task in (self.heartbeat_task, self.event_task):
            if task:
                task.cancel()
        if self.websocket:
            await self.websocket.close()

    async def send_message(self, channel_id: str, content: str) -> bool:
        """Send message via REST API"""
        # Implementation for Discord REST API message sending
        return True

    async def send_embed(self, channel_id: str, embed: dict[str, Any]) -> bool:
        """Send embed via REST API (called by DiscordClient.send_embed)"""
        # Implementation for Discord REST API embed sending
        return True

    async def _heartbeat_loop(self) -> None:
        """Send periodic heartbeats to keep the gateway connection alive"""
        # Implementation for gateway heartbeats

    async def _event_loop(self) -> None:
        """Handle incoming gateway events"""
        async for message in self.websocket:
            try:
                data = json.loads(message)
                await self._handle_event(data)
            except Exception:
                # logger.exception records the traceback along with the message
                logger.exception("Error handling Discord event")

    async def _handle_event(self, event_data: dict[str, Any]) -> None:
        """Transform Discord events to StarStreamer events"""
        event_bus = get_event_bus()
        # Transform Discord events to StarStreamer format
        if event_data.get("t") == "MESSAGE_CREATE":
            message_data = event_data["d"]
            # Convert to StarStreamer event format
            starstreamer_event = {
                "type": "discord.message",
                "data": {
                    "message": {
                        "id": message_data["id"],
                        "content": message_data["content"],
                    },
                    "user": {
                        "id": message_data["author"]["id"],
                        "username": message_data["author"]["username"],
                        "display_name": message_data["author"].get("display_name")
                        or message_data["author"]["username"],
                    },
                    "channel_id": message_data["channel_id"],
                },
                "source": "discord",
                "timestamp": message_data["timestamp"],
            }
            await event_bus.emit("discord.message", starstreamer_event["data"])
Step 5: Export Plugin¶
__init__.py:
"""Discord platform plugin for StarStreamer"""
from .client import DiscordClient
__all__ = ["DiscordClient"]
Step 6: Register with Dependency Injection¶
src/starstreamer/plugins/__init__.py:
"""Platform integrations for StarStreamer"""
from .twitch import TwitchClient
from .discord import DiscordClient
__all__ = ["TwitchClient", "DiscordClient"]
Step 7: Configure in Main Application¶
src/main.py:
# Add Discord to dependency injection
from starstreamer.plugins.discord import DiscordClient

# In setup_dependencies()
# With no config argument, DiscordClient reads the DISCORD_* environment variables.
# (DiscordConfig() with no arguments would fail because bot_token is required.)
discord_client = DiscordClient()
container.register_singleton(DiscordClient, discord_client)

# In startup
await discord_client.connect()
Plugin Patterns and Best Practices¶
1. Configuration Management¶
Use Pydantic models for type-safe configuration:
class PlatformConfig(BaseModel):
    """Base configuration pattern"""

    api_key: str
    base_url: str = "https://api.platform.com"
    timeout: int = 30

    @classmethod
    def from_env(cls) -> "PlatformConfig":
        """Load from environment variables"""
        import os

        # Pass only the values that are set so the field defaults above apply;
        # this avoids reading defaults through version-specific Pydantic attributes
        values: dict = {"api_key": os.getenv("PLATFORM_API_KEY", "")}
        if (base_url := os.getenv("PLATFORM_BASE_URL")) is not None:
            values["base_url"] = base_url
        if (timeout := os.getenv("PLATFORM_TIMEOUT")) is not None:
            values["timeout"] = int(timeout)
        return cls(**values)
2. Event Transformation¶
Always transform platform events to StarStreamer's standard format:
async def _transform_platform_event(self, platform_event: dict) -> dict:
    """Transform platform event to StarStreamer format"""
    return {
        "type": f"platform.{platform_event['event_type']}",
        "data": {
            "user": {
                "id": platform_event["user"]["id"],
                "username": platform_event["user"]["login"],
                "display_name": platform_event["user"]["display_name"],
            },
            "message": platform_event.get("message", ""),
        },
        "source": "platform",
        "timestamp": platform_event["timestamp"],
    }
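To show the transformation on concrete data, the sketch below applies the same mapping as a standalone function. The sample payload, the `REQUIRED_KEYS` check, and the fallback from `display_name` to `login` are assumptions added for illustration; real platform payloads will have their own shapes.

```python
from typing import Any

# Hypothetical list of keys this sketch insists on before transforming
REQUIRED_KEYS = ("event_type", "user", "timestamp")


def transform_platform_event(platform_event: dict[str, Any]) -> dict[str, Any]:
    """Map a raw platform payload onto the StarStreamer event layout."""
    missing = [key for key in REQUIRED_KEYS if key not in platform_event]
    if missing:
        raise ValueError(f"platform event is missing keys: {missing}")
    user = platform_event["user"]
    return {
        "type": f"platform.{platform_event['event_type']}",
        "data": {
            "user": {
                "id": user["id"],
                "username": user["login"],
                # Fall back to the login when no display name is present
                "display_name": user.get("display_name") or user["login"],
            },
            "message": platform_event.get("message", ""),
        },
        "source": "platform",
        "timestamp": platform_event["timestamp"],
    }


# Sample payload invented for this example
raw_event = {
    "event_type": "message",
    "user": {"id": "123", "login": "testuser", "display_name": "TestUser"},
    "message": "Hello world",
    "timestamp": "2024-01-01T00:00:00Z",
}
event = transform_platform_event(raw_event)
print(event["type"], event["data"]["user"]["display_name"])
```

Validating required keys up front turns a malformed payload into one clear error instead of a `KeyError` deep inside the mapping.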
3. Connection Management¶
Implement proper lifecycle management:
class PlatformClient:
    async def __aenter__(self) -> "PlatformClient":
        """Async context manager entry"""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit"""
        await self.disconnect()
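The benefit of the context manager protocol is that `disconnect()` runs even when the body raises. The following sketch demonstrates this with a hypothetical `DemoClient` that only records its lifecycle calls; it stands in for a real plugin client.

```python
import asyncio


class DemoClient:
    """Hypothetical client that logs connect/disconnect instead of doing I/O."""

    def __init__(self) -> None:
        self.connected = False
        self.log: list[str] = []

    async def connect(self) -> None:
        self.connected = True
        self.log.append("connect")

    async def disconnect(self) -> None:
        self.connected = False
        self.log.append("disconnect")

    async def __aenter__(self) -> "DemoClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returning None lets the original exception propagate after cleanup
        await self.disconnect()


async def main() -> DemoClient:
    client = DemoClient()
    try:
        async with client:
            client.log.append("working")
            raise RuntimeError("handler failed")
    except RuntimeError:
        client.log.append("error seen by caller")
    return client


client = asyncio.run(main())
print(client.log)
```

The log shows the disconnect happening before the caller sees the error, which is the ordering a plugin needs to avoid leaking open connections.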
4. Error Handling¶
Handle connection failures gracefully:
async def connect(self) -> None:
    """Connect with retry logic"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            await self._establish_connection()
            self.connected = True
            return
        except Exception as e:
            logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
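The same retry pattern can be factored into a reusable helper. This is a sketch rather than part of StarStreamer: `retry_with_backoff` and its `sleep` parameter are hypothetical names, and the injectable `sleep` exists so the delays can be observed without actually waiting.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run operation, retrying on failure with delays of base_delay * 2**attempt."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await sleep(base_delay * 2 ** attempt)
    raise ValueError("max_retries must be at least 1")


# Demonstration: an operation that fails twice, then succeeds
calls = 0
delays: list[float] = []


async def flaky_connect() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError(f"attempt {calls} failed")
    return "connected"


async def record_delay(seconds: float) -> None:
    delays.append(seconds)  # record instead of sleeping


result = asyncio.run(retry_with_backoff(flaky_connect, sleep=record_delay))
print(result, calls, delays)
```

With three attempts and a base delay of one second, the helper waits one second after the first failure and two seconds after the second, matching the `2 ** attempt` schedule in the method above.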
5. Dependency Injection Integration¶
Make plugins available to modules via DI:
# In module handlers
@on_event("platform.message")
async def handle_platform_message(
    event: Event,
    platform: PlatformClient,  # Injected automatically
    logger: logging.Logger,  # Also injected
) -> None:
    """Module using platform plugin"""
    username = event.data["user"]["display_name"]
    await platform.send_message(f"Hello {username}!")
Testing Plugins¶
Unit Testing¶
Test plugin components in isolation:
import pytest
from unittest.mock import AsyncMock, Mock

from your_plugin.client import PlatformClient


@pytest.mark.asyncio
async def test_platform_client_connect():
    """Test platform client connection"""
    config = Mock()
    config.api_key = "test-key"
    client = PlatformClient(config)
    client._establish_connection = AsyncMock()

    await client.connect()

    assert client.connected is True
    client._establish_connection.assert_called_once()
Integration Testing¶
Test plugin with event bus integration:
@pytest.mark.asyncio
async def test_plugin_event_emission():
    """Test plugin emits events correctly"""
    from starstreamer.core.event_bus import EventBus

    event_bus = EventBus()
    events_received = []

    @event_bus.on("platform.message")
    async def capture_event(event_data):
        events_received.append(event_data)

    # Test platform event triggers StarStreamer event
    plugin = PlatformClient()
    await plugin._handle_platform_event({
        "type": "message",
        "user": {"id": "123", "username": "testuser"},
        "content": "Hello world",
    })

    assert len(events_received) == 1
    assert events_received[0]["user"]["username"] == "testuser"
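For a test like this to capture anything, the plugin must emit on the same bus instance the test subscribes to. One way to arrange that is to pass the bus into the plugin. The sketch below illustrates the idea with a hypothetical `InMemoryEventBus` and a simplified `PlatformClient`; neither is StarStreamer's actual class, and the real `EventBus` API may differ.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical event bus with an on() decorator and an async emit()."""

    def __init__(self) -> None:
        self._handlers: defaultdict[str, list[Handler]] = defaultdict(list)

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            self._handlers[event_type].append(handler)
            return handler

        return decorator

    async def emit(self, event_type: str, data: dict[str, Any]) -> None:
        for handler in self._handlers[event_type]:
            await handler(data)


class PlatformClient:
    """Simplified plugin that receives its event bus through the constructor."""

    def __init__(self, event_bus: InMemoryEventBus) -> None:
        self.event_bus = event_bus

    async def _handle_platform_event(self, platform_event: dict[str, Any]) -> None:
        await self.event_bus.emit(
            f"platform.{platform_event['type']}",
            {"user": platform_event["user"], "message": platform_event["content"]},
        )


async def run_test() -> list[dict[str, Any]]:
    bus = InMemoryEventBus()
    received: list[dict[str, Any]] = []

    @bus.on("platform.message")
    async def capture_event(event_data: dict[str, Any]) -> None:
        received.append(event_data)

    plugin = PlatformClient(bus)
    await plugin._handle_platform_event(
        {
            "type": "message",
            "user": {"id": "123", "username": "testuser"},
            "content": "Hello world",
        }
    )
    return received


events_received = asyncio.run(run_test())
print(events_received)
```

If the plugin instead looks up a global bus, as the Discord gateway does with `get_event_bus()`, the test would need to subscribe on that same global instance or patch the lookup.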
Plugin Examples¶
Simple HTTP API Plugin¶
class HTTPAPIPlugin:
    """Plugin for HTTP-based APIs"""

    def __init__(self, config: HTTPConfig):
        self.config = config
        self.session = None

    async def connect(self) -> None:
        import aiohttp

        self.session = aiohttp.ClientSession(
            headers={"Authorization": f"Bearer {self.config.api_key}"}
        )

    async def disconnect(self) -> None:
        if self.session:
            await self.session.close()
            self.session = None

    async def send_request(self, endpoint: str, data: dict) -> dict:
        """Send API request"""
        if self.session is None:
            raise RuntimeError("connect() must be called before send_request()")
        async with self.session.post(f"{self.config.base_url}/{endpoint}", json=data) as response:
            return await response.json()
WebSocket-Based Plugin¶
import asyncio
import json


class WebSocketPlugin:
    """Plugin for WebSocket-based platforms"""

    def __init__(self, config):
        self.config = config
        self.websocket = None
        self.event_task = None

    async def connect(self) -> None:
        import websockets

        self.websocket = await websockets.connect(self.config.websocket_url)
        # Keep a reference so the task is not garbage-collected
        self.event_task = asyncio.create_task(self._event_loop())

    async def _event_loop(self) -> None:
        """Handle incoming WebSocket messages"""
        async for message in self.websocket:
            data = json.loads(message)
            await self._handle_event(data)
Platform-Specific Considerations¶
Twitch¶
- Uses EventSub WebSocket for real-time events
- Helix API for actions (sending messages, moderation)
- OAuth2 with specific scopes required
Discord¶
- Uses Gateway WebSocket for events
- REST API for sending messages
- Bot token authentication
OBS¶
- Uses obs-websocket protocol
- Scene management and source control
- Local WebSocket connection
YouTube (Future)¶
- YouTube Live Streaming API
- Chat messages via Live Chat API
- OAuth2 with YouTube scopes
Next Steps¶
- Study the Twitch plugin implementation as a reference
- Learn about Module Development to use plugins in features
- Understand Dependency Injection for plugin access
- Check out Event System for event transformation patterns
Ready to connect new platforms! 🚀
Create platform plugins that provide clean, consistent interfaces for modules to interact with any streaming service or tool.