Cooldowns¶
StarStreamer's cooldown system prevents command spam and rate-limits actions to maintain a healthy chat experience. The system supports both per-user and global cooldowns with flexible configuration options.
Overview¶
The cooldown system works by tracking when commands or actions were last executed and preventing re-execution until a specified time period has elapsed. This helps:
- Prevent Spam - Stop users from repeatedly triggering the same command
- Manage Resources - Rate-limit expensive operations like API calls
- Improve Chat Quality - Reduce noise from command overuse
- Fair Usage - Ensure all users get a chance to use commands
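The tracking idea described above can be sketched in a few lines of plain Python. This is a minimal illustration and not StarStreamer's implementation: the `SimpleCooldown` class, its method names, and the injectable `clock` parameter are all hypothetical and exist only for this example.

```python
import time


class SimpleCooldown:
    """Hypothetical stand-in for illustration; not StarStreamer's CooldownTracker."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock      # injectable so the example can run deterministically
        self._last_used = {}     # (user_id, action) -> timestamp of the last allowed use

    def try_use(self, user_id, action, cooldown_seconds):
        """Return True and record the use if the cooldown has elapsed."""
        now = self._clock()
        last = self._last_used.get((user_id, action))
        if last is not None and now - last < cooldown_seconds:
            return False         # still cooling down; the stored timestamp is unchanged
        self._last_used[(user_id, action)] = now
        return True

    def remaining(self, user_id, action, cooldown_seconds):
        """Seconds left before the action may run again (0.0 if it is ready)."""
        last = self._last_used.get((user_id, action))
        if last is None:
            return 0.0
        return max(0.0, cooldown_seconds - (self._clock() - last))


# Drive the tracker with a fake clock instead of waiting in real time
fake_now = [1000.0]
cd = SimpleCooldown(clock=lambda: fake_now[0])
first = cd.try_use("alice", "!work", 300)    # allowed, timestamp recorded
second = cd.try_use("alice", "!work", 300)   # blocked, no time has passed
fake_now[0] += 301
third = cd.try_use("alice", "!work", 300)    # allowed again after 301 seconds
```

A blocked attempt does not refresh the timestamp in this sketch, so repeated spam does not extend the wait.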
Quick Start¶
Basic Per-User Cooldown¶
from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger, CooldownTrigger
from starstreamer.plugins.twitch import TwitchClient

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(300, per_user=True))  # 5 minutes per user
async def work_command(event: Event, twitch: TwitchClient) -> None:
    """Work command with per-user cooldown"""
    user = event.data.get("user", {})
    username = user.get("display_name", "Unknown")
    # Command logic here
    await twitch.send_message(f"@{username} You worked hard and earned coins!")
Global Cooldown¶
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!uptime"))
@trigger(CooldownTrigger(30, per_user=False))  # 30-second global cooldown
async def uptime_command(event: Event, twitch: TwitchClient) -> None:
    """Uptime command with global cooldown"""
    uptime = await get_stream_uptime()
    await twitch.send_message(f"Stream has been live for: {uptime}")
CooldownTrigger API¶
Constructor¶
CooldownTrigger(
    cooldown_seconds: float,
    per_user: bool = True,
    message: str | None = None,
    action_name: str | None = None,
    tracker: CooldownTracker | None = None
)
Parameters:
- cooldown_seconds - Duration of the cooldown in seconds
- per_user - If True, cooldown applies per user. If False, global cooldown for all users
- message - Optional custom message when command is on cooldown (supports {remaining} placeholder)
- action_name - Optional custom action name for tracking (defaults to command name)
- tracker - Optional custom cooldown tracker instance
Basic Examples¶
Per-User Cooldowns¶
# 5-minute per-user cooldown
@trigger(CooldownTrigger(300))
async def daily_bonus(event: Event, twitch: TwitchClient) -> None:
    """Daily bonus with 5-minute cooldown per user"""
    pass

# 1-hour per-user cooldown
@trigger(CooldownTrigger(3600, per_user=True))
async def big_reward(event: Event, twitch: TwitchClient) -> None:
    """Big reward with 1-hour cooldown per user"""
    pass
Global Cooldowns¶
# 10-second global cooldown
@trigger(CooldownTrigger(10, per_user=False))
async def server_stats(event: Event, twitch: TwitchClient) -> None:
    """Server stats with global cooldown to prevent API spam"""
    pass

# 2-minute global cooldown for expensive operations
@trigger(CooldownTrigger(120, per_user=False))
async def generate_clip(event: Event, twitch: TwitchClient) -> None:
    """Clip generation with global cooldown"""
    pass
Custom Messages¶
# Custom cooldown message
@trigger(CooldownTrigger(
    60,
    message="Please wait {remaining:.0f} seconds before working again!"
))
async def work_with_message(event: Event, twitch: TwitchClient) -> None:
    """Work command with custom cooldown message"""
    pass

# More detailed message
@trigger(CooldownTrigger(
    300,
    message="@{username} You must wait {remaining:.1f} more seconds. Work cooldown prevents spam!"
))
async def detailed_cooldown(event: Event, twitch: TwitchClient) -> None:
    """Command with detailed cooldown message"""
    pass
Custom Action Names¶
# Custom action name for shared cooldowns
@trigger(CooldownTrigger(60, action_name="economy_command"))
async def balance_check(event: Event, twitch: TwitchClient) -> None:
    """Balance command sharing cooldown with other economy commands"""
    pass

@trigger(CooldownTrigger(60, action_name="economy_command"))
async def give_money(event: Event, twitch: TwitchClient) -> None:
    """Give money command sharing the same cooldown pool"""
    pass
Cooldown Types¶
Per-User Cooldowns (Default)¶
Per-user cooldowns track usage separately for each user:
@trigger(CooldownTrigger(60))  # per_user=True by default
async def personal_command(event: Event, twitch: TwitchClient) -> None:
    """Each user has their own 60-second cooldown"""
    pass
Behavior:
- User A uses command → User A on cooldown for 60 seconds
- User B can still use the command immediately
- After 60 seconds, User A can use the command again
Use Cases:
- Economy commands (!work, !daily, !gamble)
- Personal stats (!balance, !profile)
- User-specific actions (!hug, !pet)
Global Cooldowns¶
Global cooldowns affect all users collectively:
@trigger(CooldownTrigger(30, per_user=False))
async def global_command(event: Event, twitch: TwitchClient) -> None:
    """All users share the same 30-second cooldown"""
    pass
Behavior:
- Any user uses command → All users on cooldown for 30 seconds
- No user can use the command until cooldown expires
Use Cases:
- API-heavy commands (!weather, !news)
- Resource-intensive operations (!clip, !screenshot)
- Rate-limited external services
- Commands that affect the entire stream
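One way to picture the difference between the two cooldown types is the key under which the last-use timestamp is stored: a per-user cooldown keys on the user, while a global cooldown keys on a single shared bucket. The sketch below assumes that model. `GLOBAL_KEY`, `cooldown_key`, and `simulate` are hypothetical names for illustration, and StarStreamer's internal storage may differ.

```python
GLOBAL_KEY = "__global__"  # hypothetical sentinel for the shared bucket


def cooldown_key(user_id: str, action: str, per_user: bool) -> tuple[str, str]:
    """Per-user cooldowns key on the user; global ones share a single key."""
    return (user_id if per_user else GLOBAL_KEY, action)


def simulate(uses, cooldown_seconds, per_user):
    """Replay (timestamp, user_id) pairs and report which uses are allowed."""
    last_used = {}
    results = []
    for timestamp, user_id in uses:
        key = cooldown_key(user_id, "!cmd", per_user)
        last = last_used.get(key)
        allowed = last is None or timestamp - last >= cooldown_seconds
        if allowed:
            last_used[key] = timestamp
        results.append(allowed)
    return results


# User A at t=0, user B at t=5, user A again at t=10 and at t=61
uses = [(0, "A"), (5, "B"), (10, "A"), (61, "A")]
per_user_results = simulate(uses, 60, per_user=True)   # [True, True, False, True]
global_results = simulate(uses, 60, per_user=False)    # [True, False, False, True]
```

With per-user keys, user B is unaffected by user A's cooldown. With the shared key, B's attempt at t=5 is blocked because A used the command at t=0.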
Advanced Usage¶
Combining with Other Triggers¶
Cooldowns work seamlessly with other triggers:
from starstreamer.triggers import ModOnlyTrigger, SubscriberOnlyTrigger

# Moderator-only command with cooldown
@trigger(CommandTrigger("!clear"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(10, per_user=False))
async def clear_chat(event: Event, twitch: TwitchClient) -> None:
    """Mod-only chat clear with global cooldown"""
    pass

# Subscriber command with per-user cooldown
@trigger(CommandTrigger("!vip"))
@trigger(SubscriberOnlyTrigger())
@trigger(CooldownTrigger(300))
async def subscriber_feature(event: Event, twitch: TwitchClient) -> None:
    """Subscriber-only feature with 5-minute per-user cooldown"""
    pass
Multiple Cooldown Layers¶
Different commands can share cooldown pools:
# Shared "social" command cooldown
SOCIAL_COOLDOWN = CooldownTrigger(30, action_name="social_commands")

@trigger(CommandTrigger("!discord"))
@trigger(SOCIAL_COOLDOWN)
async def discord_link(event: Event, twitch: TwitchClient) -> None:
    """Discord link sharing social cooldown"""
    pass

@trigger(CommandTrigger("!twitter"))
@trigger(SOCIAL_COOLDOWN)
async def twitter_link(event: Event, twitch: TwitchClient) -> None:
    """Twitter link sharing social cooldown"""
    pass

@trigger(CommandTrigger("!youtube"))
@trigger(SOCIAL_COOLDOWN)
async def youtube_link(event: Event, twitch: TwitchClient) -> None:
    """YouTube link sharing social cooldown"""
    pass
Dynamic Cooldowns¶
Adjust cooldown durations based on conditions:
from starstreamer.core.cooldowns import get_cooldown_tracker

@trigger(CommandTrigger("!work"))
async def dynamic_work_command(event: Event, twitch: TwitchClient, economy: EconomyService) -> None:
    """Work command with dynamic cooldown based on user level"""
    user = event.data.get("user", {})
    user_id = user.get("id", "")
    username = user.get("display_name", "Unknown")
    # Get user level
    user_profile = await economy.get_user_profile(user_id)
    level = user_profile.level
    # Calculate dynamic cooldown (higher level = shorter cooldown)
    base_cooldown = 300  # 5 minutes
    cooldown_reduction = min(level * 10, 120)  # Max 2-minute reduction
    actual_cooldown = base_cooldown - cooldown_reduction
    # Check cooldown manually
    tracker = get_cooldown_tracker()
    if not tracker.check_cooldown(user_id, "!work", actual_cooldown):
        remaining = tracker.get_remaining_cooldown(user_id, "!work", actual_cooldown)
        await twitch.send_message(
            f"@{username} Please wait {remaining:.0f} seconds before working again!"
        )
        return
    # Execute work command
    await twitch.send_message(f"@{username} You worked hard! (Cooldown: {actual_cooldown}s)")
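To make the reduction formula above concrete, the following sketch isolates the arithmetic in a pure function and evaluates it for a few levels. The function name `dynamic_cooldown` and its keyword parameters are hypothetical. The constants (300-second base, 10 seconds per level, 120-second cap) are taken from the example.

```python
def dynamic_cooldown(level: int, base: int = 300, per_level: int = 10,
                     max_reduction: int = 120) -> int:
    """Cooldown in seconds: the base minus a per-level reduction, capped at max_reduction."""
    return base - min(level * per_level, max_reduction)


# Levels 0, 5, 12, and 50 give 300, 250, 180, and 180 seconds respectively
cooldowns = {level: dynamic_cooldown(level) for level in (0, 5, 12, 50)}
```

The cap is reached at level 12, so every higher level keeps the 180-second floor.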
Cooldown Messages¶
Default Messages¶
Without custom messages, StarStreamer provides default cooldown feedback:
@trigger(CooldownTrigger(60))
async def simple_command(event: Event, twitch: TwitchClient) -> None:
    """Uses default cooldown message"""
    pass
Default message: "Please wait {remaining:.0f} seconds before using this command again."
Custom Message Formatting¶
Custom messages support placeholder formatting:
@trigger(CooldownTrigger(
    300,
    message="⏰ {remaining:.0f} seconds remaining until you can work again!"
))
async def work_with_timer(event: Event, twitch: TwitchClient) -> None:
    """Work command with timer emoji"""
    pass

@trigger(CooldownTrigger(
    60,
    message="Slow down there, {username}! Wait {remaining:.1f}s before trying again."
))
async def rate_limited_command(event: Event, twitch: TwitchClient) -> None:
    """Command with personalized cooldown message"""
    pass
Available placeholders:
- {remaining} - Remaining cooldown time in seconds (float)
- {username} - User's display name (if available in message formatter)
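The placeholder syntax follows Python's format-spec conventions, so the effect of `:.0f` and `:.1f` can be previewed with plain `str.format`. This is a sketch under the assumption that templates are rendered this way. The guide does not state which formatter StarStreamer uses, and the sample values are made up.

```python
# Hypothetical values standing in for what the trigger system would supply
values = {"remaining": 42.345, "username": "Alice"}

detailed = "Slow down there, {username}! Wait {remaining:.1f}s before trying again."
simple = "Please wait {remaining:.0f} seconds before working again!"

rendered_detailed = detailed.format(**values)  # one decimal place
rendered_simple = simple.format(**values)      # rounded to whole seconds; unused keys are ignored
```

With `str.format`, a template that names a placeholder missing from the supplied values raises `KeyError`, which is one reason to use `{username}` only where it is known to be available.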
Handling Cooldown Responses¶
Commands on cooldown don't execute, and the trigger system currently handles cooldown responses automatically. The filter below is only a placeholder that marks where custom handling could hook in:
from starstreamer.core.decorators import filter

def handle_cooldown_response(event):
    """Custom filter to handle cooldown responses"""
    # This would be implemented in a more advanced trigger system
    # For now, cooldowns are handled automatically by the trigger system
    return True

@on_event("twitch.chat.message")
@filter(handle_cooldown_response)
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(300, message="Work cooldown: {remaining:.0f}s remaining"))
async def work_with_response(event: Event, twitch: TwitchClient) -> None:
    """Work command that handles its own cooldown responses"""
    pass
CooldownTracker API¶
For advanced use cases, you can interact with the cooldown tracker directly:
Basic Usage¶
from starstreamer.core.cooldowns import get_cooldown_tracker

# Get the global tracker instance
tracker = get_cooldown_tracker()

# Check if action is on cooldown
if tracker.check_cooldown("user123", "!work", 300):
    # Execute action
    print("Action executed!")
else:
    # Get remaining time
    remaining = tracker.get_remaining_cooldown("user123", "!work", 300)
    print(f"Wait {remaining:.0f} seconds")
Manual Cooldown Management¶
async def admin_reset_cooldown(event: Event, twitch: TwitchClient) -> None:
    """Admin command to reset user cooldowns"""
    message = event.data.get("message", "")
    parts = message.split()
    if len(parts) < 3:  # !resetcd <user> <action>
        await twitch.send_message("Usage: !resetcd <user> <action>")
        return
    target_user = parts[1]
    action = parts[2]
    tracker = get_cooldown_tracker()
    tracker.reset_cooldown(target_user, action)
    await twitch.send_message(f"Reset {action} cooldown for {target_user}")

async def view_cooldown_stats(event: Event, twitch: TwitchClient) -> None:
    """View cooldown system statistics"""
    tracker = get_cooldown_tracker()
    stats = tracker.get_stats()
    await twitch.send_message(
        f"Cooldown Stats - Users: {stats['total_users']}, "
        f"Active cooldowns: {stats['total_actions']}, "
        f"Global: {stats['global_cooldowns']}"
    )
Memory Management¶
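import asyncio
from starstreamer.core.cooldowns import get_cooldown_tracker

async def cooldown_cleanup_task():
    """Background task to clean up old cooldown entries"""
    tracker = get_cooldown_tracker()
    while True:
        # Clean up entries older than 1 hour
        tracker.cleanup_old_entries(max_age_seconds=3600)
        # Wait 10 minutes before next cleanup
        await asyncio.sleep(600)

# Start the cleanup task from code that already runs inside the event loop
# (asyncio.create_task raises RuntimeError when no loop is running), and keep
# a reference so the task is not garbage-collected
cleanup_task = asyncio.create_task(cooldown_cleanup_task())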
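What an age-based cleanup does can be sketched with a plain dictionary of last-use timestamps. This is an illustration of the general technique only. The free function `cleanup_old_entries` below, its `now` parameter, and the dictionary layout are assumptions and are not taken from StarStreamer's tracker.

```python
def cleanup_old_entries(last_used: dict, now: float, max_age_seconds: float) -> int:
    """Delete entries whose last use is older than max_age_seconds; return how many were removed."""
    stale = [key for key, ts in last_used.items() if now - ts > max_age_seconds]
    for key in stale:
        del last_used[key]
    return len(stale)


# Hypothetical tracker state: (user_id, action) -> timestamp of last use
entries = {
    ("alice", "!work"): 100.0,
    ("bob", "!work"): 3000.0,
    ("carol", "!daily"): 4000.0,
}
removed = cleanup_old_entries(entries, now=4000.0, max_age_seconds=3600)
```

If cleanup works like this sketch, `max_age_seconds` should be at least as long as the longest cooldown in use. Otherwise an entry for a 24-hour command could be deleted after one hour, and the command would become available early.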
Configuration Examples¶
Economy System Cooldowns¶
# Different cooldowns for different economy actions
@trigger(CooldownTrigger(300)) # 5 minutes
async def work_command(event: Event, twitch: TwitchClient) -> None:
"""Regular work command"""
pass
@trigger(CooldownTrigger(86400)) # 24 hours
async def daily_command(event: Event, twitch: TwitchClient) -> None:
"""Daily bonus command"""
pass
@trigger(CooldownTrigger(60)) # 1 minute
async def balance_command(event: Event, twitch: TwitchClient) -> None:
"""Balance check with short cooldown"""
pass
@trigger(CooldownTrigger(10, per_user=False)) # Global 10 seconds
async def leaderboard_command(event: Event, twitch: TwitchClient) -> None:
"""Leaderboard with global cooldown to prevent spam"""
pass
Moderation Cooldowns¶
from starstreamer.triggers import ModOnlyTrigger

@trigger(CommandTrigger("!timeout"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(5, per_user=False))  # Global 5-second cooldown
async def timeout_command(event: Event, twitch: TwitchClient) -> None:
    """Timeout command with global cooldown to prevent rapid timeouts"""
    pass

@trigger(CommandTrigger("!clear"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(30, per_user=False))  # Global 30-second cooldown
async def clear_command(event: Event, twitch: TwitchClient) -> None:
    """Clear chat with global cooldown"""
    pass
API Integration Cooldowns¶
@trigger(CooldownTrigger(60, per_user=False))  # Global 1-minute cooldown
async def weather_command(event: Event, twitch: TwitchClient, weather_api: WeatherAPI) -> None:
    """Weather command with global cooldown to respect API limits"""
    try:
        weather_data = await weather_api.get_current_weather()
        await twitch.send_message(f"Current weather: {weather_data}")
    except Exception:
        # The exception itself is not used; chat only sees a generic notice
        await twitch.send_message("Weather service temporarily unavailable")

@trigger(CooldownTrigger(120, per_user=False))  # Global 2-minute cooldown
async def news_command(event: Event, twitch: TwitchClient, news_api: NewsAPI) -> None:
    """News command with global cooldown for API rate limiting"""
    try:
        latest_news = await news_api.get_latest_headlines()
        await twitch.send_message(f"Latest: {latest_news}")
    except Exception:
        await twitch.send_message("News service temporarily unavailable")
Testing Cooldowns¶
Unit Testing¶
import pytest
from unittest.mock import Mock, patch
from starstreamer.triggers.cooldown import CooldownTrigger

def test_cooldown_trigger_blocks_rapid_usage():
    """Test that cooldown prevents rapid command usage"""
    trigger = CooldownTrigger(60, action_name="test_action")
    # Mock event
    event = Mock()
    event.data = {"user": {"user_id": "test_user"}}
    # First use should work
    result1 = trigger.matches(event)
    assert result1.matched is True
    # Second use should be blocked
    result2 = trigger.matches(event)
    assert result2.matched is False
    assert result2.data["cooldown_remaining"] > 0

@patch('time.time')
def test_cooldown_expiration(mock_time):
    """Test that cooldown expires after specified time"""
    trigger = CooldownTrigger(60, action_name="test_action")
    event = Mock()
    event.data = {"user": {"user_id": "test_user"}}
    # Set initial time
    mock_time.return_value = 1000.0
    # Use command
    result1 = trigger.matches(event)
    assert result1.matched is True
    # Advance time past cooldown
    mock_time.return_value = 1061.0  # 61 seconds later
    # Should work again
    result2 = trigger.matches(event)
    assert result2.matched is True
Integration Testing¶
import asyncio
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_cooldown_in_event_flow():
    """Test cooldown behavior in full event flow"""
    from starstreamer.core.decorators import create_di_event_bus
    event_bus = create_di_event_bus()
    container = event_bus.handler_registry.container
    # Mock services
    mock_twitch = AsyncMock()
    container.register_singleton(TwitchClient, mock_twitch)

    # Register command with cooldown
    @on_event("twitch.chat.message")
    @trigger(CommandTrigger("!test"))
    @trigger(CooldownTrigger(60))
    async def test_command(event: Event, twitch: TwitchClient) -> None:
        await twitch.send_message("Test command executed!")

    await event_bus.start()
    # First execution
    await event_bus.emit("twitch.chat.message", {
        "user": {"user_id": "test_user"},
        "message": "!test"
    })
    # Second execution (should be blocked)
    await event_bus.emit("twitch.chat.message", {
        "user": {"user_id": "test_user"},
        "message": "!test"
    })
    await asyncio.sleep(0.1)  # Allow processing
    # Should only have been called once due to cooldown
    assert mock_twitch.send_message.call_count == 1
    await event_bus.stop()
Best Practices¶
Choosing Cooldown Durations¶
Short Cooldowns (5-30 seconds):
- Information commands (!uptime, !song)
- Simple responses (!hello, !discord)
- Low-cost operations
Medium Cooldowns (1-5 minutes):
- Economy commands (!work, !gamble)
- Interactive features (!hug, !pet)
- Moderate-cost operations
Long Cooldowns (30+ minutes):
- Daily rewards (!daily, !bonus)
- Expensive operations (!bigwork)
- Once-per-session features
Global vs Per-User:
- Use per-user for personal commands and economy features
- Use global for expensive operations, API calls, and stream-wide effects
Error Handling¶
import logging

@trigger(CooldownTrigger(60, message="Please wait {remaining:.0f}s before retrying"))
async def safe_command(event: Event, twitch: TwitchClient, logger: logging.Logger) -> None:
    """Command with proper error handling and cooldown"""
    try:
        # Command logic that might fail
        result = await risky_operation()
        await twitch.send_message(f"Success: {result}")
    except Exception as e:
        logger.error(f"Command failed: {e}")
        # Don't send error to chat - cooldown already applied
        # User will need to wait before retrying
Performance Considerations¶
# ✅ Good: Shared cooldown tracker
SHARED_TRACKER = get_cooldown_tracker()

@trigger(CooldownTrigger(60, tracker=SHARED_TRACKER))
async def efficient_command(event: Event, twitch: TwitchClient) -> None:
    """Efficient command using shared tracker"""
    pass

# ❌ Avoid: Creating new trackers
@trigger(CooldownTrigger(60, tracker=CooldownTracker()))  # Creates new instance
async def inefficient_command(event: Event, twitch: TwitchClient) -> None:
    """Inefficient - creates separate tracker"""
    pass
Documentation¶
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!work"))
@trigger(CooldownTrigger(
    300,  # 5 minutes
    per_user=True,
    message="Work cooldown: {remaining:.0f}s remaining"
))
async def work_command(event: Event, twitch: TwitchClient, economy: EconomyService) -> None:
    """
    Work command with 5-minute per-user cooldown.

    Allows users to earn virtual currency by working.
    Cooldown prevents spam and maintains balance.

    Cooldown: 5 minutes per user
    Type: Per-user (each user has separate cooldown)
    """
    pass
Troubleshooting¶
Common Issues¶
Cooldown Not Working:
# ❌ Problem: Missing trigger decorator
@on_event("twitch.chat.message")
async def broken_command(event: Event, twitch: TwitchClient) -> None:
    # CooldownTrigger not applied
    pass

# ✅ Solution: Add trigger decorator
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!test"))
@trigger(CooldownTrigger(60))
async def working_command(event: Event, twitch: TwitchClient) -> None:
    pass
Users Not Identified Properly:
# Debug user identification
@trigger(CooldownTrigger(60))
async def debug_command(event: Event, twitch: TwitchClient, logger: logging.Logger) -> None:
    user_data = event.data.get("user", {})
    user_id = user_data.get("user_id") or user_data.get("id") or user_data.get("username", "unknown")
    logger.info(f"User identified as: {user_id}")
    logger.info(f"Full user data: {user_data}")
Cooldown Messages Not Appearing:
- Cooldown messages are handled by the trigger system
- Commands on cooldown don't execute, so no response is sent
- Consider adding a separate cooldown response handler if needed
Memory Usage¶
Monitor cooldown memory usage for high-traffic streams:
async def monitor_cooldowns():
    """Monitor cooldown memory usage"""
    tracker = get_cooldown_tracker()
    while True:
        stats = tracker.get_stats()
        print(f"Cooldown memory usage: {stats['memory_entries']} entries")
        # Clean up if too many entries
        if stats['memory_entries'] > 10000:
            tracker.cleanup_old_entries(max_age_seconds=1800)  # 30 minutes
        await asyncio.sleep(300)  # Check every 5 minutes
See Also¶
- Triggers & Filters - Understanding the trigger system
- Custom Commands Examples - Practical cooldown examples
- Decorators API - Complete decorator reference
- Module System - Building modules with cooldowns