defsemantic_search_fixed(self,query:str,limit:int=5)->List[Dict]:"""Fixed semantic search using Redis 8"""try:# Generate query embedding using Groq API + hash fallback
query_embedding=self.generate_simple_embedding(query)query_bytes=np.array(query_embedding,dtype=np.float32).tobytes()# Use Redis 8 vector search with KNN
results=self.redis.execute_command("FT.SEARCH","problem_vectors","*","PARAMS","2","query_vec",query_bytes,"SORTBY","__vector_score","LIMIT","0",str(limit),"RETURN","5","problem_id","title","difficulty","topic","companies")# Track search analytics
self.redis.execute_command("TS.ADD","similarity_searches","*",1)returnself.parse_fixed_results(results)exceptExceptionase:print(f"❌ Semantic search failed: {e}")returnself.fallback_search(query,limit)
2. 用于人工智能分析结果的语义缓存
应用案例:我们实现了一个智能缓存系统,该系统基于代码提交的语义相似性存储人工智能代码分析结果。这减少了对 Groq 的冗余 API 调用,并针对相似的代码模式提供即时反馈,同时保持了分析质量。
项目实现:创建了一个语义缓存层,该缓存层使用问题 ID 和代码内容的 MD5 哈希值生成基于内容的缓存键。系统将全面的 AI 分析结果(代码质量评分、算法效率评级、 通信评估)存储在具有智能 TTL 管理的 Redis JSON 文档中。
它的优势在于:消除了对类似代码提交进行昂贵的重新分析,降低了 60% 的 API 成本,并通过语义相似性匹配为缓存分析提供亚秒级的响应时间,同时保持 AI 反馈的质量。
defcache_ai_analysis(self,problem_id:str,code:str,analysis:Dict):"""Cache AI analysis results with semantic similarity"""try:code_hash=hashlib.md5(f"{problem_id}:{code}".encode()).hexdigest()cache_key=f"ai_analysis:{code_hash}"# Store comprehensive analysis as JSON with metadata
self.redis.execute_command("JSON.SET",cache_key,".",json.dumps({"problem_id":problem_id,"code_hash":code_hash,"analysis":analysis,"timestamp":datetime.now().isoformat(),"cached":True,"scores":{"code_quality":analysis.get("code_quality",0),"algorithm_efficiency":analysis.get("algorithm_efficiency",0),"communication":analysis.get("communication",0),"problem_solving":analysis.get("problem_solving",0)}}))# Set intelligent expiration (1 hour for fresh analysis)
self.redis.expire(cache_key,3600)# Track AI request in time series
self.redis.execute_command("TS.ADD","ai_analysis_requests",int(datetime.now().timestamp()*1000),1)exceptExceptionase:print(f"⚠️ AI analysis caching failed: {e}")
deftrack_user_activity(self,user_id:str,activity:str,metadata:Dict=None):try:timestamp=int(time.time()*1000)# Store detailed activity in JSON with nested metadata
log_data={"user_id":user_id,"activity":activity,"timestamp":datetime.now().isoformat(),"metadata":metadataor{},"session_info":{"platform":"DSA Interview Platform","features_used":["ai_analysis","vector_search","real_time_analytics"],"redis_modules":["JSON","Search","TimeSeries","VectorSet"]},"performance_metrics":{"response_time_ms":metadata.get("response_time",0),"cache_hit":metadata.get("cache_hit",False),"ai_analysis_used":metadata.get("ai_analysis",False)}}# Store as JSON document with complex nested structure
log_key=f"activity:{user_id}:{timestamp}"self.redis.execute_command("JSON.SET",log_key,".",json.dumps(log_data))# Set intelligent expiration (24 hours)
self.redis.expire(log_key,86400)# Update user's recent activity list
ifactivity=="problem_view"andmetadata:problem_id=metadata.get("problem_id")ifproblem_id:self.redis.lpush(f"user:{user_id}:recent",problem_id)self.redis.ltrim(f"user:{user_id}:recent",0,9)# Keep last 10
exceptExceptionase:print(f"⚠️ Activity tracking failed: {e}")
5. 全文搜索引擎
应用案例:我们利用 RediSearch 实现了全面的全文搜索功能,支持高级问题发现,并提供复杂的筛选、高亮显示和排名功能。这使得用户能够获得类似 Google 的搜索体验,搜索范围涵盖问题标题、描述和主题标签。
defsetup_fulltext_search(self):try:# Create advanced full-text search index with weighted fields
self.redis.execute_command("FT.CREATE","problem_search","ON","HASH","PREFIX","1","problem:","SCHEMA","title","TEXT","WEIGHT","2.0",# Higher weight for titles
"description","TEXT","WEIGHT","1.0","topic","TAG",# Exact match for topics
"difficulty","TAG",# Exact match for difficulty
"companies","TAG",# Multi-value company tags
"examples","TEXT","WEIGHT","0.5",# Lower weight for examples
"constraints","TEXT","WEIGHT","0.3")print("✅ Advanced full-text search index created")exceptExceptionase:print(f"⚠️ Full-text search setup: {e}")deffulltext_search(self,query:str,filters:Dict=None)->List[Dict]:"""Advanced full-text search with filtering and highlighting"""try:# Build complex search query with filters
search_query=queryiffilters:iffilters.get('difficulty'):search_query+=f" @difficulty:{filters['difficulty']}"iffilters.get('topic'):search_query+=f" @topic:{filters['topic']}"iffilters.get('companies'):search_query+=f" @companies:{filters['companies']}"# Execute search with highlighting and ranking
results=self.redis.execute_command("FT.SEARCH","problem_search",search_query,"LIMIT","0","10","HIGHLIGHT","FIELDS","2","title","description","SUMMARIZE","FIELDS","1","description","LEN","3","SORTBY","_score","DESC")returnself.parse_search_results(results)exceptExceptionase:print(f"❌ Full-text search failed: {e}")return[]
defsetup_bloom_filters(self):try:# Create bloom filter for tracking processed problems
# 0.01 false positive rate, 1000 expected items
self.redis.execute_command("BF.RESERVE","processed_problems","0.01","1000")# Create bloom filter for active user sessions
# 0.01 false positive rate, 10000 expected sessions
self.redis.execute_command("BF.RESERVE","active_sessions","0.01","10000")# Create bloom filter for AI analysis deduplication
self.redis.execute_command("BF.RESERVE","analyzed_code","0.005","5000")print("✅ Bloom filters created for intelligent duplicate detection")exceptExceptionase:print(f"⚠️ Bloom filter setup: {e}")defcheck_and_add_processed_problem(self,problem_id:str)->bool:"""Check if problem already processed and add if new"""try:# Check if problem already exists in bloom filter
exists=self.redis.execute_command("BF.EXISTS","processed_problems",problem_id)ifnotexists:# Add to bloom filter and process
self.redis.execute_command("BF.ADD","processed_problems",problem_id)returnFalse# Not processed before
returnTrue# Already processed
exceptExceptionase:print(f"⚠️ Bloom filter check failed: {e}")returnFalse# Process anyway on error
defsetup_pubsub_channels(self):"""Setup Redis Pub/Sub for real-time updates"""try:# Initialize pub/sub channels for different event types
self.pubsub=self.redis.pubsub()# Subscribe to analysis completion events
self.pubsub.subscribe('analysis_complete')# Subscribe to new problem notifications
self.pubsub.subscribe('new_problem_added')# Subscribe to user achievement events
self.pubsub.subscribe('user_achievements')print("✅ Pub/Sub channels configured for real-time updates")exceptExceptionase:print(f"⚠️ Pub/Sub setup failed: {e}")defpublish_analysis_complete(self,user_id:str,problem_id:str,analysis:Dict):"""Publish AI analysis completion event"""try:event_data={"event_type":"analysis_complete","user_id":user_id,"problem_id":problem_id,"timestamp":datetime.now().isoformat(),"analysis_summary":{"overall_score":analysis.get("overall_score",0),"recommendation":analysis.get("recommendation",""),"strengths":analysis.get("strengths",[]),"improvements":analysis.get("improvements",[])},"redis_features_used":["AI_Analysis","JSON_Storage","PubSub","TimeSeries"]}# Publish to analysis completion channel
self.redis.publish('analysis_complete',json.dumps(event_data))# Track pub/sub usage in time series
self.redis.execute_command("TS.ADD","pubsub_events",int(datetime.now().timestamp()*1000),1)exceptExceptionase:print(f"⚠️ Pub/Sub publish failed: {e}")