Skip to content

Cooldowns

StarStreamer's cooldown system prevents command spam and rate-limits actions to maintain a healthy chat experience. The system supports both per-user and global cooldowns with flexible configuration options.

Overview

The cooldown system works by tracking when commands or actions were last executed and preventing re-execution until a specified time period has elapsed. This helps:

  • Prevent Spam - Stop users from repeatedly triggering the same command
  • Manage Resources - Rate-limit expensive operations like API calls
  • Improve Chat Quality - Reduce noise from command overuse
  • Fair Usage - Ensure all users get a chance to use commands

Quick Start

Basic Per-User Cooldown

from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger, CooldownTrigger
from starstreamer.plugins.twitch import TwitchClient

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(300, per_user=True))  # 5 minutes per user
async def work_command(event: Event, twitch: TwitchClient) -> None:
    """Work command with per-user cooldown"""
    user = event.data.get("user", {})
    username = user.get("display_name", "Unknown")

    # Command logic here
    await twitch.send_message(f"@{username} You worked hard and earned coins!")

Global Cooldown

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!uptime"))
@trigger(CooldownTrigger(30, per_user=False))  # 30-second global cooldown
async def uptime_command(event: Event, twitch: TwitchClient) -> None:
    """Uptime command with global cooldown"""
    uptime = await get_stream_uptime()
    await twitch.send_message(f"Stream has been live for: {uptime}")

CooldownTrigger API

Constructor

CooldownTrigger(
    cooldown_seconds: float,
    per_user: bool = True,
    message: str | None = None,
    action_name: str | None = None,
    tracker: CooldownTracker | None = None
)

Parameters:

  • cooldown_seconds - Duration of the cooldown in seconds
  • per_user - If True, cooldown applies per user. If False, global cooldown for all users
  • message - Optional custom message when command is on cooldown (supports {remaining} placeholder)
  • action_name - Optional custom action name for tracking (defaults to command name)
  • tracker - Optional custom cooldown tracker instance

Basic Examples

Per-User Cooldowns

# 5-minute per-user cooldown
@trigger(CooldownTrigger(300))
async def daily_bonus(event: Event, twitch: TwitchClient) -> None:
    """Daily bonus with 5-minute cooldown per user"""
    pass

# 1-hour per-user cooldown
@trigger(CooldownTrigger(3600, per_user=True))
async def big_reward(event: Event, twitch: TwitchClient) -> None:
    """Big reward with 1-hour cooldown per user"""
    pass

Global Cooldowns

# 10-second global cooldown
@trigger(CooldownTrigger(10, per_user=False))
async def server_stats(event: Event, twitch: TwitchClient) -> None:
    """Server stats with global cooldown to prevent API spam"""
    pass

# 2-minute global cooldown for expensive operations
@trigger(CooldownTrigger(120, per_user=False))
async def generate_clip(event: Event, twitch: TwitchClient) -> None:
    """Clip generation with global cooldown"""
    pass

Custom Messages

# Custom cooldown message
@trigger(CooldownTrigger(
    60,
    message="Please wait {remaining:.0f} seconds before working again!"
))
async def work_with_message(event: Event, twitch: TwitchClient) -> None:
    """Work command with custom cooldown message"""
    pass

# More detailed message
@trigger(CooldownTrigger(
    300,
    message="@{username} You must wait {remaining:.1f} more seconds. Work cooldown prevents spam!"
))
async def detailed_cooldown(event: Event, twitch: TwitchClient) -> None:
    """Command with detailed cooldown message"""
    pass

Custom Action Names

# Custom action name for shared cooldowns
@trigger(CooldownTrigger(60, action_name="economy_command"))
async def balance_check(event: Event, twitch: TwitchClient) -> None:
    """Balance command sharing cooldown with other economy commands"""
    pass

@trigger(CooldownTrigger(60, action_name="economy_command"))
async def give_money(event: Event, twitch: TwitchClient) -> None:
    """Give money command sharing the same cooldown pool"""
    pass

Cooldown Types

Per-User Cooldowns (Default)

Per-user cooldowns track usage separately for each user:

@trigger(CooldownTrigger(60))  # per_user=True by default
async def personal_command(event: Event, twitch: TwitchClient) -> None:
    """Each user has their own 60-second cooldown"""
    pass

Behavior: - User A uses command → User A on cooldown for 60 seconds - User B can still use the command immediately - After 60 seconds, User A can use the command again

Use Cases: - Economy commands (!work, !daily, !gamble) - Personal stats (!balance, !profile) - User-specific actions (!hug, !pet)

Global Cooldowns

Global cooldowns affect all users collectively:

@trigger(CooldownTrigger(30, per_user=False))
async def global_command(event: Event, twitch: TwitchClient) -> None:
    """All users share the same 30-second cooldown"""
    pass

Behavior: - Any user uses command → All users on cooldown for 30 seconds - No user can use the command until cooldown expires

Use Cases: - API-heavy commands (!weather, !news) - Resource-intensive operations (!clip, !screenshot) - Rate-limited external services - Commands that affect the entire stream

Advanced Usage

Combining with Other Triggers

Cooldowns work seamlessly with other triggers:

from starstreamer.triggers import ModOnlyTrigger, SubscriberOnlyTrigger

# Moderator-only command with cooldown
@trigger(CommandTrigger("!clear"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(10, per_user=False))
async def clear_chat(event: Event, twitch: TwitchClient) -> None:
    """Mod-only chat clear with global cooldown"""
    pass

# Subscriber command with per-user cooldown
@trigger(CommandTrigger("!vip"))
@trigger(SubscriberOnlyTrigger())
@trigger(CooldownTrigger(300))
async def subscriber_feature(event: Event, twitch: TwitchClient) -> None:
    """Subscriber-only feature with 5-minute per-user cooldown"""
    pass

Multiple Cooldown Layers

Different commands can share cooldown pools:

# Shared "social" command cooldown
SOCIAL_COOLDOWN = CooldownTrigger(30, action_name="social_commands")

@trigger(CommandTrigger("!discord"))
@trigger(SOCIAL_COOLDOWN)
async def discord_link(event: Event, twitch: TwitchClient) -> None:
    """Discord link sharing social cooldown"""
    pass

@trigger(CommandTrigger("!twitter"))
@trigger(SOCIAL_COOLDOWN)
async def twitter_link(event: Event, twitch: TwitchClient) -> None:
    """Twitter link sharing social cooldown"""
    pass

@trigger(CommandTrigger("!youtube"))
@trigger(SOCIAL_COOLDOWN)
async def youtube_link(event: Event, twitch: TwitchClient) -> None:
    """YouTube link sharing social cooldown"""
    pass

Dynamic Cooldowns

Adjust cooldown durations based on conditions:

@trigger(CommandTrigger("!work"))
async def dynamic_work_command(event: Event, twitch: TwitchClient, economy: EconomyService) -> None:
    """Work command with dynamic cooldown based on user level"""
    user = event.data.get("user", {})
    user_id = user.get("id", "")
    username = user.get("display_name", "Unknown")

    # Get user level
    user_profile = await economy.get_user_profile(user_id)
    level = user_profile.level

    # Calculate dynamic cooldown (higher level = shorter cooldown)
    base_cooldown = 300  # 5 minutes
    cooldown_reduction = min(level * 10, 120)  # Max 2-minute reduction
    actual_cooldown = base_cooldown - cooldown_reduction

    # Check cooldown manually
    from starstreamer.core.cooldowns import get_cooldown_tracker
    tracker = get_cooldown_tracker()

    if not tracker.check_cooldown(user_id, "!work", actual_cooldown):
        remaining = tracker.get_remaining_cooldown(user_id, "!work", actual_cooldown)
        await twitch.send_message(
            f"@{username} Please wait {remaining:.0f} seconds before working again!"
        )
        return

    # Execute work command
    await twitch.send_message(f"@{username} You worked hard! (Cooldown: {actual_cooldown}s)")

Cooldown Messages

Default Messages

Without custom messages, StarStreamer provides default cooldown feedback:

@trigger(CooldownTrigger(60))
async def simple_command(event: Event, twitch: TwitchClient) -> None:
    """Uses default cooldown message"""
    pass

Default message: "Please wait {remaining:.0f} seconds before using this command again."

Custom Message Formatting

Custom messages support placeholder formatting:

@trigger(CooldownTrigger(
    300,
    message="⏰ {remaining:.0f} seconds remaining until you can work again!"
))
async def work_with_timer(event: Event, twitch: TwitchClient) -> None:
    """Work command with timer emoji"""
    pass

@trigger(CooldownTrigger(
    60,
    message="Slow down there, {username}! Wait {remaining:.1f}s before trying again."
))
async def rate_limited_command(event: Event, twitch: TwitchClient) -> None:
    """Command with personalized cooldown message"""
    pass

Available placeholders: - {remaining} - Remaining cooldown time in seconds (float) - {username} - User's display name (if available in message formatter)

Handling Cooldown Responses

Commands on cooldown don't execute, but you can handle cooldown responses:

from starstreamer.core.decorators import filter

def handle_cooldown_response(event):
    """Custom filter to handle cooldown responses"""
    # This would be implemented in a more advanced trigger system
    # For now, cooldowns are handled automatically by the trigger system
    return True

@on_event("twitch.chat.message")
@filter(handle_cooldown_response)
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(300, message="Work cooldown: {remaining:.0f}s remaining"))
async def work_with_response(event: Event, twitch: TwitchClient) -> None:
    """Work command that handles its own cooldown responses"""
    pass

CooldownTracker API

For advanced use cases, you can interact with the cooldown tracker directly:

Basic Usage

from starstreamer.core.cooldowns import get_cooldown_tracker

# Get the global tracker instance
tracker = get_cooldown_tracker()

# Check if action is on cooldown
if tracker.check_cooldown("user123", "!work", 300):
    # Execute action
    print("Action executed!")
else:
    # Get remaining time
    remaining = tracker.get_remaining_cooldown("user123", "!work", 300)
    print(f"Wait {remaining:.0f} seconds")

Manual Cooldown Management

async def admin_reset_cooldown(event: Event, twitch: TwitchClient) -> None:
    """Admin command to reset user cooldowns"""
    message = event.data.get("message", "")
    parts = message.split()

    if len(parts) < 3:  # !resetcd <user> <action>
        await twitch.send_message("Usage: !resetcd <user> <action>")
        return

    target_user = parts[1]
    action = parts[2]

    tracker = get_cooldown_tracker()
    tracker.reset_cooldown(target_user, action)

    await twitch.send_message(f"Reset {action} cooldown for {target_user}")

async def view_cooldown_stats(event: Event, twitch: TwitchClient) -> None:
    """View cooldown system statistics"""
    tracker = get_cooldown_tracker()
    stats = tracker.get_stats()

    await twitch.send_message(
        f"Cooldown Stats - Users: {stats['total_users']}, "
        f"Active cooldowns: {stats['total_actions']}, "
        f"Global: {stats['global_cooldowns']}"
    )

Memory Management

import asyncio

async def cooldown_cleanup_task():
    """Background task to clean up old cooldown entries"""
    tracker = get_cooldown_tracker()

    while True:
        # Clean up entries older than 1 hour
        tracker.cleanup_old_entries(max_age_seconds=3600)

        # Wait 10 minutes before next cleanup
        await asyncio.sleep(600)

# Start cleanup task in your main application
asyncio.create_task(cooldown_cleanup_task())

Configuration Examples

Economy System Cooldowns

# Different cooldowns for different economy actions
@trigger(CooldownTrigger(300))  # 5 minutes
async def work_command(event: Event, twitch: TwitchClient) -> None:
    """Regular work command"""
    pass

@trigger(CooldownTrigger(86400))  # 24 hours
async def daily_command(event: Event, twitch: TwitchClient) -> None:
    """Daily bonus command"""
    pass

@trigger(CooldownTrigger(60))  # 1 minute
async def balance_command(event: Event, twitch: TwitchClient) -> None:
    """Balance check with short cooldown"""
    pass

@trigger(CooldownTrigger(10, per_user=False))  # Global 10 seconds
async def leaderboard_command(event: Event, twitch: TwitchClient) -> None:
    """Leaderboard with global cooldown to prevent spam"""
    pass

Moderation Cooldowns

from starstreamer.triggers import ModOnlyTrigger

@trigger(CommandTrigger("!timeout"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(5, per_user=False))  # Global 5-second cooldown
async def timeout_command(event: Event, twitch: TwitchClient) -> None:
    """Timeout command with global cooldown to prevent rapid timeouts"""
    pass

@trigger(CommandTrigger("!clear"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(30, per_user=False))  # Global 30-second cooldown
async def clear_command(event: Event, twitch: TwitchClient) -> None:
    """Clear chat with global cooldown"""
    pass

API Integration Cooldowns

@trigger(CooldownTrigger(60, per_user=False))  # Global 1-minute cooldown
async def weather_command(event: Event, twitch: TwitchClient, weather_api: WeatherAPI) -> None:
    """Weather command with global cooldown to respect API limits"""
    try:
        weather_data = await weather_api.get_current_weather()
        await twitch.send_message(f"Current weather: {weather_data}")
    except Exception as e:
        await twitch.send_message("Weather service temporarily unavailable")

@trigger(CooldownTrigger(120, per_user=False))  # Global 2-minute cooldown
async def news_command(event: Event, twitch: TwitchClient, news_api: NewsAPI) -> None:
    """News command with global cooldown for API rate limiting"""
    try:
        latest_news = await news_api.get_latest_headlines()
        await twitch.send_message(f"Latest: {latest_news}")
    except Exception as e:
        await twitch.send_message("News service temporarily unavailable")

Testing Cooldowns

Unit Testing

import pytest
from unittest.mock import Mock, patch
from starstreamer.triggers.cooldown import CooldownTrigger

def test_cooldown_trigger_blocks_rapid_usage():
    """Test that cooldown prevents rapid command usage"""
    trigger = CooldownTrigger(60, action_name="test_action")

    # Mock event
    event = Mock()
    event.data = {"user": {"user_id": "test_user"}}

    # First use should work
    result1 = trigger.matches(event)
    assert result1.matched is True

    # Second use should be blocked
    result2 = trigger.matches(event)
    assert result2.matched is False
    assert result2.data["cooldown_remaining"] > 0

@patch('time.time')
def test_cooldown_expiration(mock_time):
    """Test that cooldown expires after specified time"""
    trigger = CooldownTrigger(60, action_name="test_action")

    event = Mock()
    event.data = {"user": {"user_id": "test_user"}}

    # Set initial time
    mock_time.return_value = 1000.0

    # Use command
    result1 = trigger.matches(event)
    assert result1.matched is True

    # Advance time past cooldown
    mock_time.return_value = 1061.0  # 61 seconds later

    # Should work again
    result2 = trigger.matches(event)
    assert result2.matched is True

Integration Testing

@pytest.mark.asyncio
async def test_cooldown_in_event_flow():
    """Test cooldown behavior in full event flow"""
    from starstreamer.core.decorators import create_di_event_bus

    event_bus = create_di_event_bus()
    container = event_bus.handler_registry.container

    # Mock services
    mock_twitch = AsyncMock()
    container.register_singleton(TwitchClient, mock_twitch)

    # Register command with cooldown
    @on_event("twitch.chat.message")
    @trigger(CommandTrigger("!test"))
    @trigger(CooldownTrigger(60))
    async def test_command(event: Event, twitch: TwitchClient) -> None:
        await twitch.send_message("Test command executed!")

    await event_bus.start()

    # First execution
    await event_bus.emit("twitch.chat.message", {
        "user": {"user_id": "test_user"},
        "message": "!test"
    })

    # Second execution (should be blocked)
    await event_bus.emit("twitch.chat.message", {
        "user": {"user_id": "test_user"},
        "message": "!test"
    })

    await asyncio.sleep(0.1)  # Allow processing

    # Should only have been called once due to cooldown
    assert mock_twitch.send_message.call_count == 1

    await event_bus.stop()

Best Practices

Choosing Cooldown Durations

Short Cooldowns (5-30 seconds): - Information commands (!uptime, !song) - Simple responses (!hello, !discord) - Low-cost operations

Medium Cooldowns (1-5 minutes): - Economy commands (!work, !gamble) - Interactive features (!hug, !pet) - Moderate-cost operations

Long Cooldowns (30+ minutes): - Daily rewards (!daily, !bonus) - Expensive operations (!bigwork) - Once-per-session features

Global vs Per-User: - Use per-user for personal commands and economy features - Use global for expensive operations, API calls, and stream-wide effects

Error Handling

@trigger(CooldownTrigger(60, message="Please wait {remaining:.0f}s before retrying"))
async def safe_command(event: Event, twitch: TwitchClient, logger: logging.Logger) -> None:
    """Command with proper error handling and cooldown"""
    try:
        # Command logic that might fail
        result = await risky_operation()
        await twitch.send_message(f"Success: {result}")
    except Exception as e:
        logger.error(f"Command failed: {e}")
        # Don't send error to chat - cooldown already applied
        # User will need to wait before retrying

Performance Considerations

# ✅ Good: Shared cooldown tracker
SHARED_TRACKER = get_cooldown_tracker()

@trigger(CooldownTrigger(60, tracker=SHARED_TRACKER))
async def efficient_command(event: Event, twitch: TwitchClient) -> None:
    """Efficient command using shared tracker"""
    pass

# ❌ Avoid: Creating new trackers
@trigger(CooldownTrigger(60, tracker=CooldownTracker()))  # Creates new instance
async def inefficient_command(event: Event, twitch: TwitchClient) -> None:
    """Inefficient - creates separate tracker"""
    pass

Documentation

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(
    300,  # 5 minutes
    per_user=True,
    message="Work cooldown: {remaining:.0f}s remaining"
))
async def work_command(event: Event, twitch: TwitchClient, economy: EconomyService) -> None:
    """
    Work command with 5-minute per-user cooldown.

    Allows users to earn virtual currency by working.
    Cooldown prevents spam and maintains balance.

    Cooldown: 5 minutes per user
    Type: Per-user (each user has separate cooldown)
    """
    pass

Troubleshooting

Common Issues

Cooldown Not Working:

# ❌ Problem: Missing trigger decorator
@on_event("twitch.chat.message")
async def broken_command(event: Event, twitch: TwitchClient) -> None:
    # CooldownTrigger not applied
    pass

# ✅ Solution: Add trigger decorator
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!test"))
@trigger(CooldownTrigger(60))
async def working_command(event: Event, twitch: TwitchClient) -> None:
    pass

Users Not Identified Properly:

# Debug user identification
@trigger(CooldownTrigger(60))
async def debug_command(event: Event, twitch: TwitchClient, logger: logging.Logger) -> None:
    user_data = event.data.get("user", {})
    user_id = user_data.get("user_id") or user_data.get("id") or user_data.get("username", "unknown")

    logger.info(f"User identified as: {user_id}")
    logger.info(f"Full user data: {user_data}")

Cooldown Messages Not Appearing: - Cooldown messages are handled by the trigger system - Commands on cooldown don't execute, so no response is sent - Consider adding a separate cooldown response handler if needed

Memory Usage

Monitor cooldown memory usage for high-traffic streams:

async def monitor_cooldowns():
    """Monitor cooldown memory usage"""
    tracker = get_cooldown_tracker()

    while True:
        stats = tracker.get_stats()
        print(f"Cooldown memory usage: {stats['memory_entries']} entries")

        # Clean up if too many entries
        if stats['memory_entries'] > 10000:
            tracker.cleanup_old_entries(max_age_seconds=1800)  # 30 minutes

        await asyncio.sleep(300)  # Check every 5 minutes

See Also