Active Triggers¶
New in v0.3.0 ✨: Active Triggers provide time-based and state-based automation that runs independently of external events.
Active triggers are self-scheduling components that execute handlers on their own timeline. The ActiveTriggerManager maintains these triggers as background tasks, executing registered handlers directly with full dependency injection support.
Active triggers fundamentally change how you can automate your stream. Instead of only reacting to chat messages, follows, or donations, you can now create handlers that:
- Execute on schedules (every X seconds/minutes/hours)
- Monitor database conditions periodically
- React to metrics crossing thresholds
- Fire when a time-of-day condition becomes true
- Combine timers with state monitoring
Key Features¶
⏰ Timer-Based Execution¶
- Precise interval scheduling with drift correction
- Minute boundary alignment for clean scheduling
- Immediate start or delayed start options
📊 State Monitoring¶
- Database query monitoring with custom conditions
- Metric threshold monitoring with hysteresis
- Boolean condition checking with edge detection
🎯 Smart Execution¶
- Hysteresis prevents rapid re-triggering
- Once-only execution for state changes
- Lifecycle management with proper cleanup
🔧 Full Integration¶
- Works with existing dependency injection
- Handlers use familiar @on_event decorator pattern
- Complete test coverage and type safety
Quick Start¶
1. Create a Timer Handler¶
from starstreamer import on_event
from starstreamer.triggers import TimerTrigger

@on_event("active_trigger.execute")
async def stream_heartbeat(event, logger):
    """Monitor bot health every 5 minutes"""
    if event.data.get("trigger_type") == "TimerTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "heartbeat":
            logger.info("💓 Bot heartbeat - all systems operational")
2. Register the Timer¶
# In your module's initialization
from starstreamer.core.active_triggers import ActiveTriggerManager

async def setup_timers(active_trigger_manager: ActiveTriggerManager):
    # Create timer - runs every 5 minutes
    heartbeat_timer = TimerTrigger(
        interval=300,             # 5 minutes in seconds
        start_immediately=False,  # Wait for first interval
        name="heartbeat"          # Identifier for filtering
    )
    # Register timer with handler
    await active_trigger_manager.register_trigger(heartbeat_timer, stream_heartbeat)
Timer Triggers¶
Execute handlers at regular intervals with precision timing.
Basic Usage¶
from starstreamer.triggers import TimerTrigger
# Simple timer - every 30 seconds
timer = TimerTrigger(interval=30)
# Timer with custom name for identification
timer = TimerTrigger(interval=60, name="minute_check")
# Start immediately, then repeat
timer = TimerTrigger(interval=300, start_immediately=True)
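The feature list mentions interval scheduling with drift correction. One common way to get that behaviour is to schedule each tick against an absolute deadline derived from the start time, rather than sleeping for `interval` after each handler call. The sketch below illustrates that idea only; `next_deadline` and `run_every` are hypothetical names, and nothing here is taken from the actual `TimerTrigger` implementation.

```python
import asyncio
import time
from typing import Awaitable, Callable


def next_deadline(start: float, interval: float, now: float) -> float:
    """Return the first deadline start + n * interval (n >= 1) strictly after `now`."""
    elapsed = now - start
    periods = int(elapsed // interval) + 1
    return start + periods * interval


async def run_every(
    interval: float,
    handler: Callable[[], Awaitable[None]],
    ticks: int,
) -> None:
    """Call `handler` `ticks` times on a fixed grid of deadlines (hypothetical helper)."""
    start = time.monotonic()
    for _ in range(ticks):
        # Deadlines are computed from `start`, so time spent inside the
        # handler does not push later ticks back (no accumulated drift).
        deadline = next_deadline(start, interval, time.monotonic())
        await asyncio.sleep(max(0.0, deadline - time.monotonic()))
        await handler()
```

With this approach a handler that overruns its interval causes the missed tick to be skipped instead of fired late in a burst; whether that is the desired policy depends on the use case.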
Minute Alignment¶
For clean scheduling, align timers to minute boundaries:
# Runs at :00, :15, :30, :45 every hour
timer = TimerTrigger(
    interval=900,           # 15 minutes
    align_to_minute=True,   # Align to minute boundaries
    name="quarter_hour"
)
# For intervals >= 60 seconds, aligns to next minute boundary
timer = TimerTrigger(
    interval=3600,          # 1 hour
    align_to_minute=True,   # Will run at :00 of each hour
    name="hourly_report"
)
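To make the alignment arithmetic concrete, here is a minimal sketch that computes the next aligned run time as a whole multiple of the interval past local midnight. This is an assumption about what "aligned" means, chosen because it reproduces the :00/:15/:30/:45 pattern described above; `next_aligned_run` is a hypothetical helper, not part of StarStreamer, and it ignores daylight-saving transitions.

```python
from datetime import datetime, timedelta


def next_aligned_run(now: datetime, interval: int) -> datetime:
    """Return the next time that is a whole multiple of `interval` seconds
    past midnight and strictly after `now` (hypothetical helper)."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = (now - midnight).total_seconds()
    periods = int(elapsed // interval) + 1
    return midnight + timedelta(seconds=periods * interval)


# 14:07:30 with a 15-minute interval lands on 14:15:00
quarter = next_aligned_run(datetime(2024, 1, 1, 14, 7, 30), 900)
# 14:07:30 with a 1-hour interval lands on 15:00:00
hourly = next_aligned_run(datetime(2024, 1, 1, 14, 7, 30), 3600)
```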
Practical Examples¶
@on_event("active_trigger.execute")
async def uptime_reminder(event, twitch, logger):
    """Announce stream uptime every 30 minutes"""
    if event.data.get("trigger_type") == "TimerTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "uptime_announce":
            uptime = get_stream_uptime()  # Your function
            await twitch.send_message(f"🕒 Stream uptime: {uptime}")

@on_event("active_trigger.execute")
async def save_stream_stats(event, database, logger):
    """Save stream statistics every 5 minutes"""
    if event.data.get("trigger_type") == "TimerTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "stats_save":
            stats = gather_current_stats()  # Your function
            await database.execute(
                "INSERT INTO stream_stats (timestamp, viewers, followers) VALUES (?, ?, ?)",
                (stats.timestamp, stats.viewers, stats.followers)
            )
            logger.debug(f"Saved stats: {stats}")
Query Triggers¶
Monitor database conditions and execute when criteria are met.
Basic Database Monitoring¶
from starstreamer.triggers import QueryTrigger

@on_event("active_trigger.execute")
async def process_pending_rewards(event, logger):
    """Process pending channel point rewards"""
    if event.data.get("trigger_type") == "QueryTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "pending_rewards":
            logger.info("Processing pending channel point rewards...")
            # Your reward processing logic here

# Monitor for pending rewards every minute
rewards_trigger = QueryTrigger(
    query="SELECT COUNT(*) FROM pending_rewards WHERE processed = 0",
    condition=lambda results: results[0][0] > 0,  # Trigger if unprocessed rewards exist
    check_interval=60,                            # Check every minute
    name="pending_rewards"
)
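The `condition` callable in the example indexes `results[0][0]`, which suggests it receives the fetched rows as a sequence of tuples. The sketch below reproduces that contract with the standard-library `sqlite3` module so a condition can be tried in isolation. `check_query` is a hypothetical stand-in for whatever `QueryTrigger` does internally, and the table definition is invented for the demonstration.

```python
import sqlite3
from typing import Callable, Sequence

Rows = Sequence[tuple]


def check_query(conn: sqlite3.Connection, query: str,
                condition: Callable[[Rows], bool]) -> bool:
    """Run `query` and report whether `condition` holds for the fetched rows."""
    rows = conn.execute(query).fetchall()
    return bool(condition(rows))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pending_rewards (id INTEGER PRIMARY KEY, processed INTEGER)")
query = "SELECT COUNT(*) FROM pending_rewards WHERE processed = 0"
has_pending = lambda results: results[0][0] > 0

empty_result = check_query(conn, query, has_pending)    # table is empty
conn.execute("INSERT INTO pending_rewards (processed) VALUES (0)")
pending_result = check_query(conn, query, has_pending)  # one unprocessed reward
```

`COUNT(*)` always returns exactly one row, so `results[0][0]` is safe here. For queries that can return zero rows, a condition based on `len(results)` avoids an `IndexError`.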
Advanced Query Conditions¶
# Complex condition checking
cleanup_trigger = QueryTrigger(
    query="""
        SELECT user_id, last_seen
        FROM users
        WHERE last_seen < datetime('now', '-30 days')
        AND status = 'active'
    """,
    condition=lambda results: len(results) >= 10,  # Cleanup when 10+ inactive users
    check_interval=3600,                           # Check hourly
    name="user_cleanup"
)

# Multiple table monitoring
analytics_trigger = QueryTrigger(
    query="""
        SELECT
            COUNT(*) as new_followers,
            AVG(chat_activity) as avg_activity
        FROM daily_stats
        WHERE date = date('now')
    """,
    condition=lambda results: (
        results[0][0] > 50 and   # More than 50 new followers
        results[0][1] > 100      # High chat activity
    ),
    check_interval=300,          # Check every 5 minutes
    name="growth_milestone"
)
Threshold Triggers¶
React to metrics crossing thresholds with built-in hysteresis to prevent spam.
Viewer Count Monitoring¶
from starstreamer.triggers import ThresholdTrigger

@on_event("active_trigger.execute")
async def viewer_milestone_celebration(event, twitch, logger):
    """Celebrate viewer milestones"""
    if event.data.get("trigger_type") == "ThresholdTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "viewer_milestones":
            threshold = event.data.get("threshold")
            current_value = event.data.get("metric_value")
            await twitch.send_message(f"🎉 Amazing! We hit {current_value} viewers! Thank you all!")
            logger.info(f"Viewer milestone reached: {threshold} (current: {current_value})")

def get_current_viewers():
    """Get current viewer count from your API"""
    # Your implementation here
    return current_viewer_count

# Celebrate milestones as viewers increase
viewer_trigger = ThresholdTrigger(
    metric_func=get_current_viewers,
    thresholds=[25, 50, 100, 250, 500, 1000],  # Celebration points
    direction="rising",                        # Only celebrate increases
    hysteresis=0.05,                           # 5% dead zone prevents spam
    check_interval=30,                         # Check every 30 seconds
    name="viewer_milestones"
)
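To show why a dead zone prevents repeated firing, the following sketch implements one plausible reading of rising-direction hysteresis: a threshold fires when the value reaches it, then stays disarmed until the value falls below `threshold * (1 - hysteresis)`. The class name and the exact re-arm rule are assumptions for illustration; the real `ThresholdTrigger` may define the dead zone differently.

```python
class RisingThresholds:
    """Hypothetical rising-threshold detector with a fractional dead zone."""

    def __init__(self, thresholds: list[float], hysteresis: float) -> None:
        self.thresholds = sorted(thresholds)
        self.hysteresis = hysteresis
        # Every threshold starts armed and may fire on the first crossing.
        self.armed = {t: True for t in self.thresholds}

    def update(self, value: float) -> list[float]:
        """Feed one metric reading; return the thresholds that fire on it."""
        fired = []
        for t in self.thresholds:
            if self.armed[t] and value >= t:
                self.armed[t] = False
                fired.append(t)
            elif not self.armed[t] and value < t * (1 - self.hysteresis):
                # Value dropped out of the dead zone: allow a future re-fire.
                self.armed[t] = True
        return fired


detector = RisingThresholds([25, 50], hysteresis=0.05)
history = [detector.update(v) for v in [24, 25, 24, 25, 23, 26, 60]]
```

In this run the viewer count hovering between 24 and 25 fires the 25 milestone only once. It fires again only after the count has dropped below 23.75 and risen back.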
Performance Monitoring¶
@on_event("active_trigger.execute")
async def performance_alert(event, logger):
    """Alert on high CPU usage"""
    if event.data.get("trigger_type") == "ThresholdTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "cpu_monitor":
            cpu_percent = event.data.get("metric_value")
            logger.warning(f"🚨 High CPU usage detected: {cpu_percent:.1f}%")

def get_cpu_usage():
    """Get current CPU usage percentage"""
    import psutil
    return psutil.cpu_percent(interval=1)

# Monitor CPU usage with hysteresis
cpu_trigger = ThresholdTrigger(
    metric_func=get_cpu_usage,
    thresholds=[80.0, 90.0, 95.0],  # Warning levels
    direction="rising",             # Alert on high usage
    hysteresis=0.1,                 # 10% hysteresis
    check_interval=10,              # Check every 10 seconds
    name="cpu_monitor"
)
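Note that `psutil.cpu_percent(interval=1)` blocks the calling thread for the whole sampling interval. This page does not say whether `ThresholdTrigger` calls `metric_func` on the event loop or offloads it. If you sample such a metric yourself inside async code, one option is `asyncio.to_thread` (Python 3.9+), sketched below with a stand-in sampler. `blocking_metric`, `sample_metric`, and `demo` are illustrative names only.

```python
import asyncio
import time


def blocking_metric() -> float:
    """Stand-in for a blocking sampler such as psutil.cpu_percent(interval=1)."""
    time.sleep(0.05)
    return 42.0


async def sample_metric() -> float:
    # Run the blocking call in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(blocking_metric)


async def demo() -> tuple[float, int]:
    """Sample the metric while a second task keeps running on the loop."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    beat = asyncio.create_task(heartbeat())
    value = await sample_metric()
    beat.cancel()
    return value, ticks
```

The heartbeat task keeps ticking while the sampler sleeps, which would not happen if `blocking_metric()` were called directly from a coroutine.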
Bidirectional Monitoring¶
# Monitor both rising and falling values
follower_trigger = ThresholdTrigger(
    metric_func=get_follower_count,
    thresholds=[100, 500, 1000, 5000],
    direction="both",    # Trigger on both increases and decreases
    hysteresis=0.02,     # 2% hysteresis
    check_interval=60,   # Check every minute
    name="follower_tracker"
)
Condition Triggers¶
Execute handlers when boolean conditions become true.
Time-Based Conditions¶
from starstreamer.triggers import ConditionTrigger
from datetime import datetime

@on_event("active_trigger.execute")
async def stream_schedule_reminder(event, twitch):
    """Remind about stream schedule during prime time"""
    if event.data.get("trigger_type") == "ConditionTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "prime_time":
            await twitch.send_message("🕒 Prime time! Stream starts in 30 minutes!")

# Trigger during prime viewing hours
prime_time_trigger = ConditionTrigger(
    condition=lambda: datetime.now().hour in [19, 20, 21],  # 7:00 PM through 9:59 PM
    check_interval=300,  # Check every 5 minutes
    once=True,           # Only fire once when condition becomes true
    name="prime_time"
)
Stream State Conditions¶
@on_event("active_trigger.execute")
async def low_activity_alert(event, twitch, logger):
    """Alert when chat activity is low during stream"""
    if event.data.get("trigger_type") == "ConditionTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "low_activity":
            await twitch.send_message("Chat seems quiet! How's everyone doing? 💬")
            logger.info("Low activity alert triggered")

def is_chat_inactive():
    """Check if chat has been inactive"""
    # Your logic to check recent chat activity
    last_message_time = get_last_chat_message_time()
    # total_seconds() covers the whole gap; .seconds would wrap around after 24 hours
    return (datetime.now() - last_message_time).total_seconds() > 300  # 5 minutes

# Monitor for chat inactivity during streaming
activity_trigger = ConditionTrigger(
    condition=lambda: is_streaming() and is_chat_inactive(),
    check_interval=60,  # Check every minute
    once=False,         # Can trigger multiple times
    name="low_activity"
)
Edge Detection¶
# Trigger only on rising edge (false -> true transition)
stream_start_trigger = ConditionTrigger(
    condition=lambda: is_live_on_twitch(),
    check_interval=30,  # Check every 30 seconds
    once=True,          # Only on state change
    name="stream_start"
)

# Continuous execution while condition is true
maintenance_trigger = ConditionTrigger(
    condition=lambda: is_maintenance_mode(),
    check_interval=10,  # Check every 10 seconds
    once=False,         # Keep executing while true
    name="maintenance_mode"
)
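The difference between the two modes can be sketched with a small state machine. This assumes `once=True` means "fire on each false-to-true transition" and `once=False` means "fire on every check while the condition holds", as the comments above suggest. `EdgeDetector` is a hypothetical name, not a StarStreamer class.

```python
class EdgeDetector:
    """Decides whether a condition check should fire (illustrative only)."""

    def __init__(self, once: bool) -> None:
        self.once = once
        self.previous = False  # condition is assumed false before the first check

    def should_fire(self, current: bool) -> bool:
        was_true = self.previous
        self.previous = current
        if self.once:
            # Rising edge: true now, false on the previous check.
            return current and not was_true
        # Level mode: fire on every check while the condition is true.
        return current


samples = [False, True, True, False, True]
rising = EdgeDetector(once=True)
level = EdgeDetector(once=False)
rising_fires = [rising.should_fire(s) for s in samples]
level_fires = [level.should_fire(s) for s in samples]
```

For the sample sequence, the rising-edge detector fires on the second and fifth checks, while the level detector fires on every check where the condition is true.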
Combined State Triggers¶
Combine timer intervals with state conditions for efficient monitoring.
Smart Monitoring¶
from starstreamer.triggers import CombinedStateTrigger, TimerTrigger

@on_event("active_trigger.execute")
async def smart_stream_monitor(event, logger):
    """Only monitor during streaming with viewers"""
    if event.data.get("trigger_type") == "CombinedStateTrigger":
        trigger = event.data.get("trigger")
        if trigger and trigger.name == "smart_monitor":
            # Perform expensive monitoring only when needed
            metrics = gather_detailed_metrics()
            logger.info(f"Stream metrics: {metrics}")

# Combine timer with condition - only check expensive metrics during active streaming
smart_trigger = CombinedStateTrigger(
    timer_trigger=TimerTrigger(interval=120),  # Every 2 minutes
    condition=lambda: is_streaming() and get_viewer_count() > 5,  # Only when active
    name="smart_monitor"
)
Resource-Efficient Monitoring¶
# Check database health, but only during business hours
db_health_trigger = CombinedStateTrigger(
    timer_trigger=TimerTrigger(interval=300),  # Every 5 minutes
    condition=lambda: 9 <= datetime.now().hour <= 17,  # 09:00 through 17:59
    name="db_health_check"
)

# Monitor expensive metrics only when streaming
performance_trigger = CombinedStateTrigger(
    timer_trigger=TimerTrigger(interval=60),  # Every minute
    condition=lambda: (
        is_streaming() and
        get_viewer_count() > 10 and
        datetime.now().hour >= 18  # Evening streams only
    ),
    name="evening_performance"
)
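Time-window conditions like the ones above are easier to test when the clock is injectable. The sketch below builds such a condition and shows the gating step a combined trigger presumably performs on each timer tick. `within_hours` and `on_tick` are hypothetical helpers, and the half-open window (`start <= hour < end`) is a deliberate choice: an expression such as `9 <= hour <= 17` also matches 17:00 through 17:59.

```python
from datetime import datetime
from typing import Callable


def within_hours(start_hour: int, end_hour: int,
                 clock: Callable[[], datetime] = datetime.now) -> Callable[[], bool]:
    """Build a condition that is true while start_hour <= current hour < end_hour."""
    def condition() -> bool:
        return start_hour <= clock().hour < end_hour
    return condition


def on_tick(condition: Callable[[], bool], handler: Callable[[], None]) -> bool:
    """One timer tick of a combined trigger: run the handler only if the gate is open."""
    if not condition():
        return False
    handler()
    return True


runs = []
morning = within_hours(9, 17, clock=lambda: datetime(2024, 1, 1, 10, 0))
evening = within_hours(9, 17, clock=lambda: datetime(2024, 1, 1, 17, 30))
ran_morning = on_tick(morning, lambda: runs.append("health check"))
ran_evening = on_tick(evening, lambda: runs.append("health check"))
```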
Module Integration¶
Complete Module Example¶
# modules/monitoring/module.py
from modules.base import BaseModule
from starstreamer.core.active_triggers import ActiveTriggerManager

class MonitoringModule(BaseModule):
    @property
    def module_name(self) -> str:
        return "monitoring"

    async def register_actions(self) -> None:
        # Import handlers to register them
        from modules.monitoring.actions import monitors  # noqa: F401

    async def on_enable(self) -> None:
        """Set up active triggers when module is enabled"""
        # Get the active trigger manager from DI
        active_trigger_manager = self.container.get(ActiveTriggerManager)
        # Register all monitoring triggers
        await self._setup_monitoring_triggers(active_trigger_manager)

    async def _setup_monitoring_triggers(self, manager: ActiveTriggerManager):
        """Register all monitoring triggers"""
        # Import here as well: the import in register_actions is local to that method
        from modules.monitoring.actions import monitors
        from starstreamer.triggers import TimerTrigger, ThresholdTrigger

        # System health check every 5 minutes
        health_timer = TimerTrigger(interval=300, name="health_check")
        await manager.register_trigger(health_timer, monitors.system_health_check)

        # Viewer milestone celebrations
        viewer_trigger = ThresholdTrigger(
            metric_func=monitors.get_viewer_count,
            thresholds=[50, 100, 500, 1000],
            direction="rising",
            hysteresis=0.05,
            check_interval=30,
            name="viewer_milestones"
        )
        await manager.register_trigger(viewer_trigger, monitors.celebrate_milestones)
Handler Organization¶
# modules/monitoring/actions/monitors.py
from starstreamer import on_event

@on_event("active_trigger.execute")
async def system_health_check(event, logger):
    """Monitor system health"""
    if _is_trigger(event, "TimerTrigger", "health_check"):
        # Health check logic
        logger.info("💓 System health check completed")

@on_event("active_trigger.execute")
async def celebrate_milestones(event, twitch, logger):
    """Celebrate viewer milestones"""
    if _is_trigger(event, "ThresholdTrigger", "viewer_milestones"):
        threshold = event.data.get("threshold")
        current = event.data.get("metric_value")
        await twitch.send_message(f"🎉 {current} viewers! Thank you!")

def _is_trigger(event, trigger_type: str, trigger_name: str) -> bool:
    """Helper to check trigger type and name"""
    if event.data.get("trigger_type") != trigger_type:
        return False
    trigger = event.data.get("trigger")
    # Explicit None check so the function always returns a bool
    return trigger is not None and trigger.name == trigger_name

def get_viewer_count() -> int:
    """Get current viewer count"""
    # Your implementation
    return current_viewers
Best Practices¶
Error Handling¶
@on_event("active_trigger.execute")
async def robust_handler(event, logger):
    """Handler with proper error handling"""
    try:
        if _is_trigger(event, "TimerTrigger", "my_timer"):
            # Your logic here
            result = await perform_operation()
            logger.info(f"Operation completed: {result}")
    except Exception as e:
        logger.error(f"Error in timer handler: {e}", exc_info=True)
        # Don't re-raise - let the trigger continue running
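The same policy can be factored into a small wrapper so each handler body stays free of `try`/`except`. This is a sketch only: `call_safely` is a hypothetical helper, and it is not known whether wrapping handlers this way interacts well with StarStreamer's dependency injection, so the example calls plain coroutines directly.

```python
import asyncio
import logging

logger = logging.getLogger("active_trigger_demo")


async def call_safely(handler, *args) -> bool:
    """Await `handler(*args)`; log and swallow ordinary exceptions.

    Returns True on success and False if the handler raised.
    """
    try:
        await handler(*args)
        return True
    except Exception:
        # asyncio.CancelledError derives from BaseException (Python 3.8+),
        # so task cancellation is not swallowed here and shutdown still works.
        logger.exception("Active trigger handler %s failed", handler.__name__)
        return False


async def ok_handler() -> None:
    pass


async def failing_handler() -> None:
    raise RuntimeError("simulated failure")
```

`logger.exception` records the full traceback, which is equivalent to `logger.error(..., exc_info=True)` in the handler above.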
Performance Considerations¶
from datetime import datetime

# Good: Fast condition check
def is_prime_time() -> bool:
    """Quick time check"""
    return 19 <= datetime.now().hour <= 22

# Avoid: Expensive operations in conditions
def expensive_condition() -> bool:
    """Don't do this - too expensive for frequent checking"""
    result = database.execute("SELECT COUNT(*) FROM complex_query")
    return result[0][0] > threshold
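When an expensive check cannot be avoided, one option is to cache its result for a short time so that frequent polling reuses the last answer. The sketch below wraps any zero-argument condition with a time-to-live cache. `cached_condition` is a hypothetical helper, and the `clock` parameter exists only to make the behaviour testable.

```python
import time
from typing import Callable


def cached_condition(func: Callable[[], bool], ttl: float,
                     clock: Callable[[], float] = time.monotonic) -> Callable[[], bool]:
    """Wrap `func` so it is re-evaluated at most once every `ttl` seconds."""
    cached_value = False
    expires_at = float("-inf")  # forces evaluation on the first call

    def wrapper() -> bool:
        nonlocal cached_value, expires_at
        now = clock()
        if now >= expires_at:
            cached_value = bool(func())
            expires_at = now + ttl
        return cached_value

    return wrapper


# Demonstration with a fake clock and a call counter
fake_now = [0.0]
evaluations = []


def expensive() -> bool:
    evaluations.append(fake_now[0])
    return True


condition = cached_condition(expensive, ttl=60, clock=lambda: fake_now[0])
condition()
condition()            # served from cache, no second evaluation
fake_now[0] = 61.0
condition()            # cache expired, evaluated again
```

The trade-off is staleness: the trigger may react up to `ttl` seconds late, so the cache lifetime should be chosen relative to the check interval.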
Resource Management¶
# Use appropriate check intervals
TimerTrigger(interval=30)    # 30s - for real-time monitoring
TimerTrigger(interval=300)   # 5min - for regular checks
TimerTrigger(interval=3600)  # 1hr - for background maintenance

# Use hysteresis for noisy metrics
ThresholdTrigger(
    metric_func=get_noisy_metric,
    thresholds=[100],
    hysteresis=0.1,  # 10% dead zone prevents flapping
    check_interval=60
)
Debugging¶
# Enable debug logging for active triggers
import logging
logging.getLogger('starstreamer.triggers.active').setLevel(logging.DEBUG)

# Add detailed logging to handlers
@on_event("active_trigger.execute")
async def debug_handler(event, logger):
    """Handler with debug information"""
    trigger_type = event.data.get("trigger_type")
    trigger = event.data.get("trigger")
    trigger_name = trigger.name if trigger else "unknown"
    logger.debug(f"Active trigger fired: {trigger_type}:{trigger_name}")
    logger.debug(f"Event data: {event.data}")
    # Your handler logic here
Migration from Event-Based¶
If you have existing event-based handlers that you want to convert to active triggers:
Before (Event-Based)¶
# This only runs when chat messages occur
@on_event("twitch.chat.message")
@trigger(CommandTrigger("!uptime"))
async def uptime_command(event, twitch):
    uptime = get_stream_uptime()
    await twitch.send_message(f"Stream uptime: {uptime}")
After (Active Trigger)¶
# This runs automatically every 30 minutes
@on_event("active_trigger.execute")
async def uptime_announcement(event, twitch):
    if _is_trigger(event, "TimerTrigger", "uptime_announce"):
        uptime = get_stream_uptime()
        await twitch.send_message(f"📺 Stream uptime: {uptime}")

# Create the timer, then register it with ActiveTriggerManager.register_trigger()
uptime_timer = TimerTrigger(interval=1800, name="uptime_announce")  # 30 minutes
Troubleshooting¶
Common Issues¶
Triggers not firing:
- Check that the ActiveTriggerManager is properly initialized in main.py
- Verify trigger registration in module's on_enable method
- Check logs for registration errors
High CPU usage:
- Reduce check frequency for expensive conditions
- Use appropriate intervals (not too frequent)
- Optimize metric functions
Triggers firing too often:
- Add hysteresis to ThresholdTrigger
- Use once=True for ConditionTrigger
- Check condition logic for unexpected behavior
Memory leaks:
- Ensure proper trigger cleanup in module's on_disable
- Don't store large objects in trigger callbacks
- Use weak references for external resources
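For the cleanup point, the general asyncio pattern is to keep a reference to every background task and cancel them together on shutdown. The sketch below shows that pattern in isolation. `TriggerTasks` is a hypothetical class, and how `ActiveTriggerManager` actually tracks or unregisters its tasks is not described on this page.

```python
import asyncio
from typing import Coroutine


class TriggerTasks:
    """Tracks background tasks so they can be cancelled together (illustrative)."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def start(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference automatically once the task finishes.
        task.add_done_callback(self._tasks.discard)
        return task

    async def stop_all(self) -> int:
        """Cancel every tracked task and wait for them; return how many were stopped."""
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        # return_exceptions=True keeps CancelledError from propagating to the caller.
        await asyncio.gather(*tasks, return_exceptions=True)
        return len(tasks)

    def __len__(self) -> int:
        return len(self._tasks)
```

A module's `on_disable` could call something like `await tasks.stop_all()` so no trigger loop outlives the module that created it.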
Debug Commands¶
# Check active trigger status
active_trigger_manager = container.get(ActiveTriggerManager)
stats = active_trigger_manager.get_stats()
logger.info(f"Active triggers: {stats}")

# List registered triggers
triggers = active_trigger_manager.list_triggers()
for trigger_id, trigger_info in triggers.items():
    logger.info(f"Trigger {trigger_id}: {trigger_info}")
Next Steps¶
- Explore Triggers & Filters for the complete trigger system
- Learn about Dependency Injection for advanced handler patterns
- Check out Module Development for building custom modules with active triggers