class AdvertiserFeatureStore: def __init__(self): self.redis_client = Redis() self.feature_cache_ttl = 300 # 5 phút
def get_churn_features(self, advertiser_id): features = {} recent_activity = self.get_recent_activity(advertiser_id) features['days_since_last_campaign'] = recent_activity.days_since_last features['spend_trend_7d'] = recent_activity.spend_trend historical_key = f"historical_features:{advertiser_id}" cached_historical = self.redis_client.get(historical_key) if cached_historical: features.update(json.loads(cached_historical)) else: historical = self.compute_historical_features(advertiser_id) self.redis_client.setex(historical_key, 3600, json.dumps(historical)) features.update(historical) return features
def get_advertiser_insights(advertiser_id): cache_key = f"insights:{advertiser_id}" try: cached_data = redis.get(cache_key) if cached_data: insights = json.loads(cached_data) if insights.get('includes_safety_data'): fresh_safety_data = get_fresh_safety_metrics(advertiser_id) insights.update(fresh_safety_data) return insights except RedisConnectionError: logger.warning("Cache unavailable, falling back to database")
insights_data = generate_advertiser_insights(advertiser_id) try: redis.setex(cache_key, 300, json.dumps(insights_data)) except RedisConnectionError: pass return insights_data
def warm_popular_content(): """Run this job every hour to pre-populate cache""" popular_items = get_trending_content() for item in popular_items: get_content_with_cache(item.id)
Queue | Mức ưu tiên | Loại thông báo |
---|---|---|
P0 | Cao nhất | Cảnh báo khẩn cấp (cháy, y tế, an ninh) |
P1 | Trung bình | Thông báo quan trọng (pin yếu, thiết bị offline) |
P2 | Thấp nhất | Thông báo thông tin (thời tiết, nhắc nhở) |
def route_notification(notification): priority = determine_priority(notification) if notification.type == 'emergency': emergency_queue.send_message(notification, priority=0) send_immediate_push(notification) elif notification.type == 'device_alert': device_queue.send_message(notification, priority=1) else: general_queue.send_message(notification, priority=2)
class NotificationConsumer: def __init__(self): self.emergency_consumer = StreamingConsumer('emergency-events') self.batch_consumer = StreamingConsumer('batch-events')
def consume_emergency_events(self): for message in self.emergency_consumer: try: process_emergency_notification(message.value) except Exception as e: emergency_dlq.send(message.value)
def consume_batch_events(self): batch = [] for message in self.batch_consumer: batch.append(message.value) if len(batch) >= 100: process_notification_batch(batch) batch = []
def process_image(job_data): try: result = image_processor.process(job_data['image_url']) return result except Exception as e: logger.error(f"Image processing failed: {e}", extra=job_data) dead_letter_queue.put({ 'original_job': job_data, 'error': str(e), 'timestamp': time.time(), 'retry_count': job_data.get('retry_count', 0) }) raise
class AdvertiserExperienceTracker: def __init__(self): self.metrics = defaultdict(list)
def track_recommendation_request(self, advertiser_id, model_type, latency, confidence, success): timestamp = time.time() self.metrics[f"recommendation_journey_{model_type}"].append({ 'latency': latency, 'confidence': confidence, 'success': success, 'timestamp': timestamp }) if confidence < 0.8 and model_type == 'churn_prediction': self.alert(f"Churn prediction confidence dropped to {confidence}")
recent_requests = [ req for req in self.metrics[f"recommendation_journey_{model_type}"] if timestamp - req['timestamp'] < 300 ] if len(recent_requests) > 10: p95_latency = np.percentile([r['latency'] for r in recent_requests], 95) if p95_latency > 2000: self.alert(f"High P95 latency for {model_type}: {p95_latency}ms")
from jaeger_client import Config
def create_tracer(): config = Config( config={ 'sampler': {'type': 'const', 'param': 1}, 'logging': True, }, service_name='user-service', ) return config.initialize_tracer()
@traced_functiondef process_user_request(user_id): with tracer.start_span('database_lookup') as span: user = get_user(user_id) span.set_tag('user_tier', user.tier) with tracer.start_span('permission_check'): permissions = check_permissions(user) return build_response(user, permissions)
pybreaker
với cấu hình circuit breaker.from pybreaker import CircuitBreaker
emergency_contact_breaker = CircuitBreaker( fail_max=3, reset_timeout=30, exclude=[ValueError])
analytics_breaker = CircuitBreaker( fail_max=10, reset_timeout=300, exclude=[ValueError])
@emergency_contact_breakerdef validate_emergency_contact(contact_info): return contact_service.validate(contact_info)
def handle_emergency_call(user_id, emergency_type): try: contact_validation = validate_emergency_contact(user_contacts) return initiate_emergency_call(user_id, emergency_type, contact_validation) except CircuitBreakerError: logger.warning("Contact validation unavailable, proceed anyway") return initiate_emergency_call(user_id, emergency_type, fallback_contacts=True)