Active Triggers

New in v0.3.0

Active Triggers provide time-based and state-based automation that runs independently of external events.

Active triggers are self-scheduling components that execute handlers on their own timeline. The ActiveTriggerManager maintains these triggers as background tasks, executing registered handlers directly with full dependency injection support.

Active triggers fundamentally change how you can automate your stream. Instead of only reacting to chat messages, follows, or donations, you can now create handlers that:

  • Execute on schedules (every X seconds/minutes/hours)
  • Monitor database conditions periodically
  • React to metrics crossing thresholds
  • Fire when time-of-day conditions are met
  • Combine timers with state monitoring

Key Features

Timer-Based Execution

  • Precise interval scheduling with drift correction
  • Minute boundary alignment for clean scheduling
  • Immediate start or delayed start options

📊 State Monitoring

  • Database query monitoring with custom conditions
  • Metric threshold monitoring with hysteresis
  • Boolean condition checking with edge detection

🎯 Smart Execution

  • Hysteresis prevents rapid re-triggering
  • Once-only execution for state changes
  • Lifecycle management with proper cleanup
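The lifecycle point can be pictured with plain asyncio: a trigger loop runs as a background task and is cancelled, then awaited, when it is no longer needed. This is only a minimal sketch of the general pattern, not StarStreamer's implementation; `run_periodically` and `demo` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


async def run_periodically(
    interval: float, handler: Callable[[], Awaitable[None]]
) -> None:
    """Call `handler` every `interval` seconds until the task is cancelled."""
    while True:
        await asyncio.sleep(interval)
        await handler()


async def demo() -> tuple[int, bool]:
    calls = 0

    async def handler() -> None:
        nonlocal calls
        calls += 1

    # Start the trigger loop as a background task.
    task = asyncio.create_task(run_periodically(0.01, handler))
    await asyncio.sleep(0.1)

    # Cleanup: cancel the task, then await it so cancellation finishes.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return calls, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Awaiting the cancelled task is what makes the cleanup deterministic: once `demo` returns, no loop is left running in the background.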

🔧 Full Integration

  • Works with existing dependency injection
  • Handlers use familiar @on_event decorator pattern
  • Complete test coverage and type safety

Quick Start

1. Create a Timer Handler

from starstreamer import on_event
from starstreamer.triggers import TimerTrigger

@on_event("active_trigger.execute")
async def stream_heartbeat(event, logger):
    """Monitor bot health every 5 minutes"""
    if event.data.get("trigger_type") == "TimerTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "heartbeat":
            logger.info("💓 Bot heartbeat - all systems operational")

2. Register the Timer

# In your module's initialization
from starstreamer.core.active_triggers import ActiveTriggerManager
from starstreamer.triggers import TimerTrigger

async def setup_timers(active_trigger_manager: ActiveTriggerManager):
    # Create timer - runs every 5 minutes
    heartbeat_timer = TimerTrigger(
        interval=300,              # 5 minutes in seconds
        start_immediately=False,   # Wait for first interval
        name="heartbeat"          # Identifier for filtering
    )

    # Register timer with handler
    await active_trigger_manager.register_trigger(heartbeat_timer, stream_heartbeat)

Timer Triggers

Execute handlers at regular intervals with precision timing.

Basic Usage

from starstreamer.triggers import TimerTrigger

# Simple timer - every 30 seconds
timer = TimerTrigger(interval=30)

# Timer with custom name for identification
timer = TimerTrigger(interval=60, name="minute_check")

# Start immediately, then repeat
timer = TimerTrigger(interval=300, start_immediately=True)
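Precision timing with drift correction usually means deriving each fire time from the original start time instead of from the moment the previous handler finished. The sketch below shows that idea in isolation; `next_fire_time` is a hypothetical helper and is not part of the StarStreamer API.

```python
import math


def next_fire_time(start: float, interval: float, now: float) -> float:
    """Return the next scheduled fire time strictly after `now`.

    Fire times are anchored to `start` (start + k * interval), so time
    spent inside a handler does not accumulate as drift.
    """
    elapsed = now - start
    # Whole intervals already completed, plus one for the upcoming slot.
    k = math.floor(elapsed / interval) + 1
    return start + k * interval


# A handler that began at t=30.0 and took 1.2 s still gets its next slot at t=60.0.
print(next_fire_time(start=0.0, interval=30.0, now=31.2))  # 60.0
```

A naive loop that sleeps for `interval` after each run would instead fire at 61.2, 92.4, and so on, slipping a little further every cycle.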

Minute Alignment

For clean scheduling, align timers to minute boundaries:

# Runs at :00, :15, :30, :45 every hour
timer = TimerTrigger(
    interval=900,           # 15 minutes
    align_to_minute=True,   # Align to minute boundaries
    name="quarter_hour"
)

# For intervals >= 60 seconds, aligns to next minute boundary
timer = TimerTrigger(
    interval=3600,          # 1 hour  
    align_to_minute=True,   # Will run at :00 of each hour
    name="hourly_report"
)
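To see what alignment means for the two timers above, the following sketch computes the next aligned run time. It assumes fire times fall on multiples of the interval counted from midnight; how `align_to_minute` chooses its boundaries internally is not documented here, and `next_aligned_run` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_aligned_run(now: datetime, interval_seconds: int) -> datetime:
    """Return the next time after `now` that is a multiple of the interval since midnight."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = (now - midnight).total_seconds()
    # Index of the next interval slot after `now`.
    slot = int(elapsed // interval_seconds) + 1
    return midnight + timedelta(seconds=slot * interval_seconds)


now = datetime(2024, 1, 1, 10, 7, 30)
print(next_aligned_run(now, 900))   # 2024-01-01 10:15:00
print(next_aligned_run(now, 3600))  # 2024-01-01 11:00:00
```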

Practical Examples

@on_event("active_trigger.execute")
async def uptime_reminder(event, twitch, logger):
    """Announce stream uptime every 30 minutes"""
    if event.data.get("trigger_type") == "TimerTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "uptime_announce":
            uptime = get_stream_uptime()  # Your function
            await twitch.send_message(f"🕒 Stream uptime: {uptime}")

@on_event("active_trigger.execute") 
async def save_stream_stats(event, database, logger):
    """Save stream statistics every 5 minutes"""
    if event.data.get("trigger_type") == "TimerTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "stats_save":
            stats = gather_current_stats()  # Your function
            await database.execute(
                "INSERT INTO stream_stats (timestamp, viewers, followers) VALUES (?, ?, ?)",
                (stats.timestamp, stats.viewers, stats.followers)
            )
            logger.debug(f"Saved stats: {stats}")

Query Triggers

Monitor database conditions and execute when criteria are met.

Basic Database Monitoring

from starstreamer.triggers import QueryTrigger

@on_event("active_trigger.execute")
async def process_pending_rewards(event, logger):
    """Process pending channel point rewards"""
    if event.data.get("trigger_type") == "QueryTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "pending_rewards":
            logger.info("Processing pending channel point rewards...")
            # Your reward processing logic here

# Monitor for pending rewards every minute
rewards_trigger = QueryTrigger(
    query="SELECT COUNT(*) FROM pending_rewards WHERE processed = 0",
    condition=lambda results: results[0][0] > 0,  # Trigger if unprocessed rewards exist
    check_interval=60,  # Check every minute
    name="pending_rewards"
)
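The `results[0][0]` indexing suggests the condition receives rows as a sequence of tuples. The sketch below tries the same condition against an in-memory SQLite database using only the standard library. How StarStreamer actually runs the query is not shown here, so treat the row shape as an assumption; `has_pending` is a hypothetical stand-in for the lambda above.

```python
import sqlite3

QUERY = "SELECT COUNT(*) FROM pending_rewards WHERE processed = 0"


def has_pending(results) -> bool:
    """Same logic as the lambda above: fire if any unprocessed rows exist."""
    return results[0][0] > 0


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pending_rewards (id INTEGER PRIMARY KEY, processed INTEGER)"
)
conn.executemany(
    "INSERT INTO pending_rewards (processed) VALUES (?)",
    [(1,), (0,), (0,)],
)

rows_before = conn.execute(QUERY).fetchall()  # [(2,)]
fires_before = has_pending(rows_before)       # True

# Once everything is processed, the same check stops firing.
conn.execute("UPDATE pending_rewards SET processed = 1")
rows_after = conn.execute(QUERY).fetchall()   # [(0,)]
fires_after = has_pending(rows_after)         # False
```

Because `COUNT(*)` always returns exactly one row, indexing `results[0]` is safe for this query. Conditions over queries that may return no rows should check `len(results)` first.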

Advanced Query Conditions

# Complex condition checking
cleanup_trigger = QueryTrigger(
    query="""
        SELECT user_id, last_seen 
        FROM users 
        WHERE last_seen < datetime('now', '-30 days')
        AND status = 'active'
    """,
    condition=lambda results: len(results) >= 10,  # Cleanup when 10+ inactive users
    check_interval=3600,  # Check hourly
    name="user_cleanup"
)

# Multiple table monitoring
analytics_trigger = QueryTrigger(
    query="""
        SELECT 
            COUNT(*) as new_followers,
            AVG(chat_activity) as avg_activity
        FROM daily_stats 
        WHERE date = date('now')
    """,
    condition=lambda results: (
        results[0][0] > 50 and  # More than 50 new followers
        results[0][1] > 100     # High chat activity
    ),
    check_interval=300,  # Check every 5 minutes
    name="growth_milestone"
)

Threshold Triggers

React to metrics crossing thresholds with built-in hysteresis to prevent spam.

Viewer Count Monitoring

from starstreamer.triggers import ThresholdTrigger

@on_event("active_trigger.execute")
async def viewer_milestone_celebration(event, twitch, logger):
    """Celebrate viewer milestones"""
    if event.data.get("trigger_type") == "ThresholdTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "viewer_milestones":
            threshold = event.data.get("threshold")
            current_value = event.data.get("metric_value")

            await twitch.send_message(f"🎉 Amazing! We hit {current_value} viewers! Thank you all!")
            logger.info(f"Viewer milestone reached: {threshold} (current: {current_value})")

def get_current_viewers():
    """Get current viewer count from your API"""
    # Your implementation here
    return current_viewer_count

# Celebrate milestones as viewers increase
viewer_trigger = ThresholdTrigger(
    metric_func=get_current_viewers,
    thresholds=[25, 50, 100, 250, 500, 1000],  # Celebration points
    direction="rising",          # Only celebrate increases
    hysteresis=0.05,            # 5% dead zone prevents spam
    check_interval=30,          # Check every 30 seconds
    name="viewer_milestones"
)
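The effect of hysteresis on a single rising threshold can be sketched as follows. This assumes `hysteresis` is a fraction of the threshold and that a fired threshold re-arms only after the metric drops below `threshold * (1 - hysteresis)`; the library's exact rule may differ. `RisingThreshold` is a hypothetical class written for illustration.

```python
class RisingThreshold:
    """Fire once when a value rises to the threshold; re-arm below the dead zone."""

    def __init__(self, threshold: float, hysteresis: float) -> None:
        self.threshold = threshold
        # The value must fall below this level before the threshold can fire again.
        self.rearm_below = threshold * (1.0 - hysteresis)
        self.armed = True

    def update(self, value: float) -> bool:
        if self.armed and value >= self.threshold:
            self.armed = False
            return True
        if not self.armed and value < self.rearm_below:
            self.armed = True
        return False


t = RisingThreshold(threshold=100, hysteresis=0.05)
readings = [98, 100, 99, 101, 94, 100]
fired = [t.update(v) for v in readings]
print(fired)  # [False, True, False, False, False, True]
```

The dip to 99 and the rise to 101 do not fire again because the value never left the 5% dead zone. Only after the drop to 94 does the next crossing count.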

Performance Monitoring

@on_event("active_trigger.execute")
async def performance_alert(event, logger):
    """Alert on high CPU usage"""
    if event.data.get("trigger_type") == "ThresholdTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "cpu_monitor":
            cpu_percent = event.data.get("metric_value")
            logger.warning(f"🚨 High CPU usage detected: {cpu_percent:.1f}%")

def get_cpu_usage():
    """Get current CPU usage percentage"""
    import psutil
    # Note: interval=1 blocks the calling thread for one second while sampling
    return psutil.cpu_percent(interval=1)

# Monitor CPU usage with hysteresis
cpu_trigger = ThresholdTrigger(
    metric_func=get_cpu_usage,
    thresholds=[80.0, 90.0, 95.0],  # Warning levels
    direction="rising",             # Alert on high usage
    hysteresis=0.1,                # 10% hysteresis  
    check_interval=10,             # Check every 10 seconds
    name="cpu_monitor"
)

Bidirectional Monitoring

# Monitor both rising and falling values
follower_trigger = ThresholdTrigger(
    metric_func=get_follower_count,
    thresholds=[100, 500, 1000, 5000],
    direction="both",           # Trigger on both increases and decreases
    hysteresis=0.02,           # 2% hysteresis
    check_interval=60,         # Check every minute
    name="follower_tracker"
)

Condition Triggers

Execute handlers when boolean conditions become true.

Time-Based Conditions

from starstreamer.triggers import ConditionTrigger
from datetime import datetime

@on_event("active_trigger.execute")
async def stream_schedule_reminder(event, twitch):
    """Remind about stream schedule during prime time"""
    if event.data.get("trigger_type") == "ConditionTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "prime_time":
            await twitch.send_message("🕒 Prime time! Stream starts in 30 minutes!")

# Trigger during prime viewing hours
prime_time_trigger = ConditionTrigger(
    condition=lambda: datetime.now().hour in [19, 20, 21],  # 7:00 PM through 9:59 PM
    check_interval=300,     # Check every 5 minutes
    once=True,             # Only fire once when condition becomes true
    name="prime_time"
)

Stream State Conditions

@on_event("active_trigger.execute")
async def low_activity_alert(event, twitch, logger):
    """Alert when chat activity is low during stream"""
    if event.data.get("trigger_type") == "ConditionTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "low_activity":
            await twitch.send_message("Chat seems quiet! How's everyone doing? 💬")
            logger.info("Low activity alert triggered")

def is_chat_inactive():
    """Check if chat has been inactive"""
    # Your logic to check recent chat activity
    last_message_time = get_last_chat_message_time()
    return (datetime.now() - last_message_time).total_seconds() > 300  # 5 minutes

# Monitor for chat inactivity during streaming
activity_trigger = ConditionTrigger(
    condition=lambda: is_streaming() and is_chat_inactive(),
    check_interval=60,      # Check every minute
    once=False,            # Can trigger multiple times
    name="low_activity"
)
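One detail matters for elapsed-time checks such as `is_chat_inactive`: `timedelta.seconds` holds only the seconds component of the duration (0 to 86399) and ignores whole days, while `total_seconds()` returns the full span. The values below are illustrative only.

```python
from datetime import timedelta

# Chat has been silent for one day and ten seconds.
gap = timedelta(days=1, seconds=10)

component_only = gap.seconds          # 10 (the day is not included)
full_duration = gap.total_seconds()   # 86410.0

print(component_only > 300)   # False: looks "active" despite a day of silence
print(full_duration > 300)    # True
```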

Edge Detection

# Trigger only on rising edge (false -> true transition)
stream_start_trigger = ConditionTrigger(
    condition=lambda: is_live_on_twitch(),
    check_interval=30,      # Check every 30 seconds
    once=True,             # Only on state change
    name="stream_start"
)

# Continuous execution while condition is true
maintenance_trigger = ConditionTrigger(
    condition=lambda: is_maintenance_mode(),
    check_interval=10,      # Check every 10 seconds  
    once=False,            # Keep executing while true
    name="maintenance_mode"
)
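The difference between the two modes can be sketched with a small detector that remembers the previous condition value. It assumes `once=True` fires on each false-to-true transition and re-arms when the condition goes false again; the library's behavior may differ in detail. `EdgeDetector` is a hypothetical class.

```python
class EdgeDetector:
    """Decide whether to fire, given successive samples of a boolean condition."""

    def __init__(self, once: bool) -> None:
        self.once = once
        self.previous = False

    def should_fire(self, current: bool) -> bool:
        rising_edge = current and not self.previous
        self.previous = current
        # once=True: fire only on the transition; once=False: fire while true.
        return rising_edge if self.once else current


samples = [False, True, True, False, True]

edge = EdgeDetector(once=True)
level = EdgeDetector(once=False)

edge_fires = [edge.should_fire(s) for s in samples]
level_fires = [level.should_fire(s) for s in samples]

print(edge_fires)   # [False, True, False, False, True]
print(level_fires)  # [False, True, True, False, True]
```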

Combined State Triggers

Combine timer intervals with state conditions for efficient monitoring.

Smart Monitoring

from starstreamer.triggers import CombinedStateTrigger, TimerTrigger

@on_event("active_trigger.execute")
async def smart_stream_monitor(event, logger):
    """Only monitor during streaming with viewers"""
    if event.data.get("trigger_type") == "CombinedStateTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "smart_monitor":
            # Perform expensive monitoring only when needed
            metrics = gather_detailed_metrics()
            logger.info(f"Stream metrics: {metrics}")

# Combine timer with condition - only check expensive metrics during active streaming
smart_trigger = CombinedStateTrigger(
    timer_trigger=TimerTrigger(interval=120),  # Every 2 minutes
    condition=lambda: is_streaming() and get_viewer_count() > 5,  # Only when active
    name="smart_monitor"
)
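Conceptually, a combined trigger acts as a gate on each timer tick: the timer fires on schedule, but the handler runs only if the condition holds at that moment. The sketch below shows that gating in isolation. It does not mirror the `CombinedStateTrigger` internals, and `make_gated_tick` is a hypothetical helper.

```python
from typing import Callable


def make_gated_tick(
    condition: Callable[[], bool],
    handler: Callable[[], None],
) -> Callable[[], bool]:
    """Build a tick function that runs `handler` only while `condition()` holds."""

    def tick() -> bool:
        if not condition():
            return False  # Timer fired, but the gate is closed: skip the work.
        handler()
        return True

    return tick


state = {"streaming": False}
runs: list[str] = []

tick = make_gated_tick(
    condition=lambda: state["streaming"],
    handler=lambda: runs.append("metrics collected"),
)

results = [tick()]          # Gate closed: handler skipped
state["streaming"] = True
results.append(tick())      # Gate open: handler runs

print(results)  # [False, True]
print(runs)     # ['metrics collected']
```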

Resource-Efficient Monitoring

# Check database health, but only during business hours
db_health_trigger = CombinedStateTrigger(
    timer_trigger=TimerTrigger(interval=300),  # Every 5 minutes
    condition=lambda: 9 <= datetime.now().hour <= 17,  # 9:00 AM through 5:59 PM
    name="db_health_check"
)

# Monitor expensive metrics only when streaming
performance_trigger = CombinedStateTrigger(
    timer_trigger=TimerTrigger(interval=60),   # Every minute
    condition=lambda: (
        is_streaming() and 
        get_viewer_count() > 10 and 
        datetime.now().hour >= 18  # Evening streams only
    ),
    name="evening_performance"
)

Module Integration

Complete Module Example

# modules/monitoring/module.py
from modules.base import BaseModule
from starstreamer.core.active_triggers import ActiveTriggerManager

class MonitoringModule(BaseModule):
    @property
    def module_name(self) -> str:
        return "monitoring"

    async def register_actions(self) -> None:
        # Import handlers to register them
        from modules.monitoring.actions import monitors

    async def on_enable(self) -> None:
        """Set up active triggers when module is enabled"""
        # Get the active trigger manager from DI
        active_trigger_manager = self.container.get(ActiveTriggerManager)

        # Register all monitoring triggers
        await self._setup_monitoring_triggers(active_trigger_manager)

    async def _setup_monitoring_triggers(self, manager: ActiveTriggerManager):
        """Register all monitoring triggers"""
        from modules.monitoring.actions import monitors  # Not in scope from register_actions
        from starstreamer.triggers import TimerTrigger, ThresholdTrigger

        # System health check every 5 minutes
        health_timer = TimerTrigger(interval=300, name="health_check")
        await manager.register_trigger(health_timer, monitors.system_health_check)

        # Viewer milestone celebrations
        viewer_trigger = ThresholdTrigger(
            metric_func=monitors.get_viewer_count,
            thresholds=[50, 100, 500, 1000],
            direction="rising",
            hysteresis=0.05,
            check_interval=30,
            name="viewer_milestones"
        )
        await manager.register_trigger(viewer_trigger, monitors.celebrate_milestones)

Handler Organization

# modules/monitoring/actions/monitors.py
from starstreamer import on_event

@on_event("active_trigger.execute")
async def system_health_check(event, logger):
    """Monitor system health"""
    if _is_trigger(event, "TimerTrigger", "health_check"):
        # Health check logic
        logger.info("💓 System health check completed")

@on_event("active_trigger.execute") 
async def celebrate_milestones(event, twitch, logger):
    """Celebrate viewer milestones"""
    if _is_trigger(event, "ThresholdTrigger", "viewer_milestones"):
        threshold = event.data.get("threshold")
        current = event.data.get("metric_value")
        await twitch.send_message(f"🎉 {current} viewers! Thank you!")

def _is_trigger(event, trigger_type: str, trigger_name: str) -> bool:
    """Helper to check trigger type and name"""
    if event.data.get("trigger_type") != trigger_type:
        return False
    trigger = event.data.get("trigger")
    return trigger is not None and trigger.name == trigger_name

def get_viewer_count() -> int:
    """Get current viewer count"""
    # Your implementation
    return current_viewers
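The filtering helper can be exercised without the framework by passing in stand-in event objects. The sketch below uses a standalone copy of the helper, written to always return a strict bool, together with `SimpleNamespace` fakes. Real events come from StarStreamer, so the fake event shape is an assumption based on the `event.data` keys used above.

```python
from types import SimpleNamespace


def is_trigger(event, trigger_type: str, trigger_name: str) -> bool:
    """Return True only if the event came from the named trigger of the given type."""
    if event.data.get("trigger_type") != trigger_type:
        return False
    trigger = event.data.get("trigger")
    return trigger is not None and trigger.name == trigger_name


def fake_event(trigger_type: str, name: str) -> SimpleNamespace:
    """Build a minimal stand-in for an active_trigger.execute event."""
    return SimpleNamespace(
        data={"trigger_type": trigger_type, "trigger": SimpleNamespace(name=name)}
    )


health = fake_event("TimerTrigger", "health_check")
no_trigger = SimpleNamespace(data={"trigger_type": "TimerTrigger"})

print(is_trigger(health, "TimerTrigger", "health_check"))      # True
print(is_trigger(health, "TimerTrigger", "uptime_announce"))   # False
print(is_trigger(health, "ThresholdTrigger", "health_check"))  # False
print(is_trigger(no_trigger, "TimerTrigger", "health_check"))  # False
```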

Best Practices

Error Handling

@on_event("active_trigger.execute")
async def robust_handler(event, logger):
    """Handler with proper error handling"""
    try:
        if _is_trigger(event, "TimerTrigger", "my_timer"):
            # Your logic here
            result = await perform_operation()
            logger.info(f"Operation completed: {result}")
    except Exception as e:
        logger.error(f"Error in timer handler: {e}", exc_info=True)
        # Don't re-raise - let the trigger continue running

Performance Considerations

# Good: Fast condition check
def is_prime_time() -> bool:
    """Quick time check"""
    return 19 <= datetime.now().hour <= 22

# Avoid: Expensive operations in conditions
def expensive_condition() -> bool:
    """Don't do this - too expensive for frequent checking"""
    result = database.execute("SELECT COUNT(*) FROM complex_query")
    return result[0][0] > threshold
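When a condition cannot avoid an expensive lookup, one option is to cache its result for a short period so that frequent checks reuse the last answer. The sketch below is one possible approach and not a StarStreamer feature; `cached_condition` is a hypothetical helper, and the `clock` parameter exists only so the example can run with a fake clock.

```python
import time
from typing import Callable


def cached_condition(
    func: Callable[[], bool],
    ttl_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[], bool]:
    """Wrap `func` so it is re-evaluated at most once per `ttl_seconds`."""
    last_value = False
    expires_at = float("-inf")  # Forces evaluation on the first call.

    def wrapper() -> bool:
        nonlocal last_value, expires_at
        now = clock()
        if now >= expires_at:
            last_value = func()
            expires_at = now + ttl_seconds
        return last_value

    return wrapper


# Demonstration with a fake clock and a call counter.
fake_clock = {"now": 0.0}
stats = {"calls": 0}


def expensive_check() -> bool:
    stats["calls"] += 1
    return True


check = cached_condition(expensive_check, ttl_seconds=60, clock=lambda: fake_clock["now"])

check()                    # t=0: evaluates expensive_check
fake_clock["now"] = 10.0
check()                    # t=10: served from cache
fake_clock["now"] = 61.0
check()                    # t=61: cache expired, evaluates again

print(stats["calls"])  # 2
```

The trade-off is staleness: the trigger may react up to `ttl_seconds` late, so the TTL should be chosen with the check interval in mind.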

Resource Management

# Use appropriate check intervals
TimerTrigger(interval=30)           # 30s - for real-time monitoring
TimerTrigger(interval=300)          # 5min - for regular checks  
TimerTrigger(interval=3600)         # 1hr - for background maintenance

# Use hysteresis for noisy metrics
ThresholdTrigger(
    metric_func=get_noisy_metric,
    thresholds=[100],
    hysteresis=0.1,  # 10% dead zone prevents flapping
    check_interval=60
)

Debugging

# Enable debug logging for active triggers
import logging
logging.getLogger('starstreamer.triggers.active').setLevel(logging.DEBUG)

# Add detailed logging to handlers
@on_event("active_trigger.execute")
async def debug_handler(event, logger):
    """Handler with debug information"""
    trigger_type = event.data.get("trigger_type")
    trigger = event.data.get("trigger")
    trigger_name = trigger.name if trigger else "unknown"

    logger.debug(f"Active trigger fired: {trigger_type}:{trigger_name}")
    logger.debug(f"Event data: {event.data}")

    # Your handler logic here

Migration from Event-Based

To convert an existing event-based handler into an active trigger, compare the two versions below.

Before (Event-Based)

# This only runs when chat messages occur
@on_event("twitch.chat.message") 
@trigger(CommandTrigger("!uptime"))
async def uptime_command(event, twitch):
    uptime = get_stream_uptime()
    await twitch.send_message(f"Stream uptime: {uptime}")

After (Active Trigger)

# This runs automatically every 30 minutes
@on_event("active_trigger.execute")
async def uptime_announcement(event, twitch):
    if _is_trigger(event, "TimerTrigger", "uptime_announce"):
        uptime = get_stream_uptime()
        await twitch.send_message(f"📺 Stream uptime: {uptime}")

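# Create the timer, then register it with its handler from an async setup function
uptime_timer = TimerTrigger(interval=1800, name="uptime_announce")  # 30 minutes
# await active_trigger_manager.register_trigger(uptime_timer, uptime_announcement)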

Troubleshooting

Common Issues

Triggers not firing:

  • Check that the ActiveTriggerManager is properly initialized in main.py
  • Verify trigger registration in module's on_enable method
  • Check logs for registration errors

High CPU usage:

  • Reduce check frequency for expensive conditions
  • Use appropriate intervals (not too frequent)
  • Optimize metric functions

Triggers firing too often:

  • Add hysteresis to ThresholdTrigger
  • Use once=True for ConditionTrigger
  • Check condition logic for unexpected behavior

Memory leaks:

  • Ensure proper trigger cleanup in module's on_disable
  • Don't store large objects in trigger callbacks
  • Use weak references for external resources
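The weak-reference advice can be sketched with the standard library's `weakref` module: the condition holds a weak reference to the external resource, so a long-lived trigger does not keep that resource alive. `ChatClient` and `make_condition` are hypothetical names used for illustration only.

```python
import gc
import weakref
from typing import Callable


class ChatClient:
    """Stand-in for an external resource that a condition needs to inspect."""

    def __init__(self) -> None:
        self.connected = True


def make_condition(client: ChatClient) -> Callable[[], bool]:
    """Build a condition that does not keep `client` alive."""
    ref = weakref.ref(client)

    def condition() -> bool:
        target = ref()  # None once the client has been garbage collected.
        return target is not None and target.connected

    return condition


client = ChatClient()
condition = make_condition(client)

alive = condition()   # True while the client exists

del client            # Drop the only strong reference.
gc.collect()          # Make collection explicit for non-refcounting runtimes.

gone = condition()    # False: the resource was released, not leaked

print(alive, gone)  # True False
```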

Debug Commands

# Check active trigger status
active_trigger_manager = container.get(ActiveTriggerManager)
stats = active_trigger_manager.get_stats()
logger.info(f"Active triggers: {stats}")

# List registered triggers  
triggers = active_trigger_manager.list_triggers()
for trigger_id, trigger_info in triggers.items():
    logger.info(f"Trigger {trigger_id}: {trigger_info}")

Next Steps