
Triggers & Filters

Triggers and filters control when your handlers execute. They let you declare activation conditions as decorators instead of writing if/else logic inside each handler.

Implementation Status: 🚧 Partially Implemented - Core triggers (Command, Cooldown, Mod/Sub/VIP) are available. Advanced triggers (Keyword, Regex, User) are planned for future releases.

Triggers

Triggers are decorators that define activation conditions for handlers. They check if an event matches specific criteria before allowing the handler to run.
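
As a rough mental model of this gating behaviour, the sketch below wraps a handler so that it runs only when a trigger's async check passes. It is not StarStreamer's implementation: `gate`, `StartsWithTrigger`, and the dict-shaped event are hypothetical stand-ins chosen so the example runs on its own.

```python
import asyncio
from functools import wraps


class StartsWithTrigger:
    """Hypothetical trigger: passes when the message starts with a prefix."""

    def __init__(self, prefix: str):
        self.prefix = prefix.lower()

    async def check(self, event: dict) -> bool:
        return event.get("message", "").lower().startswith(self.prefix)


def gate(trigger_obj):
    """Illustrative stand-in for a trigger decorator (assumption, not library code)."""

    def decorator(handler):
        @wraps(handler)
        async def wrapper(event, *args, **kwargs):
            # Run the handler only when the trigger's check passes.
            if not await trigger_obj.check(event):
                return None
            return await handler(event, *args, **kwargs)

        return wrapper

    return decorator


@gate(StartsWithTrigger("!help"))
async def help_command(event):
    return "Available commands: !help, !discord, !socials"


def run(message: str):
    """Drive the async handler from synchronous code."""
    return asyncio.run(help_command({"message": message}))
```

Calling `run("hello chat")` returns `None` because the check fails and the handler body never executes.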

Command Trigger

The most common trigger - responds to chat commands:

from starstreamer import on_event
from starstreamer.triggers import trigger, CommandTrigger

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!help"))
async def help_command(event, twitch):
    """Responds only to !help command"""
    await twitch.send_message("Available commands: !help, !discord, !socials")

Command Options

# Basic command (case-insensitive by default)
@trigger(CommandTrigger("!help"))

# Prefix omitted: "help" still matches the chat command !help
@trigger(CommandTrigger("help"))

# Note: Advanced features like aliases and custom prefixes are planned for future implementation

Keyword Trigger

Respond to specific keywords in chat messages:

from starstreamer.triggers import KeywordTrigger, trigger

@on_event("twitch.chat.message")
@trigger(KeywordTrigger("discord", case_sensitive=False))
async def discord_mention(event, twitch):
    """Responds when someone mentions discord"""
    await twitch.send_message("Join our Discord: discord.gg/example")

# Multiple keywords
@on_event("twitch.chat.message")
@trigger(KeywordTrigger("discord", "community", "server", case_sensitive=False))
async def community_mention(event, twitch):
    """Responds to any community-related keywords"""
    await twitch.send_message("Join our community!")

Cooldown Trigger

Prevents spam by limiting execution frequency:

from starstreamer.triggers import CooldownTrigger

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!clip"))
@trigger(CooldownTrigger(cooldown_seconds=30))
async def create_clip(event, twitch):
    """Can only be used once every 30 seconds"""
    await twitch.create_clip()
    await twitch.send_message("Clip created!")

Cooldown Scopes

# Per-user cooldown (default)
@trigger(CooldownTrigger(cooldown_seconds=60, per_user=True))

# Global cooldown (affects everyone)
@trigger(CooldownTrigger(cooldown_seconds=120, per_user=False))

# Custom cooldown message
@trigger(CooldownTrigger(
    cooldown_seconds=30, 
    per_user=True,
    message="Please wait {remaining:.0f}s before using this command again!"
))
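
To make the difference between the two scopes concrete, here is a minimal sketch of the bookkeeping a cooldown needs. `CooldownTracker` and its injectable `clock` are assumptions made for illustration; StarStreamer's own `CooldownTrigger` may track state differently.

```python
import time


class CooldownTracker:
    """Illustrative cooldown bookkeeping; not StarStreamer's implementation."""

    def __init__(self, cooldown_seconds: float, per_user: bool = True, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.per_user = per_user
        self.clock = clock  # injectable so the behaviour can be tested without sleeping
        self._last_used = {}

    def _key(self, username: str) -> str:
        # Per-user scope keeps one timestamp per user; global scope shares one.
        return username if self.per_user else "*"

    def remaining(self, username: str) -> float:
        last = self._last_used.get(self._key(username))
        if last is None:
            return 0.0
        return max(0.0, self.cooldown_seconds - (self.clock() - last))

    def try_use(self, username: str) -> bool:
        """Record a use and return True, or return False while cooling down."""
        if self.remaining(username) > 0:
            return False
        self._last_used[self._key(username)] = self.clock()
        return True
```

With `per_user=True`, one viewer's use does not block another viewer; with `per_user=False`, the first use starts a cooldown for everyone. The value from `remaining()` is the kind of number a message template such as `{remaining:.0f}` would format.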

Moderator-Only Trigger

Restricts commands to moderators:

from starstreamer.triggers import ModOnlyTrigger

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!timeout"))
@trigger(ModOnlyTrigger())
async def timeout_user(event, twitch):
    """Only moderators can use this command"""
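    # Parse the target user from the message, e.g. "!timeout some_user"
    parts = event.data['message'].split()
    if len(parts) < 2:
        await twitch.send_message("Usage: !timeout <user>")
        return
    await twitch.timeout_user(parts[1], duration=60)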

Regex Trigger

Use regex patterns to match complex message formats:

from starstreamer.triggers import RegexTrigger
import re

@on_event("twitch.chat.message")
@trigger(RegexTrigger(r"!remind\s+(\d+)\s+(.+)", flags=re.IGNORECASE))
async def reminder_command(event, twitch):
    """Matches: !remind 5 check the oven"""
    # Access regex match results from trigger data
    trigger_data = getattr(event, 'trigger_data', {})
    groups = trigger_data.get('groups', [])
    if len(groups) >= 2:
        minutes = int(groups[0])
        message = groups[1]
        # Set reminder logic here

User Triggers

Restrict commands to specific users by username or user ID:

from starstreamer.triggers import UserTrigger

# Specific users only (allow mode)
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!admin"))
@trigger(UserTrigger(["streamer", "trusted_mod"], mode="allow"))
async def admin_command(event, twitch):
    """Only specific users can use this"""
    pass

# Exclude specific users (deny mode)
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!public"))
@trigger(UserTrigger(["banned_user", "spam_bot"], mode="deny"))
async def public_command(event, twitch):
    """Everyone except banned users can use this"""
    pass

Subscriber/VIP Triggers

Access control by viewer status:

from starstreamer import filter
from starstreamer.events.filters import TwitchFilters
from starstreamer.triggers import SubscriberOnlyTrigger, VIPOnlyTrigger, trigger

# Subscribers only
@trigger(SubscriberOnlyTrigger())
async def sub_command(event, twitch):
    """Subscribers only"""
    pass

# VIPs only  
@trigger(VIPOnlyTrigger())
async def vip_command(event, twitch):
    """VIPs only"""
    pass

# Alternative using filters
@filter(TwitchFilters.subscriber_only())
async def sub_command_alt(event, twitch):
    """Subscribers only using filter"""
    pass

Combining Triggers

Stack multiple triggers for complex conditions:

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!ban"))
@trigger(ModOnlyTrigger())
@trigger(CooldownTrigger(cooldown_seconds=5))
async def ban_command(event, twitch):
    """Mod-only ban command with cooldown"""
    # All triggers must pass for handler to execute
    pass

Active Triggers

New in v0.3.0 ✨ Active Triggers - Time-based and state-based triggers that run independently of events.

Active triggers are fundamentally different from event-based triggers. Instead of reacting to events like chat messages, they actively monitor conditions and execute handlers on schedules or when states change.

Timer Trigger

Execute handlers at regular intervals:

from starstreamer import on_event
from starstreamer.triggers import TimerTrigger

@on_event("active_trigger.execute")
async def heartbeat_monitor(event, logger):
    """Runs every 5 minutes"""
    if event.data.get("trigger_type") == "TimerTrigger":
        logger.info("💓 Bot is alive and monitoring")

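# Create the timer (typically done in module initialization).
# It still has to be registered with an ActiveTriggerManager;
# see "Active Trigger Management" below.
from starstreamer.core.active_triggers import ActiveTriggerManager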

timer = TimerTrigger(
    interval=300,              # 5 minutes
    start_immediately=False,   # Wait for first interval
    align_to_minute=True,      # Align to minute boundaries
    name="heartbeat"
)

Timer Options

# Basic timer - every 30 seconds
TimerTrigger(interval=30)

# Start immediately, then repeat every hour
TimerTrigger(interval=3600, start_immediately=True)

# Align to minute boundaries (useful for scheduling)
TimerTrigger(interval=900, align_to_minute=True)  # Every 15 minutes on :00, :15, :30, :45

# Custom name for debugging
TimerTrigger(interval=60, name="cleanup_timer")
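
To illustrate what aligning to boundaries means, the sketch below computes the next run time for an aligned interval. `next_aligned_run` is a hypothetical helper, and it assumes the interval divides an hour evenly and that alignment is measured from the top of the hour; the library's actual alignment rule may differ.

```python
from datetime import datetime, timedelta


def next_aligned_run(now: datetime, interval: int) -> datetime:
    """Next time strictly after `now` that falls on an interval boundary.

    Boundaries are counted in `interval` seconds from the top of the hour
    (assumption: `interval` divides 3600 evenly).
    """
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    elapsed = (now - top_of_hour).total_seconds()
    periods_done = int(elapsed // interval)
    return top_of_hour + timedelta(seconds=(periods_done + 1) * interval)
```

With `interval=900`, a timer created at 12:07:30 would first run at 12:15:00 and then on every quarter hour, rather than at 12:22:30, 12:37:30, and so on.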

Query Trigger

Monitor database conditions periodically:

from starstreamer.triggers import QueryTrigger

@on_event("active_trigger.execute")
async def inactive_user_cleanup(event, twitch, logger):
    """Clean up inactive users every hour"""
    if event.data.get("trigger_type") == "QueryTrigger":
        logger.info("🧹 Checking for inactive users")
        # Cleanup logic here

# Database monitoring trigger
query_trigger = QueryTrigger(
    query="SELECT * FROM users WHERE last_seen < datetime('now', '-7 days')",
    condition=lambda results: len(results) > 0,  # Trigger if inactive users found
    check_interval=3600,  # Check every hour
    name="inactive_cleanup"
)
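
The query and condition can be checked against a throwaway SQLite database before handing them to a trigger. The sketch below uses only the standard `sqlite3` module; the `users` table layout and the `check_query` helper are assumptions for illustration, not part of StarStreamer.

```python
import sqlite3


def check_query(conn, query, condition):
    """Run the query and report whether the condition holds for its rows."""
    rows = conn.execute(query).fetchall()
    return condition(rows), rows


# In-memory database with one recent and one stale user (hypothetical schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT, last_seen TEXT)")
conn.execute("INSERT INTO users VALUES ('active_viewer', datetime('now', '-1 days'))")
conn.execute("INSERT INTO users VALUES ('lurker', datetime('now', '-30 days'))")

QUERY = "SELECT * FROM users WHERE last_seen < datetime('now', '-7 days')"
should_fire, rows = check_query(conn, QUERY, lambda results: len(results) > 0)
```

Here `should_fire` is `True` and `rows` contains only the `lurker` row, because its `last_seen` value is older than seven days.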

Threshold Trigger

React to metrics crossing thresholds with hysteresis:

from starstreamer.triggers import ThresholdTrigger

@on_event("active_trigger.execute") 
async def viewer_milestone_alert(event, twitch, logger):
    """Celebrate viewer milestones"""
    if event.data.get("trigger_type") == "ThresholdTrigger":
        milestone = event.data.get("threshold")
        current = event.data.get("metric_value")
        await twitch.send_message(f"🎉 {current} viewers! Thank you!")

current_viewer_count = 0  # Placeholder: keep this updated from your own viewer tracking

def get_viewer_count():
    """Your function to get current viewer count"""
    return current_viewer_count

# Milestone trigger with hysteresis to prevent spam
threshold_trigger = ThresholdTrigger(
    metric_func=get_viewer_count,
    thresholds=[50, 100, 500, 1000],   # Celebrate these milestones
    direction="rising",                 # Only on increases
    hysteresis=0.1,                    # 10% dead zone
    check_interval=60,                 # Check every minute
    name="viewer_milestones"
)
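
Hysteresis is easier to reason about with a small model. The sketch below shows one plausible interpretation: a threshold fires when the metric rises to it, and it is re-armed only after the metric falls below the threshold by the hysteresis fraction. `RisingThresholdDetector` is hypothetical, and the exact rule used by `ThresholdTrigger` is an assumption here.

```python
class RisingThresholdDetector:
    """Illustrative rising-edge detector with a hysteresis dead zone."""

    def __init__(self, thresholds, hysteresis=0.1):
        self.thresholds = sorted(thresholds)
        self.hysteresis = hysteresis
        # A threshold can fire only while it is armed.
        self._armed = {t: True for t in self.thresholds}

    def update(self, value):
        """Feed one metric sample; return the thresholds that fire for it."""
        fired = []
        for t in self.thresholds:
            if self._armed[t] and value >= t:
                self._armed[t] = False
                fired.append(t)
            elif not self._armed[t] and value < t * (1 - self.hysteresis):
                # Re-arm only after dropping out of the dead zone.
                self._armed[t] = True
        return fired
```

With thresholds `[50, 100]` and `hysteresis=0.1`, a viewer count that wobbles between 48 and 52 fires the 50 milestone once; it can fire again only after the count drops below 45.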

Condition Trigger

Execute when boolean conditions become true:

from starstreamer.triggers import ConditionTrigger
from datetime import datetime

@on_event("active_trigger.execute")
async def evening_reminder(event, twitch):
    """Remind about evening activities"""
    if event.data.get("trigger_type") == "ConditionTrigger":
        await twitch.send_message("🌙 Good evening! Stream starting soon!")

# Time-based condition
evening_trigger = ConditionTrigger(
    condition=lambda: 19 <= datetime.now().hour <= 23,  # 7:00 PM through 11:59 PM
    check_interval=600,   # Check every 10 minutes
    once=True,           # Only fire once when condition becomes true
    name="evening_reminder"
)
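
Firing "once when the condition becomes true" amounts to edge detection: the trigger reacts to the change from false to true, not to every poll where the condition holds. The sketch below models that idea; `EdgeDetector` is hypothetical, and whether `ConditionTrigger` fires again after the condition resets is an assumption.

```python
class EdgeDetector:
    """Illustrative false-to-true edge detector for a polled condition."""

    def __init__(self, condition):
        self.condition = condition
        self._previous = False

    def poll(self) -> bool:
        """Return True only on the poll where the condition turns true."""
        current = bool(self.condition())
        fired = current and not self._previous
        self._previous = current
        return fired


# Simulate polling at different hours instead of reading the real clock.
clock = {"hour": 18}
detector = EdgeDetector(lambda: 19 <= clock["hour"] <= 23)

results = []
for hour in [18, 19, 20, 23, 0, 19]:
    clock["hour"] = hour
    results.append(detector.poll())
```

In this simulation the detector fires at the first 19:00 poll, stays quiet for the rest of the evening, and fires again the next evening after the condition has been false in between.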

Combined State Trigger

Combine timer intervals with state conditions:

from starstreamer.triggers import CombinedStateTrigger, TimerTrigger

# is_currently_streaming() and get_viewer_count() are your own helper functions.
# Only check expensive condition every 5 minutes, and only during streaming hours
combined_trigger = CombinedStateTrigger(
    timer_trigger=TimerTrigger(interval=300),  # Every 5 minutes
    condition=lambda: is_currently_streaming() and get_viewer_count() > 10,
    name="streaming_monitor"
)

Active Trigger Management

Active triggers need lifecycle management:

# In your module's register_actions method
from starstreamer.core.active_triggers import ActiveTriggerManager
from starstreamer.triggers import QueryTrigger, TimerTrigger

async def register_active_triggers(active_trigger_manager: ActiveTriggerManager):
    """Register all active triggers for this module"""

    # Create triggers
    heartbeat = TimerTrigger(interval=300, name="heartbeat")
    cleanup = QueryTrigger(
        query="SELECT COUNT(*) FROM expired_items",
        condition=lambda results: results[0][0] > 0,
        check_interval=3600,
        name="cleanup"
    )

    # Register with handlers
    await active_trigger_manager.register_trigger(heartbeat, heartbeat_monitor)
    await active_trigger_manager.register_trigger(cleanup, cleanup_expired_items)

Active Trigger Handlers

Active triggers execute handlers registered with the active_trigger.execute event type. The ActiveTriggerManager creates synthetic events with this type and calls handlers directly:

@on_event("active_trigger.execute")
async def handle_any_active_trigger(event, logger):
    """Handle active trigger executions"""
    trigger_type = event.data.get("trigger_type")
    trigger_name = event.data.get("trigger", {}).get("name", "unknown")

    logger.info(f"Active trigger executed: {trigger_type}:{trigger_name}")

    # Route to specific handlers based on trigger
    if trigger_name == "heartbeat":
        await handle_heartbeat(event)
    elif trigger_name == "cleanup":
        await handle_cleanup(event)

Filters

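Filters are predicate functions: each one receives the event and returns True if the handler should process it. Unlike triggers, which are classes built around a specific activation condition, filters are general-purpose and can be any callable, including a lambda.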

Built-in Filters

from starstreamer import filter, on_event
from starstreamer.events.filters import TwitchFilters

@on_event("twitch.chat.message")
@filter(TwitchFilters.not_bot())
async def human_only(event):
    """Ignores bot messages"""
    pass

@on_event("twitch.chat.message")
@filter(TwitchFilters.subscriber_only())
async def sub_only(event):
    """Subscriber messages only"""
    pass

@on_event("twitch.chat.message")
@filter(TwitchFilters.min_sub_months(1))
async def established_subscribers(event):
    """Subscribers with at least 1 month"""
    pass

Custom Filters

Create your own filter functions:

def is_prime_user(event):
    """Check if user has Prime Gaming"""
    badges = event.data.get('badges', {})
    return 'prime' in badges

@on_event("twitch.chat.message")
@filter(is_prime_user)
async def prime_perks(event, twitch):
    """Special perks for Prime members"""
    pass

# Lambda filters
@filter(lambda e: len(e.data['message']) > 100)
async def long_messages(event):
    """Only process long messages"""
    pass
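
Because a filter is just a function of the event, it can be exercised in isolation before it is attached to a handler. The sketch below does that with a stand-in event object; `make_event` is a hypothetical helper, and the only assumption about real events is that they expose a `.data` dict as in the examples above.

```python
from types import SimpleNamespace


def make_event(**data):
    """Build a minimal stand-in event exposing only a .data dict."""
    return SimpleNamespace(data=data)


def is_prime_user(event) -> bool:
    """Check if user has Prime Gaming (same logic as the filter above)."""
    badges = event.data.get("badges", {})
    return "prime" in badges


def is_long_message(event) -> bool:
    """Named equivalent of the lambda filter above."""
    return len(event.data["message"]) > 100
```

A quick check such as `is_prime_user(make_event(badges={"prime": "1"}))` confirms the predicate before any chat traffic is involved. Note that `is_long_message` indexes `event.data["message"]` directly, so it raises `KeyError` for events without a message.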

Combining Filters

@on_event("twitch.chat.message")
@filter(TwitchFilters.not_bot())
@filter(TwitchFilters.subscriber_only())
@filter(lambda e: e.data.get('user', {}).get('username') != 'ignored_user')
async def filtered_handler(event):
    """Multiple filters - ALL must pass"""
    pass

Creating Custom Triggers

Extend the base Trigger class:

from datetime import datetime

from starstreamer import on_event
from starstreamer.triggers import trigger
from starstreamer.triggers.base import Trigger
from starstreamer.runtime.types import Event

class TimeTrigger(Trigger):
    """Only runs during specified hours"""

    def __init__(self, start_hour: int, end_hour: int):
        self.start_hour = start_hour
        self.end_hour = end_hour

    async def check(self, event: Event) -> bool:
        """Check if current hour is in range"""
        current_hour = datetime.now().hour
        return self.start_hour <= current_hour < self.end_hour

# Usage
@on_event("twitch.chat.message")
@trigger(TimeTrigger(18, 23))  # 6 PM to 11 PM only
async def evening_command(event, twitch):
    """Only works during evening hours"""
    pass
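
One limitation of the comparison `start_hour <= current_hour < end_hour` is that it never matches a range that wraps past midnight, such as 22 to 2. If you need overnight ranges, the check could be extended along these lines; `hour_in_range` is a hypothetical helper and not part of the library.

```python
def hour_in_range(hour: int, start_hour: int, end_hour: int) -> bool:
    """Return True if `hour` lies in [start_hour, end_hour), allowing wrap-around."""
    if start_hour <= end_hour:
        return start_hour <= hour < end_hour
    # Range wraps past midnight, e.g. 22 -> 2 covers 22, 23, 0 and 1.
    return hour >= start_hour or hour < end_hour
```

Inside a custom trigger's `check` method, this would replace the final comparison, with `datetime.now().hour` passed as `hour`.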

Trigger Context

Access trigger results in handlers:

import asyncio

@on_event("twitch.chat.message")
@trigger(RegexTrigger(r"!timer\s+(\d+)"))
async def timer_command(event, twitch):
    """Access regex match groups"""
    # Trigger adds match to event context
    match = event.context.get('regex_match')
    if match is None:
        return
    seconds = int(match.group(1))
    await asyncio.sleep(seconds)
    await twitch.send_message(f"Timer for {seconds}s finished!")

Performance Considerations

Trigger Order Matters

Place fastest triggers first:

# Good - fast checks first
@trigger(CommandTrigger("!command"))              # Fast string check
@trigger(ModOnlyTrigger())                        # Badge check
@trigger(CooldownTrigger(cooldown_seconds=60))    # State lookup

# Avoid - slow checks first
@trigger(CooldownTrigger(cooldown_seconds=60))    # State lookup first
@trigger(ModOnlyTrigger())
@trigger(CommandTrigger("!command"))
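
The reason order matters is short-circuiting: once one check fails, the remaining checks never need to run. The sketch below models that with plain functions and records which checks actually executed. `evaluate_in_order` and the dict-shaped event are hypothetical, and it assumes stacked triggers are evaluated in the order listed, which the advice above implies but does not show.

```python
def evaluate_in_order(checks, event):
    """Run (name, check) pairs in order, stopping at the first failure.

    Returns (passed, names_of_checks_that_ran).
    """
    ran = []
    for name, check in checks:
        ran.append(name)
        if not check(event):
            return False, ran
    return True, ran


def is_command(event):
    return event["message"].startswith("!command")  # cheap string comparison


def is_mod(event):
    return "moderator" in event["badges"]  # cheap badge lookup


def off_cooldown(event):
    return True  # stands in for a slower state lookup


fast_first = [("command", is_command), ("mod", is_mod), ("cooldown", off_cooldown)]
slow_first = [("cooldown", off_cooldown), ("mod", is_mod), ("command", is_command)]
ordinary_chat = {"message": "hello everyone", "badges": set()}
```

For an ordinary chat message, `fast_first` stops after the command check, while `slow_first` performs the state lookup on every message before rejecting it. Since most chat messages are not commands, the cheap command check rejects the bulk of traffic early.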

Cache Expensive Checks

from functools import lru_cache

@lru_cache(maxsize=128)
def is_special_user(username: str) -> bool:
    """Cache expensive database lookups"""
    # Database query here
    return result

@filter(lambda e: is_special_user(e.data['user']['username']))
async def special_handler(event):
    pass
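
To see what the cache saves, the sketch below replaces the database query with a counter. The set of special usernames and the `lookups` list are stand-ins for illustration; only `functools.lru_cache` itself is real API.

```python
from functools import lru_cache

lookups = []  # records every call that reaches the "database"


@lru_cache(maxsize=128)
def is_special_user(username: str) -> bool:
    """Cache expensive lookups; the set below stands in for a database query."""
    lookups.append(username)
    return username in {"streamer", "trusted_mod"}
```

Repeated calls with the same username are served from the cache, which `is_special_user.cache_info()` reports as hits. One design consequence: cached answers stay in place until they are evicted or `is_special_user.cache_clear()` is called, so a user whose status changes in the database keeps the old answer until then.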

Common Patterns

Command with Arguments

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!game"))
async def set_game(event, twitch):
    """!game <title> - Set stream game"""
    parts = event.data['message'].split(maxsplit=1)
    if len(parts) > 1:
        game_title = parts[1]
        await twitch.set_game(game_title)
        await twitch.send_message(f"Game set to: {game_title}")
    else:
        await twitch.send_message("Usage: !game <title>")

Tiered Access

# Everyone
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!commands"))
async def list_commands(event, twitch):
    await twitch.send_message("Commands: !help, !discord, !schedule")

# Subscribers only
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!songrequest"))
@filter(TwitchFilters.subscriber_only())
async def song_request(event, twitch):
    """Sub-only song requests"""
    pass

# Mods only
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!settitle"))
@trigger(ModOnlyTrigger())
async def set_title(event, twitch):
    """Mod-only title change"""
    pass

Anti-Spam System

@on_event("twitch.chat.message")
@trigger(CooldownTrigger(cooldown_seconds=2, per_user=True))
@filter(lambda e: len(e.data['message']) < 500)
async def chat_filter(event, twitch):
    """Basic spam protection"""
    # Check for spam patterns
    message = event.data['message'].lower()
    if message.count('!') > 10 or ('spam' * 5) in message:
        username = event.data['user']['username']
        await twitch.timeout_user(username, 60)
        await twitch.send_message(f"@{username} timed out for spam")

Debugging Triggers

Enable trigger debugging:

import logging
logging.getLogger('starstreamer.triggers').setLevel(logging.DEBUG)

@on_event("twitch.chat.message")
@trigger(CommandTrigger("!test"))
async def test_command(event, logger):
    logger.debug(f"Trigger fired for: {event.data['message']}")

Next Steps