Database¶
StarStreamer uses SQLite as its primary database for storing user data, economy information, chat logs, variables, and application state. The database system provides async support, automatic schema management, and optimized performance for streaming workloads.
Overview¶
The database system is built around several key components:
- Database Manager - Core SQLite connection and query interface
- Models - Data classes representing database entities
- Repositories - Type-safe data access layer with CRUD operations
- Services - Business logic layer that uses repositories
- Schema Management - Automatic table creation and optimization
Quick Start¶
Basic Database Usage¶
from starstreamer.db.database import get_database, init_database
# Get database instance
db = get_database("data/my_stream.db")
# Initialize database (creates tables, optimizes settings)
await init_database("data/my_stream.db")
# Use database
user_count = await db.fetchval("SELECT COUNT(*) FROM users")
print(f"Total users: {user_count}")
Using Services¶
from starstreamer.services import UserService, EconomyService
# Services are automatically injected in event handlers
@on_event("twitch.chat.message")
async def welcome_user(event: Event, users: UserService, economy: EconomyService) -> None:
    user_data = event.data.get("user", {})
    platform_user_id = user_data.get("id", "")
    username = user_data.get("display_name", "Unknown")
    # Get or create user by platform identity
    user = await users.get_or_create_user_by_platform("twitch", platform_user_id, username)
    # Check their balance using platform identity
    balance = await economy.get_balance_by_platform("twitch", platform_user_id)
    print(f"{username} has {balance} points")
Database Configuration¶
File Location¶
By default, StarStreamer stores the database at data/stream.db. You can customize this:
# Custom database path
db = get_database("custom/path/stream.db")
await db.initialize()
# In-memory database (testing only)
db = get_database(":memory:")
await db.initialize()
Environment Variables¶
# Custom database location
STARSTREAMER_DB_PATH=data/custom_stream.db
# Database settings (advanced)
STARSTREAMER_DB_CACHE_SIZE=16000 # 16MB cache
STARSTREAMER_DB_WAL_MODE=true # Enable WAL mode
Performance Optimization¶
StarStreamer automatically applies SQLite optimizations for streaming workloads:
-- Applied during initialization
PRAGMA journal_mode = WAL; -- Better concurrency
PRAGMA synchronous = NORMAL; -- Faster writes
PRAGMA cache_size = -16000; -- 16MB cache
PRAGMA temp_store = memory; -- Memory for temp tables
PRAGMA foreign_keys = ON; -- Enforce relationships
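To check what these settings do outside StarStreamer, the sketch below applies the same PRAGMAs with Python's standard sqlite3 module and reads them back. The helper names apply_streaming_pragmas and demo_pragmas are illustrative assumptions, not part of StarStreamer's API.

```python
import os
import sqlite3
import tempfile


def apply_streaming_pragmas(conn: sqlite3.Connection) -> dict:
    """Apply the PRAGMAs listed above and return the values SQLite reports back.

    Hypothetical helper for illustration only; StarStreamer applies its own
    settings during initialization.
    """
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    conn.execute("PRAGMA cache_size = -16000")  # negative value = size in KiB
    conn.execute("PRAGMA temp_store = memory")
    conn.execute("PRAGMA foreign_keys = ON")
    names = ("journal_mode", "synchronous", "cache_size", "temp_store", "foreign_keys")
    return {name: conn.execute(f"PRAGMA {name}").fetchone()[0] for name in names}


def demo_pragmas() -> dict:
    """Run the helper against a throwaway on-disk database (WAL needs a real file)."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = sqlite3.connect(os.path.join(tmp, "demo.db"))
        try:
            return apply_streaming_pragmas(conn)
        finally:
            conn.close()
```

SQLite reports some of these settings as integers: synchronous reads back as 1 (NORMAL) and temp_store as 2 (MEMORY). An in-memory database cannot use WAL, which is why the demo uses a temporary file.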
Database Schema¶
Core Tables¶
Users Table¶
Stores user information and statistics.
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
display_name TEXT,
points INTEGER DEFAULT 0,
watch_time INTEGER DEFAULT 0,
first_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
);
Key Fields:
- id - Internal user identifier
- points - Virtual currency balance
- watch_time - Total watch time in seconds
User Identities Table¶
Links platform accounts to internal users.
CREATE TABLE user_identities (
user_id INTEGER NOT NULL,
platform TEXT NOT NULL,
platform_user_id TEXT NOT NULL,
platform_username TEXT NOT NULL,
is_primary BOOLEAN DEFAULT 0,
linked_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (platform, platform_user_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
Key Fields:
- platform - Platform name (e.g., 'twitch', 'discord')
- platform_user_id - Platform-specific user identifier
- is_primary - Whether this is the user's primary identity
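As a rough illustration of how the two tables work together, the following sketch resolves a platform identity to an internal user with the standard sqlite3 module. The functions open_identity_db, get_or_create_user, and link_identity are hypothetical and only mirror the idea behind the service methods; the users table is trimmed to the columns the example needs.

```python
import sqlite3

SCHEMA = """
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT NOT NULL,
    display_name TEXT,
    points INTEGER DEFAULT 0
);
CREATE TABLE user_identities (
    user_id INTEGER NOT NULL,
    platform TEXT NOT NULL,
    platform_user_id TEXT NOT NULL,
    platform_username TEXT NOT NULL,
    is_primary BOOLEAN DEFAULT 0,
    PRIMARY KEY (platform, platform_user_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
"""


def open_identity_db() -> sqlite3.Connection:
    """Create an in-memory database with the two tables (illustrative only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # required for ON DELETE CASCADE
    conn.executescript(SCHEMA)
    return conn


def get_or_create_user(conn, platform: str, platform_user_id: str, username: str) -> int:
    """Return the internal user id for a platform identity, creating both rows if needed."""
    row = conn.execute(
        "SELECT user_id FROM user_identities WHERE platform = ? AND platform_user_id = ?",
        (platform, platform_user_id),
    ).fetchone()
    if row:
        return row[0]
    cur = conn.execute(
        "INSERT INTO users (username, display_name) VALUES (?, ?)", (username, username)
    )
    user_id = cur.lastrowid
    conn.execute(
        "INSERT INTO user_identities "
        "(user_id, platform, platform_user_id, platform_username, is_primary) "
        "VALUES (?, ?, ?, ?, 1)",
        (user_id, platform, platform_user_id, username),
    )
    conn.commit()
    return user_id


def link_identity(conn, user_id: int, platform: str, platform_user_id: str, name: str) -> None:
    """Attach an additional, non-primary platform identity to an existing user."""
    conn.execute(
        "INSERT INTO user_identities (user_id, platform, platform_user_id, platform_username) "
        "VALUES (?, ?, ?, ?)",
        (user_id, platform, platform_user_id, name),
    )
    conn.commit()
```

Because (platform, platform_user_id) is the primary key, the same account can never be linked to two internal users, and deleting a user removes its identities through the cascade.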
Events Table¶
Logs all events for analytics and debugging.
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
user_id INTEGER,
data JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
);
Chat Messages Table¶
Stores chat history for moderation and analytics.
CREATE TABLE chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
message TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
);
Commands Table¶
Stores custom chat commands.
CREATE TABLE commands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
response TEXT NOT NULL,
enabled BOOLEAN DEFAULT 1,
cooldown_seconds INTEGER DEFAULT 0,
mod_only BOOLEAN DEFAULT 0,
usage_count INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Cooldowns Table¶
Tracks command cooldowns to prevent spam.
CREATE TABLE cooldowns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
action TEXT NOT NULL,
expires_at DATETIME NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id),
UNIQUE(user_id, action)
);
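One way the UNIQUE(user_id, action) constraint could be used is sketched below with the standard sqlite3 module: starting a cooldown replaces any existing row for the same user and action, and a check compares expires_at with the current time. The helpers start_cooldown and is_on_cooldown are hypothetical names, and the foreign key to users is omitted to keep the example short.

```python
import sqlite3


def open_cooldown_db() -> sqlite3.Connection:
    """Create an in-memory cooldowns table (foreign key omitted for brevity)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE cooldowns (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            action TEXT NOT NULL,
            expires_at DATETIME NOT NULL,
            UNIQUE(user_id, action)
        )
    """)
    return conn


def start_cooldown(conn, user_id: int, action: str, seconds: int) -> None:
    """Set (or reset) the cooldown for one user and action."""
    conn.execute(
        "INSERT OR REPLACE INTO cooldowns (user_id, action, expires_at) "
        "VALUES (?, ?, datetime('now', ?))",
        (user_id, action, f"{seconds:+d} seconds"),  # e.g. '+60 seconds'
    )
    conn.commit()


def is_on_cooldown(conn, user_id: int, action: str) -> bool:
    """Return True while an unexpired cooldown row exists."""
    row = conn.execute(
        "SELECT 1 FROM cooldowns "
        "WHERE user_id = ? AND action = ? AND expires_at > datetime('now')",
        (user_id, action),
    ).fetchone()
    return row is not None
```

Note that SQLite treats NULL values as distinct in UNIQUE constraints, so rows with a NULL user_id are not deduplicated by this constraint.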
Variable Storage¶
Persistent Variables¶
Variables that survive application restarts.
CREATE TABLE persistent_variables (
name TEXT PRIMARY KEY,
value TEXT NOT NULL,
data_type TEXT DEFAULT 'string',
description TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Session Variables¶
Variables cleared when the application restarts.
CREATE TABLE session_variables (
name TEXT PRIMARY KEY,
value TEXT NOT NULL,
data_type TEXT DEFAULT 'string',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Group Management¶
Groups Table¶
Defines user groups for permissions and features.
CREATE TABLE groups (
name TEXT PRIMARY KEY,
description TEXT DEFAULT '',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Group Members Table¶
Maps users to groups.
CREATE TABLE group_members (
group_name TEXT NOT NULL,
username TEXT NOT NULL,
added_at DATETIME DEFAULT CURRENT_TIMESTAMP,
added_by TEXT DEFAULT '',
PRIMARY KEY (group_name, username),
FOREIGN KEY (group_name) REFERENCES groups(name) ON DELETE CASCADE
);
Database API¶
Database Class¶
The Database class provides the core interface for database operations:
from starstreamer.db.database import Database
db = Database("data/stream.db")
await db.initialize()
# Execute queries
await db.execute("INSERT INTO users (username, display_name) VALUES (?, ?)", ("user", "User"))
# Fetch data
user = await db.fetchone("SELECT * FROM users WHERE username = ?", ("user",))
users = await db.fetchall("SELECT * FROM users ORDER BY points DESC")
count = await db.fetchval("SELECT COUNT(*) FROM users")
# Transaction management
await db.commit()
await db.rollback()
# Cleanup
await db.close()
Context Manager Support¶
# Async context manager (recommended)
async with Database("data/stream.db") as db:
    users = await db.fetchall("SELECT * FROM users")
# Database automatically closes
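For readers unfamiliar with async context managers, here is a minimal sketch of the pattern using only the standard library. MiniAsyncDatabase is a hypothetical stand-in, not StarStreamer's Database class, and it runs blocking sqlite3 calls in a worker thread rather than using an async driver.

```python
import asyncio
import sqlite3
from typing import Optional


class MiniAsyncDatabase:
    """Hypothetical stand-in for an async database wrapper (illustration only)."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._conn: Optional[sqlite3.Connection] = None

    async def __aenter__(self) -> "MiniAsyncDatabase":
        # check_same_thread=False lets asyncio.to_thread workers use the connection
        self._conn = sqlite3.connect(self.path, check_same_thread=False)
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and on exceptions, so the connection is always released
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    @property
    def is_open(self) -> bool:
        return self._conn is not None

    async def fetchval(self, query: str, params: tuple = ()):
        """Return the first column of the first row, or None if there is no row."""
        def run():
            row = self._conn.execute(query, params).fetchone()
            return row[0] if row else None
        # Keep the event loop responsive while SQLite does blocking work
        return await asyncio.to_thread(run)
```

The point of the pattern is that cleanup lives in __aexit__, so a handler that raises midway still closes its connection.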
Database Statistics¶
stats = await db.get_stats()
print(f"Total users: {stats['users_count']}")
print(f"Database size: {stats['file_size_bytes']} bytes")
print(f"Total pages: {stats['total_pages']}")
Global Database Instance¶
from starstreamer.db.database import get_database, init_database
# Get the global instance
db = get_database()
# Initialize if needed
await init_database()
# Custom path
db = get_database("custom/path.db")
Services¶
UserService¶
Manages user data and operations using platform identities:
from starstreamer.services import UserService
# UserService is automatically injected in handlers
# For manual creation:
users = UserService(user_repo, identity_repo)
# Get or create user by platform identity
user = await users.get_or_create_user_by_platform("twitch", "twitch_user_123", "username")
# Update watch time using platform identity
await users.update_watch_time_by_platform("twitch", "twitch_user_123", 300) # 5 minutes
# Get user statistics using platform identity
stats = await users.get_user_stats_by_platform("twitch", "twitch_user_123")
print(f"Points: {stats['points']}, Watch time: {stats['watch_time']}s")
# Link additional platform to existing user
await users.link_platform_identity(user.id, "discord", "discord_user_456", "discord_name")
# Get all platform identities for a user
identities = await users.get_user_identities(user.id)
EconomyService¶
Handles virtual currency and economy features using platform identities:
from starstreamer.services import EconomyService
# EconomyService is automatically injected in handlers
# For manual creation:
economy = EconomyService(economy_repo, identity_repo)
# Check balance using platform identity
balance = await economy.get_balance_by_platform("twitch", "twitch_user_123")
# Add money using platform identity
new_balance = await economy.add_money_by_platform("twitch", "twitch_user_123", 100, "Daily bonus")
# Spend money using platform identity
success = await economy.spend_money_by_platform("twitch", "twitch_user_123", 50, "Store purchase")
# Transfer between users using platform identities
success = await economy.transfer_money_by_platform(
    "twitch", "twitch_user_123",  # From
    "twitch", "twitch_user_456",  # To
    25, "Gift"
)
# Set exact balance using platform identity
new_balance = await economy.set_balance_by_platform("twitch", "twitch_user_123", 500, "Reset balance")
# Get leaderboard (returns usernames with balances)
top_users = await economy.get_leaderboard(limit=10)
for username, balance in top_users:
    print(f"{username}: {balance} points")
# Economy statistics
stats = await economy.get_economy_stats()
print(f"Total users: {stats['total_users']}")
print(f"Total money in circulation: {stats['total_money']}")
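A transfer has to debit one user and credit another without ever leaving points lost or duplicated. The sketch below shows one way to do that against the users.points column with the standard sqlite3 module; transfer_points and open_economy_db are hypothetical helpers, and how EconomyService actually implements transfers is not shown in this document.

```python
import sqlite3


def open_economy_db() -> sqlite3.Connection:
    """In-memory users table with two sample balances (illustrative data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "username TEXT NOT NULL, "
        "points INTEGER DEFAULT 0)"
    )
    conn.executemany(
        "INSERT INTO users (username, points) VALUES (?, ?)",
        [("alice", 100), ("bob", 10)],
    )
    conn.commit()
    return conn


def transfer_points(conn, from_id: int, to_id: int, amount: int) -> bool:
    """Move points between users in one transaction; return False if it cannot be done."""
    if amount <= 0:
        return False
    try:
        # Debit only if the sender can afford it; rowcount shows whether it applied
        cur = conn.execute(
            "UPDATE users SET points = points - ? WHERE id = ? AND points >= ?",
            (amount, from_id, amount),
        )
        if cur.rowcount != 1:
            conn.rollback()
            return False
        cur = conn.execute(
            "UPDATE users SET points = points + ? WHERE id = ?", (amount, to_id)
        )
        if cur.rowcount != 1:
            conn.rollback()  # recipient missing: undo the debit
            return False
        conn.commit()
        return True
    except sqlite3.Error:
        conn.rollback()
        raise
```

Putting the balance check into the UPDATE's WHERE clause avoids a separate read-then-write step, so two concurrent transfers cannot both spend the same points.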
Variable System¶
VariableRepository¶
StarStreamer provides a typed key-value variable system, similar to Streamer.bot's, with persistent and session scopes:
from starstreamer.db.repositories.variable_repository import VariableRepository
variables = VariableRepository(db)
# Set variables (persistent by default)
await variables.set("counter", 0, description="Stream counter")
await variables.set("last_follower", "john_doe")
await variables.set("stream_config", {"quality": "1080p", "bitrate": 6000})
# Get variables with type conversion
counter = await variables.get("counter", default=0) # Returns int
name = await variables.get("last_follower", default="") # Returns str
config = await variables.get("stream_config", default={}) # Returns dict
# Session variables (cleared on restart)
await variables.set("current_song", "Epic Song", persistent=False)
song = await variables.get("current_song", persistent=False)
# Increment numeric variables
new_count = await variables.increment("counter", amount=1)
print(f"Counter: {new_count}")
# Append to lists
await variables.set("recent_followers", [])
followers = await variables.append_to_list("recent_followers", "new_user", max_length=10)
# Check existence
if await variables.exists("special_event"):
    event_data = await variables.get("special_event")
# List all variables
persistent_vars = await variables.list_variables(persistent=True)
session_vars = await variables.list_variables(persistent=False)
# Delete variables
await variables.delete("old_variable")
# Clear all session variables
cleared_count = await variables.clear_session_variables()
Variable Types¶
Variables support automatic type conversion:
# String variables
await variables.set("username", "streamer_bob")
name = await variables.get("username") # Returns: "streamer_bob"
# Numeric variables
await variables.set("viewer_count", 150)
count = await variables.get("viewer_count") # Returns: 150 (int)
await variables.set("donation_goal", 500.75)
goal = await variables.get("donation_goal") # Returns: 500.75 (float)
# Boolean variables
await variables.set("stream_live", True)
is_live = await variables.get("stream_live") # Returns: True (bool)
# JSON variables (lists, dicts)
await variables.set("stream_settings", {
    "quality": "1080p",
    "fps": 60,
    "alerts": ["follow", "sub", "donation"]
})
settings = await variables.get("stream_settings") # Returns: dict
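Since both variable tables store value as TEXT alongside a data_type column, the conversion presumably happens when values are written and read. The following sketch shows one plausible encoding; the functions encode_value and decode_value and every type label other than 'string' (the schema default) are assumptions, not StarStreamer's actual implementation.

```python
import json


def encode_value(value) -> tuple[str, str]:
    """Return (text, data_type) for storage in a TEXT column.

    Hypothetical encoding: the labels 'bool', 'int', 'float', and 'json'
    are assumed for illustration.
    """
    if isinstance(value, bool):  # bool is a subclass of int, so check it first
        return ("true" if value else "false", "bool")
    if isinstance(value, int):
        return (str(value), "int")
    if isinstance(value, float):
        return (repr(value), "float")
    if isinstance(value, str):
        return (value, "string")
    return (json.dumps(value), "json")  # lists and dicts


def decode_value(text: str, data_type: str):
    """Invert encode_value using the stored data_type label."""
    if data_type == "bool":
        return text == "true"
    if data_type == "int":
        return int(text)
    if data_type == "float":
        return float(text)
    if data_type == "json":
        return json.loads(text)
    return text
```

Storing the type label next to the text is what lets a read return an int or a dict instead of a plain string.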
Using Variables in Commands¶
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!counter"))
async def counter_command(event: Event, twitch: TwitchClient, variables: VariableRepository) -> None:
    """Increment and display counter"""
    count = await variables.increment("stream_counter", default=0)
    await twitch.send_message(f"Stream counter: {count}")

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!setgoal"))
async def set_goal_command(event: Event, twitch: TwitchClient, variables: VariableRepository) -> None:
    """Set donation goal: !setgoal 1000"""
    message = event.data.get("message", "")
    parts = message.split()
    if len(parts) < 2:
        await twitch.send_message("Usage: !setgoal <amount>")
        return
    try:
        goal = float(parts[1])
        await variables.set("donation_goal", goal, description="Current donation goal")
        await twitch.send_message(f"Donation goal set to ${goal}")
    except ValueError:
        await twitch.send_message("Invalid amount!")
Database Maintenance¶
Backup and Restore¶
# Create backup
await db.backup("backups/stream_backup_2023_12_01.db")
# Restore from backup (manual process)
# 1. Stop the application
# 2. Replace the main database file with backup
# 3. Restart the application
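For reference, the standard sqlite3 module exposes SQLite's online backup API, which copies a database safely even while it is in use. The sketch below uses it for the backup step and a plain file copy for the manual restore step; backup_database and restore_database are hypothetical helpers, and whether db.backup() works the same way internally is an assumption.

```python
import os
import shutil
import sqlite3


def backup_database(source: sqlite3.Connection, backup_path: str) -> None:
    """Copy a live database to backup_path using SQLite's online backup API."""
    target = sqlite3.connect(backup_path)
    try:
        source.backup(target)
    finally:
        target.close()


def restore_database(backup_path: str, db_path: str) -> None:
    """Overwrite db_path with the backup. Only safe while the application is stopped."""
    # Remove leftover WAL side files so they are not applied to the restored file
    for suffix in ("-wal", "-shm"):
        side_file = db_path + suffix
        if os.path.exists(side_file):
            os.remove(side_file)
    shutil.copyfile(backup_path, db_path)
```

Copying the main file with ordinary file tools while the application is running can miss changes still held in the write-ahead log, which is why the restore steps above begin with stopping the application.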
Vacuum and Optimization¶
# Reclaim space and optimize
await db.vacuum()
# Get database statistics to monitor health
stats = await db.get_stats()
print(f"Database size: {stats['file_size_bytes']} bytes")
print(f"Total events: {stats['events_count']}")
Cleanup Old Data¶
# Clean up old events (older than 30 days)
await db.execute("""
DELETE FROM events
WHERE created_at < datetime('now', '-30 days')
""")
# Clean up old chat messages (older than 7 days)
await db.execute("""
DELETE FROM chat_messages
WHERE timestamp < datetime('now', '-7 days')
""")
# Clean up expired cooldowns
await db.execute("""
DELETE FROM cooldowns
WHERE expires_at < datetime('now')
""")
await db.commit()
Integration with Dependency Injection¶
Services are automatically registered with the DI container and injected into event handlers:
@on_event("twitch.chat.message")
async def my_handler(
    event: Event,
    twitch: TwitchClient,
    users: UserService,  # Automatically injected
    economy: EconomyService,  # Automatically injected
    variables: VariableRepository,  # Automatically injected
    db: Database,  # Automatically injected
) -> None:
    # Use services directly
    user = await users.get_or_create_user_by_platform("twitch", "123", "username")
    balance = await economy.get_balance_by_platform("twitch", "123")
    config = await variables.get("app_config", default={})
Manual Service Creation¶
For advanced use cases, create services manually:
from starstreamer.db.database import get_database
from starstreamer.services import UserService, EconomyService
from starstreamer.db.repositories.variable_repository import VariableRepository
db = get_database()
users = UserService(db)
economy = EconomyService(db)
variables = VariableRepository(db)
# Use services
await users.get_or_create_user_by_platform("twitch", "123", "test_user")
Advanced Usage¶
Raw SQL Queries¶
For complex operations, use raw SQL:
# Complex analytics query
top_chatters = await db.fetchall("""
SELECT u.username, COUNT(m.id) as message_count
FROM users u
JOIN chat_messages m ON u.id = m.user_id
WHERE m.timestamp > datetime('now', '-7 days')
GROUP BY u.id, u.username
ORDER BY message_count DESC
LIMIT 10
""")
for username, count in top_chatters:
    print(f"{username}: {count} messages")
Transaction Management¶
try:
    # Start transaction (autocommit is disabled by default)
    await db.execute("INSERT INTO users (username, display_name) VALUES (?, ?)", ("user1", "User One"))
    await db.execute("INSERT INTO users (username, display_name) VALUES (?, ?)", ("user2", "User Two"))
    # Commit transaction
    await db.commit()
    print("Transaction successful")
except Exception as e:
    # Rollback on error
    await db.rollback()
    print(f"Transaction failed: {e}")
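To make the all-or-nothing behavior concrete, the sketch below uses the standard sqlite3 module and a trimmed copy of the commands table, whose UNIQUE name column triggers a failure midway through a batch. The helper add_commands_atomically is a hypothetical name used only for this illustration.

```python
import sqlite3


def open_commands_db() -> sqlite3.Connection:
    """In-memory table with the UNIQUE name constraint from the commands schema."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE commands ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "name TEXT UNIQUE NOT NULL, "
        "response TEXT NOT NULL)"
    )
    return conn


def add_commands_atomically(conn, commands: list[tuple[str, str]]) -> bool:
    """Insert every command or none of them."""
    try:
        for name, response in commands:
            conn.execute(
                "INSERT INTO commands (name, response) VALUES (?, ?)", (name, response)
            )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        # A duplicate name aborts the batch; rollback discards the earlier inserts too
        conn.rollback()
        return False
```

If the second insert fails, the first one is discarded as well, so the table never holds half of a batch.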
Custom Models¶
Create custom data classes for specific use cases:
from dataclasses import dataclass
from datetime import datetime
from starstreamer.db.database import Database

@dataclass
class StreamSession:
    id: int | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    title: str = ""
    category: str = ""
    viewer_count: int = 0

async def log_stream_session(db: Database, session: StreamSession) -> None:
    """Log a stream session to a custom table"""
    await db.execute("""
        INSERT INTO stream_sessions (start_time, end_time, title, category, viewer_count)
        VALUES (?, ?, ?, ?, ?)
    """, (session.start_time, session.end_time, session.title, session.category, session.viewer_count))
    await db.commit()
Performance Considerations¶
Indexing¶
StarStreamer automatically creates indexes for common queries:
-- Automatically created indexes
CREATE INDEX idx_events_type ON events(type);
CREATE INDEX idx_events_created ON events(created_at);
CREATE INDEX idx_cooldowns_expires ON cooldowns(expires_at);
CREATE INDEX idx_chat_timestamp ON chat_messages(timestamp);
CREATE INDEX idx_users_twitch_id ON users(twitch_id);
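Whether a query actually uses one of these indexes can be checked with SQLite's EXPLAIN QUERY PLAN. The sketch below does this with the standard sqlite3 module against the events table and idx_events_type; query_plan and open_events_db are hypothetical helpers, and the exact wording of the plan text varies between SQLite versions.

```python
import sqlite3


def open_events_db() -> sqlite3.Connection:
    """In-memory events table with the idx_events_type index (foreign key omitted)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            type TEXT NOT NULL,
            user_id INTEGER,
            data JSON,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        CREATE INDEX idx_events_type ON events(type);
    """)
    return conn


def query_plan(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> str:
    """Return SQLite's human-readable plan for a query without running it."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    # The last column of each row holds the plan description
    return "\n".join(row[-1] for row in rows)
```

A plan that mentions the index name indicates an index search, while a plan starting with SCAN means every row of the table is read.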
Query Optimization¶
# ✅ Good: Use parameterized queries
users = await db.fetchall("SELECT * FROM users WHERE points > ?", (100,))
# ✅ Good: Limit results for large tables
recent_messages = await db.fetchall("""
SELECT * FROM chat_messages
ORDER BY timestamp DESC
LIMIT 100
""")
# ❌ Avoid: String concatenation (SQL injection risk)
# users = await db.fetchall(f"SELECT * FROM users WHERE username = '{username}'")
# ✅ Good: Use indexes for WHERE clauses
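# ✅ Good: Filter on indexed columns (the user_identities primary key covers this lookup)
identity = await db.fetchone(
    "SELECT * FROM user_identities WHERE platform = ? AND platform_user_id = ?",
    ("twitch", platform_user_id),
)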
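The injection risk is easy to reproduce. In the sketch below, which uses the standard sqlite3 module and illustrative data, the same hostile input returns every row when it is formatted into the SQL text and no rows when it is passed as a parameter. The function names find_users_unsafe and find_users_safe are made up for this example.

```python
import sqlite3


def open_users_db() -> sqlite3.Connection:
    """In-memory users table with two sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL)"
    )
    conn.executemany("INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)])
    conn.commit()
    return conn


def find_users_unsafe(conn: sqlite3.Connection, username: str) -> list:
    """DO NOT use: the input becomes part of the SQL statement itself."""
    return conn.execute(
        f"SELECT username FROM users WHERE username = '{username}'"
    ).fetchall()


def find_users_safe(conn: sqlite3.Connection, username: str) -> list:
    """The input is bound as data and can never change the statement."""
    return conn.execute(
        "SELECT username FROM users WHERE username = ?", (username,)
    ).fetchall()
```

With the input `nobody' OR '1'='1`, the unsafe version's WHERE clause becomes always true and matches both users, while the safe version looks for a user literally named that and finds none.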
Memory Management¶
import logging
logger = logging.getLogger(__name__)
# Monitor database size
stats = await db.get_stats()
if stats['file_size_bytes'] > 100_000_000:  # 100MB
    logger.warning("Database size is large, consider cleanup")
# Regular cleanup of old data
async def cleanup_old_data():
    """Clean up old data to keep database manageable"""
    # Keep only last 30 days of events
    await db.execute("DELETE FROM events WHERE created_at < datetime('now', '-30 days')")
    # Keep only last 7 days of chat messages
    await db.execute("DELETE FROM chat_messages WHERE timestamp < datetime('now', '-7 days')")
    # Clean expired cooldowns
    await db.execute("DELETE FROM cooldowns WHERE expires_at < datetime('now')")
    await db.commit()
    await db.vacuum()  # Reclaim space
Testing¶
Test Database Setup¶
import pytest
import pytest_asyncio
from starstreamer.db.database import Database

# pytest-asyncio's strict mode requires its own decorator for async fixtures
@pytest_asyncio.fixture
async def test_db():
    """Create test database in memory"""
    db = Database(":memory:")
    await db.initialize()
    yield db
    await db.close()

@pytest.mark.asyncio
async def test_user_creation(test_db):
    """Test user creation"""
    await test_db.execute(
        "INSERT INTO users (username, display_name) VALUES (?, ?)",
        ("test_user", "Test User")
    )
    user = await test_db.fetchone(
        "SELECT username, display_name FROM users WHERE username = ?",
        ("test_user",)
    )
    assert user[0] == "test_user"
    assert user[1] == "Test User"
Service Testing¶
@pytest.mark.asyncio
async def test_economy_service(test_db):
    """Test economy service operations"""
    from starstreamer.services import EconomyService
    economy = EconomyService(test_db)
    # Test adding money
    balance = await economy.add_money_by_platform("twitch", "user_123", 100, "Test bonus")
    assert balance == 100
    # Test spending money
    success = await economy.spend_money_by_platform("twitch", "user_123", 50, "Test purchase")
    assert success is True
    final_balance = await economy.get_balance_by_platform("twitch", "user_123")
    assert final_balance == 50
Migration and Upgrades¶
Schema Changes¶
StarStreamer handles schema updates automatically through the SCHEMA_SQL definition. For major changes:
- Backup the database before updating
- Test schema changes on a copy first
- Update SCHEMA_SQL in models.py
- Add migration logic if needed for existing data
Data Migration Example¶
async def migrate_user_data():
    """Example migration for adding new user fields"""
    # Check if migration is needed
    columns = await db.fetchall("PRAGMA table_info(users)")
    has_new_field = any(col[1] == "new_field" for col in columns)
    if not has_new_field:
        # Add new column
        await db.execute("ALTER TABLE users ADD COLUMN new_field TEXT DEFAULT ''")
        # Update existing data
        await db.execute("UPDATE users SET new_field = 'default_value'")
        await db.commit()
        logger.info("User data migration completed")
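An alternative to inspecting table_info for each change is to track a schema version in SQLite's built-in user_version PRAGMA and apply only the migrations newer than it. The sketch below shows that technique with the standard sqlite3 module; the MIGRATIONS list and the migrate function are hypothetical, and this is not how StarStreamer is documented to handle upgrades.

```python
import sqlite3

# Ordered list of migration statements; position + 1 is the schema version.
# Hypothetical content, reusing the column from the example above.
MIGRATIONS = [
    "ALTER TABLE users ADD COLUMN new_field TEXT DEFAULT ''",
]


def migrate(conn: sqlite3.Connection) -> int:
    """Apply pending migrations in order and return the resulting schema version."""
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    for version, statement in enumerate(MIGRATIONS, start=1):
        if version > current:
            conn.execute(statement)
            # PRAGMA values cannot be bound as parameters; version is a trusted int
            conn.execute(f"PRAGMA user_version = {version}")
            conn.commit()
    return conn.execute("PRAGMA user_version").fetchone()[0]
```

Running migrate a second time is a no-op, because every statement at or below the stored version is skipped.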
Troubleshooting¶
Common Issues¶
Database Locked:
# If you get "database is locked" errors
# Ensure proper connection management:
async with Database("data/stream.db") as db:
    # Use database
    pass  # Connection automatically closed
Performance Issues:
# Monitor query performance
import time
start_time = time.perf_counter()  # monotonic clock, suited to measuring elapsed time
result = await db.fetchall("SELECT * FROM large_table")
elapsed = time.perf_counter() - start_time
if elapsed > 1.0:  # Slow query
    logger.warning(f"Slow query took {elapsed:.2f}s")
Corruption Recovery:
# Check database integrity
try:
    result = await db.fetchval("PRAGMA integrity_check")
    if result != "ok":
        logger.error(f"Database integrity issues: {result}")
        # Restore from backup or rebuild
except Exception as e:
    logger.error(f"Database corruption detected: {e}")
    # Restore from backup
Debugging¶
# Enable SQLite debugging
import logging
logging.getLogger("aiosqlite").setLevel(logging.DEBUG)
# Log all database operations
class DebugDatabase(Database):
    async def execute(self, query: str, parameters=None):
        logger.debug(f"SQL: {query} | Params: {parameters}")
        return await super().execute(query, parameters)
Best Practices¶
- Always use parameterized queries to prevent SQL injection
- Close connections properly using async context managers
- Backup regularly before schema changes or major updates
- Monitor database size and clean up old data periodically
- Use transactions for multi-step operations
- Test database operations thoroughly with unit tests
- Use services and repositories instead of raw SQL when possible
- Index frequently queried columns for better performance
See Also¶
- Services - Understanding the service layer
- Dependency Injection - How services are injected
- Testing Guide - Testing database operations
- Custom Commands Examples - Using database in commands