Cooldowns¶
StarStreamer's cooldown system prevents command spam and rate-limits actions to maintain a healthy chat experience. The system supports both per-user and global cooldowns with flexible configuration options.
Overview¶
The cooldown system works by tracking when commands or actions were last executed and preventing re-execution until a specified time period has elapsed. This helps:
- Prevent Spam - Stop users from repeatedly triggering the same command
- Manage Resources - Rate-limit expensive operations like API calls
- Improve Chat Quality - Reduce noise from command overuse
- Fair Usage - Ensure all users get a chance to use commands
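The bookkeeping described above can be sketched in a few lines. This is a hypothetical illustration rather than StarStreamer's actual implementation: the `last_used` dictionary and the `try_use` helper are names invented for this example, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
# Hypothetical sketch of timestamp-based cooldown tracking.
# None of these names come from StarStreamer itself.

last_used: dict[str, float] = {}  # action name -> time of last successful use


def try_use(action: str, cooldown_seconds: float, now: float) -> bool:
    """Return True and record the use if the action is off cooldown."""
    previous = last_used.get(action)
    if previous is not None and now - previous < cooldown_seconds:
        return False  # still cooling down; the stored timestamp is unchanged
    last_used[action] = now
    return True


first = try_use("!uptime", 30, now=100.0)   # allowed: never used before
second = try_use("!uptime", 30, now=110.0)  # blocked: only 10s have elapsed
third = try_use("!uptime", 30, now=130.0)   # allowed: a full 30s have elapsed
```

A blocked attempt does not refresh the timestamp in this sketch, so repeated attempts during the cooldown do not extend it.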
Quick Start¶
Basic Per-User Cooldown¶
from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger, CooldownTrigger
from starstreamer.plugins.twitch import TwitchClient
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(300, per_user=True))  # 5 minutes per user
async def work_command(event: Event, twitch: TwitchClient) -> None:
    """Work command with per-user cooldown"""
    user = event.data.get("user", {})
    username = user.get("display_name", "Unknown")
    # Command logic here
    await twitch.send_message(f"@{username} You worked hard and earned coins!")
Global Cooldown¶
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!uptime"))
@trigger(CooldownTrigger(30, per_user=False))  # 30-second global cooldown
async def uptime_command(event: Event, twitch: TwitchClient) -> None:
    """Uptime command with global cooldown"""
    uptime = await get_stream_uptime()
    await twitch.send_message(f"Stream has been live for: {uptime}")
CooldownTrigger API¶
Constructor¶
CooldownTrigger(
    cooldown_seconds: float,
    per_user: bool = True,
    message: str | None = None,
    action_name: str | None = None,
    tracker: CooldownTracker | None = None
)
Parameters:
- cooldown_seconds - Duration of the cooldown in seconds
- per_user - If True, the cooldown applies per user. If False, a global cooldown applies to all users
- message - Optional custom message when the command is on cooldown (supports the {remaining} placeholder)
- action_name - Optional custom action name for tracking (defaults to command name)
- tracker - Optional custom cooldown tracker instance
Basic Examples¶
Per-User Cooldowns¶
# 5-minute per-user cooldown
@trigger(CooldownTrigger(300))
async def daily_bonus(event: Event, twitch: TwitchClient) -> None:
    """Daily bonus with 5-minute cooldown per user"""
    pass
# 1-hour per-user cooldown
@trigger(CooldownTrigger(3600, per_user=True))
async def big_reward(event: Event, twitch: TwitchClient) -> None:
    """Big reward with 1-hour cooldown per user"""
    pass
Global Cooldowns¶
# 10-second global cooldown
@trigger(CooldownTrigger(10, per_user=False))
async def server_stats(event: Event, twitch: TwitchClient) -> None:
    """Server stats with global cooldown to prevent API spam"""
    pass
# 2-minute global cooldown for expensive operations
@trigger(CooldownTrigger(120, per_user=False))
async def generate_clip(event: Event, twitch: TwitchClient) -> None:
    """Clip generation with global cooldown"""
    pass
Custom Messages¶
# Custom cooldown message
@trigger(CooldownTrigger(
    60,
    message="Please wait {remaining:.0f} seconds before working again!"
))
async def work_with_message(event: Event, twitch: TwitchClient) -> None:
    """Work command with custom cooldown message"""
    pass
# More detailed message
@trigger(CooldownTrigger(
    300,
    message="@{username} You must wait {remaining:.1f} more seconds. Work cooldown prevents spam!"
))
async def detailed_cooldown(event: Event, twitch: TwitchClient) -> None:
    """Command with detailed cooldown message"""
    pass
Custom Action Names¶
# Custom action name for shared cooldowns
@trigger(CooldownTrigger(60, action_name="economy_command"))
async def balance_check(event: Event, twitch: TwitchClient) -> None:
    """Balance command sharing cooldown with other economy commands"""
    pass
@trigger(CooldownTrigger(60, action_name="economy_command"))
async def give_money(event: Event, twitch: TwitchClient) -> None:
    """Give money command sharing the same cooldown pool"""
    pass
Cooldown Types¶
Per-User Cooldowns (Default)¶
Per-user cooldowns track usage separately for each user:
@trigger(CooldownTrigger(60))  # per_user=True by default
async def personal_command(event: Event, twitch: TwitchClient) -> None:
    """Each user has their own 60-second cooldown"""
    pass
Behavior:
- User A uses command → User A on cooldown for 60 seconds
- User B can still use the command immediately
- After 60 seconds, User A can use the command again
Use Cases:
- Economy commands (!work, !daily, !gamble)
- Personal stats (!balance, !profile)
- User-specific actions (!hug, !pet)
Global Cooldowns¶
Global cooldowns affect all users collectively:
@trigger(CooldownTrigger(30, per_user=False))
async def global_command(event: Event, twitch: TwitchClient) -> None:
    """All users share the same 30-second cooldown"""
    pass
Behavior:
- Any user uses command → All users on cooldown for 30 seconds
- No user can use the command until cooldown expires
Use Cases:
- API-heavy commands (!weather, !news)
- Resource-intensive operations (!clip, !screenshot)
- Rate-limited external services
- Commands that affect the entire stream
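The difference between the two cooldown types comes down to which key the last-use timestamp is stored under. The sketch below is an assumption about how such scoping could work, not a description of StarStreamer's internals; `ScopedCooldown` and its `allow` method are hypothetical names used only for illustration.

```python
# Hypothetical sketch: how per-user and global cooldowns might be keyed.
# ScopedCooldown is an illustrative name, not a StarStreamer class.


class ScopedCooldown:
    def __init__(self, cooldown_seconds: float, per_user: bool = True) -> None:
        self.cooldown_seconds = cooldown_seconds
        self.per_user = per_user
        self._last_used: dict[str, float] = {}

    def allow(self, user_id: str, now: float) -> bool:
        # Per-user cooldowns key on the user; global ones share a single key.
        key = user_id if self.per_user else "*"
        previous = self._last_used.get(key)
        if previous is not None and now - previous < self.cooldown_seconds:
            return False
        self._last_used[key] = now
        return True


per_user = ScopedCooldown(60, per_user=True)
a_first = per_user.allow("user_a", now=0.0)   # User A triggers the command
b_first = per_user.allow("user_b", now=1.0)   # User B is unaffected
a_again = per_user.allow("user_a", now=30.0)  # User A is still cooling down

shared = ScopedCooldown(30, per_user=False)
g_first = shared.allow("user_a", now=0.0)     # any user starts the cooldown
g_other = shared.allow("user_b", now=10.0)    # every other user is blocked too
g_later = shared.allow("user_b", now=30.0)    # the cooldown has expired
```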
Advanced Usage¶
Combining with Other Triggers¶
Cooldowns work seamlessly with other triggers:
from starstreamer.triggers import ModOnlyTrigger, SubscriberOnlyTrigger
# Moderator-only command with cooldown
@trigger(CommandTrigger("!clear"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(10, per_user=False))
async def clear_chat(event: Event, twitch: TwitchClient) -> None:
    """Mod-only chat clear with global cooldown"""
    pass
# Subscriber command with per-user cooldown
@trigger(CommandTrigger("!vip"))
@trigger(SubscriberOnlyTrigger())
@trigger(CooldownTrigger(300))
async def subscriber_feature(event: Event, twitch: TwitchClient) -> None:
    """Subscriber-only feature with 5-minute per-user cooldown"""
    pass
Multiple Cooldown Layers¶
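Different commands can share a cooldown pool by reusing the same action_name. Because per_user defaults to True, each user has one cooldown shared across all three commands: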
# Shared "social" command cooldown
SOCIAL_COOLDOWN = CooldownTrigger(30, action_name="social_commands")
@trigger(CommandTrigger("!discord"))
@trigger(SOCIAL_COOLDOWN)
async def discord_link(event: Event, twitch: TwitchClient) -> None:
    """Discord link sharing social cooldown"""
    pass
@trigger(CommandTrigger("!twitter"))
@trigger(SOCIAL_COOLDOWN)
async def twitter_link(event: Event, twitch: TwitchClient) -> None:
    """Twitter link sharing social cooldown"""
    pass
@trigger(CommandTrigger("!youtube"))
@trigger(SOCIAL_COOLDOWN)
async def youtube_link(event: Event, twitch: TwitchClient) -> None:
    """YouTube link sharing social cooldown"""
    pass
Dynamic Cooldowns¶
Adjust cooldown durations based on conditions:
@trigger(CommandTrigger("!work"))
async def dynamic_work_command(event: Event, twitch: TwitchClient, economy: EconomyService) -> None:
    """Work command with dynamic cooldown based on user level"""
    user = event.data.get("user", {})
    user_id = user.get("id", "")
    username = user.get("display_name", "Unknown")
    # Get user level
    user_profile = await economy.get_user_profile(user_id)
    level = user_profile.level
    # Calculate dynamic cooldown (higher level = shorter cooldown)
    base_cooldown = 300  # 5 minutes
    cooldown_reduction = min(level * 10, 120)  # Max 2-minute reduction
    actual_cooldown = base_cooldown - cooldown_reduction
    # Check cooldown manually
    from starstreamer.core.cooldowns import get_cooldown_tracker
    tracker = get_cooldown_tracker()
    if not tracker.check_cooldown(user_id, "!work", actual_cooldown):
        remaining = tracker.get_remaining_cooldown(user_id, "!work", actual_cooldown)
        await twitch.send_message(
            f"@{username} Please wait {remaining:.0f} seconds before working again!"
        )
        return
    # Execute work command
    await twitch.send_message(f"@{username} You worked hard! (Cooldown: {actual_cooldown}s)")
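The level-based arithmetic in the handler above can be pulled out and checked on its own. The helper below is a sketch that reuses the same constants (300-second base, 10 seconds per level, reduction capped at 120 seconds); `dynamic_cooldown` is a hypothetical name introduced for this example.

```python
def dynamic_cooldown(
    level: int,
    base: int = 300,          # 5-minute base cooldown
    per_level: int = 10,      # seconds removed per user level
    max_reduction: int = 120  # the reduction never exceeds 2 minutes
) -> int:
    """Return the cooldown in seconds for a user at the given level."""
    reduction = min(level * per_level, max_reduction)
    return base - reduction


durations = {level: dynamic_cooldown(level) for level in (0, 5, 12, 50)}
# durations == {0: 300, 5: 250, 12: 180, 50: 180}
```

The cap takes effect at level 12, so the cooldown never drops below 180 seconds with these values.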
Cooldown Messages¶
Default Messages¶
Without custom messages, StarStreamer provides default cooldown feedback:
@trigger(CooldownTrigger(60))
async def simple_command(event: Event, twitch: TwitchClient) -> None:
    """Uses default cooldown message"""
    pass
Default message: "Please wait {remaining:.0f} seconds before using this command again."
Custom Message Formatting¶
Custom messages support placeholder formatting:
@trigger(CooldownTrigger(
    300,
    message="⏰ {remaining:.0f} seconds remaining until you can work again!"
))
async def work_with_timer(event: Event, twitch: TwitchClient) -> None:
    """Work command with timer emoji"""
    pass
@trigger(CooldownTrigger(
    60,
    message="Slow down there, {username}! Wait {remaining:.1f}s before trying again."
))
async def rate_limited_command(event: Event, twitch: TwitchClient) -> None:
    """Command with personalized cooldown message"""
    pass
Available placeholders:
- {remaining} - Remaining cooldown time in seconds (float)
- {username} - User's display name (if available in message formatter)
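The placeholder syntax shown here matches Python's format specification mini-language. Assuming the templates are filled with something equivalent to `str.format` (the source does not say how StarStreamer renders them), the two example messages would come out as follows:

```python
# Assumption: templates are rendered with str.format-style substitution.
template_short = "Please wait {remaining:.0f} seconds before working again!"
template_named = "Slow down there, {username}! Wait {remaining:.1f}s before trying again."

# .0f rounds to whole seconds, .1f keeps one decimal place.
short_text = template_short.format(remaining=42.3)
named_text = template_named.format(remaining=12.34, username="Alice")

# short_text == "Please wait 42 seconds before working again!"
# named_text == "Slow down there, Alice! Wait 12.3s before trying again."
```

With plain `str.format`, a template that references `{username}` raises `KeyError` when no username is supplied, which is one reason to keep that placeholder out of messages for events that may lack user data.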
Handling Cooldown Responses¶
Commands on cooldown don't execute, but you can handle cooldown responses:
from starstreamer.core.decorators import filter
def handle_cooldown_response(event):
    """Custom filter to handle cooldown responses"""
    # This would be implemented in a more advanced trigger system
    # For now, cooldowns are handled automatically by the trigger system
    return True
@on_event("twitch.chat.message")
@filter(handle_cooldown_response)
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(300, message="Work cooldown: {remaining:.0f}s remaining"))
async def work_with_response(event: Event, twitch: TwitchClient) -> None:
    """Work command that handles its own cooldown responses"""
    pass
CooldownTracker API¶
For advanced use cases, you can interact with the cooldown tracker directly:
Basic Usage¶
from starstreamer.core.cooldowns import get_cooldown_tracker
# Get the global tracker instance
tracker = get_cooldown_tracker()
# check_cooldown returns True when the action is allowed (not on cooldown)
if tracker.check_cooldown("user123", "!work", 300):
    # Execute action
    print("Action executed!")
else:
    # Get remaining time
    remaining = tracker.get_remaining_cooldown("user123", "!work", 300)
    print(f"Wait {remaining:.0f} seconds")
Manual Cooldown Management¶
async def admin_reset_cooldown(event: Event, twitch: TwitchClient) -> None:
    """Admin command to reset user cooldowns"""
    message = event.data.get("message", "")
    parts = message.split()
    if len(parts) < 3:  # !resetcd <user> <action>
        await twitch.send_message("Usage: !resetcd <user> <action>")
        return
    target_user = parts[1]
    action = parts[2]
    tracker = get_cooldown_tracker()
    tracker.reset_cooldown(target_user, action)
    await twitch.send_message(f"Reset {action} cooldown for {target_user}")
async def view_cooldown_stats(event: Event, twitch: TwitchClient) -> None:
    """View cooldown system statistics"""
    tracker = get_cooldown_tracker()
    stats = tracker.get_stats()
    await twitch.send_message(
        f"Cooldown Stats - Users: {stats['total_users']}, "
        f"Active cooldowns: {stats['total_actions']}, "
        f"Global: {stats['global_cooldowns']}"
    )
Memory Management¶
import asyncio
async def cooldown_cleanup_task():
    """Background task to clean up old cooldown entries"""
    tracker = get_cooldown_tracker()
    while True:
        # Clean up entries older than 1 hour
        tracker.cleanup_old_entries(max_age_seconds=3600)
        # Wait 10 minutes before next cleanup
        await asyncio.sleep(600)
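# Start the cleanup task from inside a running event loop (for example, in your
# async main function); asyncio.create_task raises RuntimeError otherwise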
asyncio.create_task(cooldown_cleanup_task())
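To make the cleanup pattern concrete without depending on StarStreamer, the sketch below prunes a plain dictionary of timestamps and schedules the work from inside a coroutine, since `asyncio.create_task` needs a running event loop. The `entries` store, `prune`, and `cleanup_loop` are hypothetical stand-ins for the tracker and its `cleanup_old_entries` method, and the loop is bounded so the example terminates.

```python
import asyncio

# Hypothetical in-memory store: key -> timestamp of last use.
entries: dict[str, float] = {"old_user": 0.0, "recent_user": 3500.0}


def prune(store: dict[str, float], now: float, max_age_seconds: float) -> int:
    """Delete entries older than max_age_seconds; return how many were removed."""
    stale = [key for key, stamp in store.items() if now - stamp > max_age_seconds]
    for key in stale:
        del store[key]
    return len(stale)


async def cleanup_loop(rounds: int, interval: float) -> int:
    """Run a bounded number of cleanup passes (a real task would loop forever)."""
    removed = 0
    for _ in range(rounds):
        removed += prune(entries, now=4000.0, max_age_seconds=3600)
        await asyncio.sleep(interval)
    return removed


async def main() -> int:
    # create_task is called inside a coroutine, so an event loop is running.
    task = asyncio.create_task(cleanup_loop(rounds=2, interval=0.01))
    return await task


removed_total = asyncio.run(main())
```

Only `old_user` is older than one hour at the simulated time, so one entry is removed and `recent_user` is kept.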
Configuration Examples¶
Economy System Cooldowns¶
# Different cooldowns for different economy actions
@trigger(CooldownTrigger(300))  # 5 minutes
async def work_command(event: Event, twitch: TwitchClient) -> None:
    """Regular work command"""
    pass
@trigger(CooldownTrigger(86400))  # 24 hours
async def daily_command(event: Event, twitch: TwitchClient) -> None:
    """Daily bonus command"""
    pass
@trigger(CooldownTrigger(60))  # 1 minute
async def balance_command(event: Event, twitch: TwitchClient) -> None:
    """Balance check with short cooldown"""
    pass
@trigger(CooldownTrigger(10, per_user=False))  # Global 10 seconds
async def leaderboard_command(event: Event, twitch: TwitchClient) -> None:
    """Leaderboard with global cooldown to prevent spam"""
    pass
Moderation Cooldowns¶
from starstreamer.triggers import ModOnlyTrigger
@trigger(CommandTrigger("!timeout"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(5, per_user=False))  # Global 5-second cooldown
async def timeout_command(event: Event, twitch: TwitchClient) -> None:
    """Timeout command with global cooldown to prevent rapid timeouts"""
    pass
@trigger(CommandTrigger("!clear"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(30, per_user=False))  # Global 30-second cooldown
async def clear_command(event: Event, twitch: TwitchClient) -> None:
    """Clear chat with global cooldown"""
    pass
API Integration Cooldowns¶
@trigger(CooldownTrigger(60, per_user=False))  # Global 1-minute cooldown
async def weather_command(event: Event, twitch: TwitchClient, weather_api: WeatherAPI) -> None:
    """Weather command with global cooldown to respect API limits"""
    try:
        weather_data = await weather_api.get_current_weather()
        await twitch.send_message(f"Current weather: {weather_data}")
    except Exception:
        await twitch.send_message("Weather service temporarily unavailable")
@trigger(CooldownTrigger(120, per_user=False))  # Global 2-minute cooldown
async def news_command(event: Event, twitch: TwitchClient, news_api: NewsAPI) -> None:
    """News command with global cooldown for API rate limiting"""
    try:
        latest_news = await news_api.get_latest_headlines()
        await twitch.send_message(f"Latest: {latest_news}")
    except Exception:
        await twitch.send_message("News service temporarily unavailable")
Testing Cooldowns¶
Unit Testing¶
import pytest
from unittest.mock import Mock, patch
from starstreamer.triggers.cooldown import CooldownTrigger
def test_cooldown_trigger_blocks_rapid_usage():
    """Test that cooldown prevents rapid command usage"""
    trigger = CooldownTrigger(60, action_name="test_action")
    # Mock event
    event = Mock()
    event.data = {"user": {"user_id": "test_user"}}
    # First use should work
    result1 = trigger.matches(event)
    assert result1.matched is True
    # Second use should be blocked
    result2 = trigger.matches(event)
    assert result2.matched is False
    assert result2.data["cooldown_remaining"] > 0
@patch('time.time')
def test_cooldown_expiration(mock_time):
    """Test that cooldown expires after specified time"""
    trigger = CooldownTrigger(60, action_name="test_action")
    event = Mock()
    event.data = {"user": {"user_id": "test_user"}}
    # Set initial time
    mock_time.return_value = 1000.0
    # Use command
    result1 = trigger.matches(event)
    assert result1.matched is True
    # Advance time past cooldown
    mock_time.return_value = 1061.0  # 61 seconds later
    # Should work again
    result2 = trigger.matches(event)
    assert result2.matched is True
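Patching `time.time` works, but another common approach is to inject the clock so tests can advance time explicitly. The sketch below is a general testing pattern, not something StarStreamer is confirmed to support: `ClockedCooldown` and `FakeClock` are hypothetical classes written for this example.

```python
import time
from typing import Callable, Optional


class ClockedCooldown:
    """Illustrative cooldown that reads time from an injectable clock."""

    def __init__(
        self,
        cooldown_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._last_used: Optional[float] = None

    def allow(self) -> bool:
        now = self._clock()
        if self._last_used is not None and now - self._last_used < self.cooldown_seconds:
            return False
        self._last_used = now
        return True


class FakeClock:
    """Callable clock whose time only moves when advance() is called."""

    def __init__(self, start: float = 1000.0) -> None:
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
cooldown = ClockedCooldown(60, clock=clock)
results = [cooldown.allow()]      # first use succeeds
clock.advance(30)
results.append(cooldown.allow())  # 30s later: still blocked
clock.advance(31)
results.append(cooldown.allow())  # 61s after the first use: allowed again
```

Because nothing is patched globally, this style keeps tests independent of any other code that happens to call `time.time`.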
Integration Testing¶
import asyncio
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_cooldown_in_event_flow():
    """Test cooldown behavior in full event flow"""
    from starstreamer.core.decorators import create_di_event_bus
    event_bus = create_di_event_bus()
    container = event_bus.handler_registry.container
    # Mock services
    mock_twitch = AsyncMock()
    container.register_singleton(TwitchClient, mock_twitch)
    # Register command with cooldown
    @on_event("twitch.chat.message")
    @trigger(CommandTrigger("!test"))
    @trigger(CooldownTrigger(60))
    async def test_command(event: Event, twitch: TwitchClient) -> None:
        await twitch.send_message("Test command executed!")
    await event_bus.start()
    # First execution
    await event_bus.emit("twitch.chat.message", {
        "user": {"user_id": "test_user"},
        "message": "!test"
    })
    # Second execution (should be blocked)
    await event_bus.emit("twitch.chat.message", {
        "user": {"user_id": "test_user"},
        "message": "!test"
    })
    await asyncio.sleep(0.1)  # Allow processing
    # Should only have been called once due to cooldown
    assert mock_twitch.send_message.call_count == 1
    await event_bus.stop()
Best Practices¶
Choosing Cooldown Durations¶
Short Cooldowns (5-30 seconds):
- Information commands (!uptime, !song)
- Simple responses (!hello, !discord)
- Low-cost operations
Medium Cooldowns (1-5 minutes):
- Economy commands (!work, !gamble)
- Interactive features (!hug, !pet)
- Moderate-cost operations
Long Cooldowns (30+ minutes):
- Daily rewards (!daily, !bonus)
- Expensive operations (!bigwork)
- Once-per-session features
Global vs Per-User:
- Use per-user for personal commands and economy features
- Use global for expensive operations, API calls, and stream-wide effects
Error Handling¶
import logging
@trigger(CooldownTrigger(60, message="Please wait {remaining:.0f}s before retrying"))
async def safe_command(event: Event, twitch: TwitchClient, logger: logging.Logger) -> None:
    """Command with proper error handling and cooldown"""
    try:
        # Command logic that might fail
        result = await risky_operation()
        await twitch.send_message(f"Success: {result}")
    except Exception as e:
        logger.error(f"Command failed: {e}")
        # Don't send error to chat - cooldown already applied
        # User will need to wait before retrying
Performance Considerations¶
from starstreamer.core.cooldowns import get_cooldown_tracker
# ✅ Good: Shared cooldown tracker
SHARED_TRACKER = get_cooldown_tracker()
@trigger(CooldownTrigger(60, tracker=SHARED_TRACKER))
async def efficient_command(event: Event, twitch: TwitchClient) -> None:
    """Efficient command using shared tracker"""
    pass
# ❌ Avoid: Creating new trackers
@trigger(CooldownTrigger(60, tracker=CooldownTracker()))  # Creates new instance
async def inefficient_command(event: Event, twitch: TwitchClient) -> None:
    """Inefficient - creates separate tracker"""
    pass
Documentation¶
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(
    300,  # 5 minutes
    per_user=True,
    message="Work cooldown: {remaining:.0f}s remaining"
))
async def work_command(event: Event, twitch: TwitchClient, economy: EconomyService) -> None:
    """
    Work command with 5-minute per-user cooldown.
    Allows users to earn virtual currency by working.
    Cooldown prevents spam and maintains balance.
    Cooldown: 5 minutes per user
    Type: Per-user (each user has separate cooldown)
    """
    pass
Troubleshooting¶
Common Issues¶
Cooldown Not Working:
# ❌ Problem: Missing trigger decorator
@on_event("twitch.chat.message")
async def broken_command(event: Event, twitch: TwitchClient) -> None:
    # CooldownTrigger not applied
    pass
# ✅ Solution: Add trigger decorator
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!test"))
@trigger(CooldownTrigger(60))
async def working_command(event: Event, twitch: TwitchClient) -> None:
    pass
Users Not Identified Properly:
import logging
# Debug user identification
@trigger(CooldownTrigger(60))
async def debug_command(event: Event, twitch: TwitchClient, logger: logging.Logger) -> None:
    user_data = event.data.get("user", {})
    user_id = user_data.get("user_id") or user_data.get("id") or user_data.get("username", "unknown")
    logger.info(f"User identified as: {user_id}")
    logger.info(f"Full user data: {user_data}")
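The fallback chain in the debug handler can be isolated into a small helper and exercised with sample payloads. This is a sketch: `resolve_user_id` is a hypothetical name, and which key StarStreamer's CooldownTrigger actually reads is not stated here (the examples in this document use both "id" and "user_id").

```python
from typing import Any, Mapping


def resolve_user_id(user_data: Mapping[str, Any]) -> str:
    """Return the first non-empty identifier, falling back to 'unknown'."""
    for key in ("user_id", "id", "username"):
        value = user_data.get(key)
        if value:
            return str(value)
    return "unknown"


from_user_id = resolve_user_id({"user_id": "42", "display_name": "Alice"})
from_id = resolve_user_id({"id": "77"})
from_nothing = resolve_user_id({})
# from_user_id == "42", from_id == "77", from_nothing == "unknown"
```

If every event resolves to "unknown", all of those users would share a single per-user cooldown entry in a scheme like this, which can look like a global cooldown from chat.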
Cooldown Messages Not Appearing:
- Cooldown messages are handled by the trigger system
- Commands on cooldown don't execute, so no response is sent
- Consider adding a separate cooldown response handler if needed
Memory Usage¶
Monitor cooldown memory usage for high-traffic streams:
async def monitor_cooldowns():
    """Monitor cooldown memory usage"""
    tracker = get_cooldown_tracker()
    while True:
        stats = tracker.get_stats()
        print(f"Cooldown memory usage: {stats['memory_entries']} entries")
        # Clean up if too many entries
        if stats['memory_entries'] > 10000:
            tracker.cleanup_old_entries(max_age_seconds=1800)  # 30 minutes
        await asyncio.sleep(300)  # Check every 5 minutes
See Also¶
- Triggers & Filters - Understanding the trigger system
- Custom Commands Examples - Practical cooldown examples
- Decorators API - Complete decorator reference
- Module System - Building modules with cooldowns