Triggers & Filters¶
Triggers and filters provide powerful ways to control when your handlers execute. They allow you to create specific conditions for your actions without complex if/else logic.
Implementation Status: 🚧 Partially Implemented - Core triggers (Command, Cooldown, Mod/Sub/VIP) are available. Advanced triggers (Keyword, Regex, User) are planned for future releases.
Triggers¶
Triggers are decorators that define activation conditions for handlers. They check if an event matches specific criteria before allowing the handler to run.
Command Trigger¶
The most common trigger - responds to chat commands:
from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!help"))
async def help_command(event, twitch):
"""Responds only to !help command"""
await twitch.send_message("Available commands: !help, !discord, !socials")
Command Options¶
# Basic command (case-insensitive by default)
@trigger(CommandTrigger("!help"))
# Command without prefix (will match !command)
@trigger(CommandTrigger("help"))
# Note: Advanced features like aliases and custom prefixes are planned for future implementation
Keyword Trigger¶
Respond to specific keywords in chat messages:
from starstreamer.triggers import KeywordTrigger, trigger
@on_event("twitch.chat.message")
@trigger(KeywordTrigger("discord", case_sensitive=False))
async def discord_mention(event, twitch):
"""Responds when someone mentions discord"""
await twitch.send_message("Join our Discord: discord.gg/example")
# Multiple keywords
@on_event("twitch.chat.message")
@trigger(KeywordTrigger("discord", "community", "server", case_sensitive=False))
async def community_mention(event, twitch):
"""Responds to any community-related keywords"""
await twitch.send_message("Join our community!")
Cooldown Trigger¶
Prevents spam by limiting execution frequency:
from starstreamer.triggers import CooldownTrigger
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!clip"))
@trigger(CooldownTrigger(cooldown_seconds=30))
async def create_clip(event, twitch):
"""Can only be used once every 30 seconds"""
await twitch.create_clip()
await twitch.send_message("Clip created!")
Cooldown Scopes¶
# Per-user cooldown (default)
@trigger(CooldownTrigger(cooldown_seconds=60, per_user=True))
# Global cooldown (affects everyone)
@trigger(CooldownTrigger(cooldown_seconds=120, per_user=False))
# Custom cooldown message
@trigger(CooldownTrigger(
cooldown_seconds=30,
per_user=True,
message="Please wait {remaining:.0f}s before using this command again!"
))
Moderator-Only Trigger¶
Restricts commands to moderators:
from starstreamer.triggers import ModOnlyTrigger
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!timeout"))
@trigger(ModOnlyTrigger())
async def timeout_user(event, twitch):
"""Only moderators can use this command"""
# Parse target user from message
target = event.data['message'].split()[1]
await twitch.timeout_user(target, duration=60)
Regex Trigger¶
Use regex patterns to match complex message formats:
from starstreamer.triggers import RegexTrigger
import re
@on_event("twitch.chat.message")
@trigger(RegexTrigger(r"!remind\s+(\d+)\s+(.+)", flags=re.IGNORECASE))
async def reminder_command(event, twitch):
"""Matches: !remind 5 check the oven"""
# Access regex match results from trigger data
trigger_data = getattr(event, 'trigger_data', {})
groups = trigger_data.get('groups', [])
if len(groups) >= 2:
minutes = int(groups[0])
message = groups[1]
# Set reminder logic here
User Triggers¶
Restrict commands to specific users by username or user ID:
from starstreamer.triggers import UserTrigger
# Specific users only (allow mode)
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!admin"))
@trigger(UserTrigger(["streamer", "trusted_mod"], mode="allow"))
async def admin_command(event, twitch):
"""Only specific users can use this"""
pass
# Exclude specific users (deny mode)
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!public"))
@trigger(UserTrigger(["banned_user", "spam_bot"], mode="deny"))
async def public_command(event, twitch):
"""Everyone except banned users can use this"""
pass
Subscriber/VIP Triggers¶
Access control by viewer status:
from starstreamer.triggers import SubscriberOnlyTrigger, VIPOnlyTrigger
# Subscribers only
@trigger(SubscriberOnlyTrigger())
async def sub_command(event, twitch):
"""Subscribers only"""
pass
# VIPs only
@trigger(VIPOnlyTrigger())
async def vip_command(event, twitch):
"""VIPs only"""
pass
# Alternative using filters
@filter(TwitchFilters.subscriber_only())
async def sub_command_alt(event, twitch):
"""Subscribers only using filter"""
pass
Combining Triggers¶
Stack multiple triggers for complex conditions:
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!ban"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(cooldown_seconds=5))
async def ban_command(event, twitch):
"""Mod-only ban command with cooldown"""
# All triggers must pass for handler to execute
pass
Active Triggers¶
New in v0.3.0 ✨ Active Triggers - Time-based and state-based triggers that run independently of events.
Active triggers are fundamentally different from event-based triggers. Instead of reacting to events like chat messages, they actively monitor conditions and execute handlers on schedules or when states change.
Timer Trigger¶
Execute handlers at regular intervals:
from starstreamer import on_event
from starstreamer.triggers import TimerTrigger
@on_event("active_trigger.execute")
async def heartbeat_monitor(event, logger):
"""Runs every 5 minutes"""
if event.data.get("trigger_type") == "TimerTrigger":
logger.info("💓 Bot is alive and monitoring")
# Register the timer (typically done in module initialization)
from starstreamer.core.active_triggers import ActiveTriggerManager
timer = TimerTrigger(
interval=300, # 5 minutes
start_immediately=False, # Wait for first interval
align_to_minute=True, # Align to minute boundaries
name="heartbeat"
)
Timer Options¶
# Basic timer - every 30 seconds
TimerTrigger(interval=30)
# Start immediately, then repeat every hour
TimerTrigger(interval=3600, start_immediately=True)
# Align to minute boundaries (useful for scheduling)
TimerTrigger(interval=900, align_to_minute=True) # Every 15 minutes on :00, :15, :30, :45
# Custom name for debugging
TimerTrigger(interval=60, name="cleanup_timer")
Query Trigger¶
Monitor database conditions periodically:
from starstreamer.triggers import QueryTrigger
@on_event("active_trigger.execute")
async def inactive_user_cleanup(event, twitch, logger):
"""Clean up inactive users every hour"""
if event.data.get("trigger_type") == "QueryTrigger":
logger.info("🧹 Checking for inactive users")
# Cleanup logic here
# Database monitoring trigger
query_trigger = QueryTrigger(
query="SELECT * FROM users WHERE last_seen < datetime('now', '-7 days')",
condition=lambda results: len(results) > 0, # Trigger if inactive users found
check_interval=3600, # Check every hour
name="inactive_cleanup"
)
Threshold Trigger¶
React to metrics crossing thresholds with hysteresis:
from starstreamer.triggers import ThresholdTrigger
@on_event("active_trigger.execute")
async def viewer_milestone_alert(event, twitch, logger):
"""Celebrate viewer milestones"""
if event.data.get("trigger_type") == "ThresholdTrigger":
milestone = event.data.get("threshold")
current = event.data.get("metric_value")
await twitch.send_message(f"🎉 {current} viewers! Thank you!")
def get_viewer_count():
"""Your function to get current viewer count"""
return current_viewer_count
# Milestone trigger with hysteresis to prevent spam
threshold_trigger = ThresholdTrigger(
metric_func=get_viewer_count,
thresholds=[50, 100, 500, 1000], # Celebrate these milestones
direction="rising", # Only on increases
hysteresis=0.1, # 10% dead zone
check_interval=60, # Check every minute
name="viewer_milestones"
)
Condition Trigger¶
Execute when boolean conditions become true:
from starstreamer.triggers import ConditionTrigger
from datetime import datetime
@on_event("active_trigger.execute")
async def evening_reminder(event, twitch):
"""Remind about evening activities"""
if event.data.get("trigger_type") == "ConditionTrigger":
await twitch.send_message("🌙 Good evening! Stream starting soon!")
# Time-based condition
evening_trigger = ConditionTrigger(
condition=lambda: 19 <= datetime.now().hour <= 23, # 7 PM to 11 PM
check_interval=600, # Check every 10 minutes
once=True, # Only fire once when condition becomes true
name="evening_reminder"
)
Combined State Trigger¶
Combine timer intervals with state conditions:
from starstreamer.triggers import CombinedStateTrigger
# Only check expensive condition every 5 minutes, and only during streaming hours
combined_trigger = CombinedStateTrigger(
timer_trigger=TimerTrigger(interval=300), # Every 5 minutes
condition=lambda: is_currently_streaming() and get_viewer_count() > 10,
name="streaming_monitor"
)
Active Trigger Management¶
Active triggers need lifecycle management:
# In your module's register_actions method
from starstreamer.core.active_triggers import ActiveTriggerManager
async def register_active_triggers(active_trigger_manager: ActiveTriggerManager):
"""Register all active triggers for this module"""
# Create triggers
heartbeat = TimerTrigger(interval=300, name="heartbeat")
cleanup = QueryTrigger(
query="SELECT COUNT(*) FROM expired_items",
condition=lambda results: results[0][0] > 0,
check_interval=3600,
name="cleanup"
)
# Register with handlers
await active_trigger_manager.register_trigger(heartbeat, heartbeat_monitor)
await active_trigger_manager.register_trigger(cleanup, cleanup_expired_items)
Active Trigger Handlers¶
Active triggers execute handlers registered with the active_trigger.execute event type. The ActiveTriggerManager creates synthetic events with this type and calls handlers directly:
@on_event("active_trigger.execute")
async def handle_any_active_trigger(event, logger):
"""Handle active trigger executions"""
trigger_type = event.data.get("trigger_type")
trigger_name = event.data.get("trigger", {}).get("name", "unknown")
logger.info(f"Active trigger executed: {trigger_type}:{trigger_name}")
# Route to specific handlers based on trigger
if trigger_name == "heartbeat":
await handle_heartbeat(event)
elif trigger_name == "cleanup":
await handle_cleanup(event)
Filters¶
Filters are functions that determine if a handler should process an event. Unlike triggers, filters are more general-purpose.
Built-in Filters¶
from starstreamer import filter
from starstreamer.events.filters import TwitchFilters
@on_event("twitch.chat.message")
@filter(TwitchFilters.not_bot())
async def human_only(event):
"""Ignores bot messages"""
pass
@on_event("twitch.chat.message")
@filter(TwitchFilters.subscriber_only())
async def sub_only(event):
"""Subscriber messages only"""
pass
@on_event("twitch.chat.message")
@filter(TwitchFilters.min_sub_months(1))
async def established_subscribers(event):
"""Subscribers with at least 1 month"""
pass
Custom Filters¶
Create your own filter functions:
def is_prime_user(event):
"""Check if user has Prime Gaming"""
badges = event.data.get('badges', {})
return 'prime' in badges
@on_event("twitch.chat.message")
@filter(is_prime_user)
async def prime_perks(event, twitch):
"""Special perks for Prime members"""
pass
# Lambda filters
@filter(lambda e: len(e.data['message']) > 100)
async def long_messages(event):
"""Only process long messages"""
pass
Combining Filters¶
@on_event("twitch.chat.message")
@filter(TwitchFilters.not_bot())
@filter(TwitchFilters.subscriber_only())
@filter(lambda e: e.data.get('user', {}).get('username') != 'ignored_user')
async def filtered_handler(event):
"""Multiple filters - ALL must pass"""
pass
Creating Custom Triggers¶
Extend the base Trigger class:
from starstreamer.triggers.base import Trigger
from starstreamer.runtime.types import Event
class TimeTrigger(Trigger):
"""Only runs during specified hours"""
def __init__(self, start_hour: int, end_hour: int):
self.start_hour = start_hour
self.end_hour = end_hour
async def check(self, event: Event) -> bool:
"""Check if current hour is in range"""
from datetime import datetime
current_hour = datetime.now().hour
return self.start_hour <= current_hour < self.end_hour
# Usage
@on_event("twitch.chat.message")
@trigger(TimeTrigger(18, 23)) # 6 PM to 11 PM only
async def evening_command(event, twitch):
"""Only works during evening hours"""
pass
Trigger Context¶
Access trigger results in handlers:
@on_event("twitch.chat.message")
@trigger(RegexTrigger(r"!timer\s+(\d+)"))
async def timer_command(event, twitch):
"""Access regex match groups"""
# Trigger adds match to event context
match = event.context.get('regex_match')
seconds = int(match.group(1))
await asyncio.sleep(seconds)
await twitch.send_message(f"Timer for {seconds}s finished!")
Performance Considerations¶
Trigger Order Matters¶
Place fastest triggers first:
# Good - fast checks first
@trigger(CommandTrigger("!command")) # Fast string check
@trigger(ModOnlyTrigger()) # Badge check
@trigger(CooldownTrigger(60)) # State lookup
# Avoid - slow checks first
@trigger(CooldownTrigger(60)) # State lookup first
@trigger(ModOnlyTrigger())
@trigger(CommandTrigger("!command"))
Cache Expensive Checks¶
from functools import lru_cache
@lru_cache(maxsize=128)
def is_special_user(username: str) -> bool:
"""Cache expensive database lookups"""
# Database query here
return result
@filter(lambda e: is_special_user(e.data['user']['username']))
async def special_handler(event):
pass
Common Patterns¶
Command with Arguments¶
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!game"))
async def set_game(event, twitch):
"""!game <title> - Set stream game"""
parts = event.data['message'].split(maxsplit=1)
if len(parts) > 1:
game_title = parts[1]
await twitch.set_game(game_title)
await twitch.send_message(f"Game set to: {game_title}")
else:
await twitch.send_message("Usage: !game <title>")
Tiered Access¶
# Everyone
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!commands"))
async def list_commands(event, twitch):
await twitch.send_message("Commands: !help, !discord, !schedule")
# Subscribers only
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!songrequest"))
@filter(TwitchFilters.is_subscriber())
async def song_request(event, twitch):
"""Sub-only song requests"""
pass
# Mods only
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!settitle"))
@trigger(ModOnlyTrigger())
async def set_title(event, twitch):
"""Mod-only title change"""
pass
Anti-Spam System¶
@on_event("twitch.chat.message")
@trigger(CooldownTrigger(cooldown_seconds=2, per_user=True))
@filter(lambda e: len(e.data['message']) < 500)
async def chat_filter(event, twitch):
"""Basic spam protection"""
# Check for spam patterns
message = event.data['message'].lower()
if message.count('!') > 10 or 'spam' * 5 in message:
username = event.data['user']['username']
await twitch.timeout_user(username, 60)
await twitch.send_message(f"@{username} timed out for spam")
Debugging Triggers¶
Enable trigger debugging:
import logging
logging.getLogger('starstreamer.triggers').setLevel(logging.DEBUG)
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!test"))
async def test_command(event, logger):
logger.debug(f"Trigger fired for: {event.data['message']}")
Next Steps¶
- Learn about Dependency Injection
- Explore Chat Commands
- Learn about the Module System