发布于 2026-01-06 0 阅读
0

利用 12 项以上的 Redis 功能构建实时协作工作流

利用 12 项以上的 Redis 功能构建实时协作工作流

redisflow 主页
这是Redis AI 挑战赛:超越缓存的参赛作品

我建造的

我构建了一个名为RedisFlow的实时协作工作流自动化平台,它将 Redis 从一个简单的缓存转变为一个完整的应用程序骨干。你可以把它想象成 Zapier 和 Figma 的结合体,多个用户可以实时地共同设计、执行和监控自动化工作流。

RedisFlow 的特别之处在于?它利用了12 项以上的 Redis 功能,创建了一个可用于生产的平台,将 Redis 展示为多模型数据库、实时引擎和分布式系统基础——同时实现了低于 10 毫秒的协作延迟,并支持每个工作流 100 多个并发用户。

主要特点:

  • 可视化工作流构建器:拖放式界面,包含 7 种节点类型
  • 实时协作:多个用户同时使用实时光标进行编辑
  • 即时执行:实时查看工作流程运行状态更新
  • 实时监控:实时流式传输执行日志和指标
  • 分布式处理:利用 Redis 支持的作业队列实现水平扩展

演示

我如何使用 Redis 8

RedisFlow 将 Redis 的功能远远拓展到缓存之外,使其成为构建现代实时应用程序的完整平台。以下是我如何利用 Redis 的各项功能:

1. Redis JSON - 工作流的主要数据库

// Store complex workflow definitions as JSON documents
await redisClient.json.set(`workflow:${id}`, '$', {
  id,
  name: 'Customer Data Processor',
  nodes: [...],
  connections: [...],
  metadata: { created: Date.now(), version: 1 }
});
Enter fullscreen mode Exit fullscreen mode

2. Redis 发布/订阅 - 实时协作引擎

// Broadcast workflow changes to all connected users
publisher.publish(`workflow:${workflowId}:updates`, JSON.stringify({
  type: 'node-moved',
  nodeId: node.id,
  position: { x: 100, y: 200 },
  userId: currentUser.id
}));

// Subscribe to receive updates
subscriber.subscribe(`workflow:${workflowId}:updates`);
subscriber.on('message', (channel, message) => {
  // Update UI in real-time
});
Enter fullscreen mode Exit fullscreen mode

3. Redis Streams - 事件溯源与执行日志

// Stream execution events for real-time monitoring
await ioredisClient.xadd(
  `execution:${executionId}:logs`,
  '*',
  'type', 'node_complete',
  'nodeId', node.id,
  'status', 'success',
  'duration', executionTime,
  'result', JSON.stringify(result)
);
Enter fullscreen mode Exit fullscreen mode

4. BullMQ(基于 Redis)——分布式作业处理

// Queue workflow executions with retry logic
await executionQueue.add('execute', {
  workflowId,
  executionId,
  inputs
}, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 }
});
Enter fullscreen mode Exit fullscreen mode

5. Redis 搜索 - 全文工作流搜索

// Create search index for workflows
await redisClient.ft.create('idx:workflows', {
  name: { type: 'TEXT', sortable: true },
  description: 'TEXT',
  tags: 'TAG',
  created: 'NUMERIC'
}, { ON: 'JSON', PREFIX: 'workflow:' });
Enter fullscreen mode Exit fullscreen mode

6. Redis 有序集合 - 指标与排名

// Track workflow popularity and execution metrics
await ioredisClient.zincrby('workflows:by_executions', 1, workflowId);
await ioredisClient.zadd('workflows:by_date', Date.now(), workflowId);
Enter fullscreen mode Exit fullscreen mode

7. Redis 计数器 - 实时分析

// Increment execution counters atomically
await ioredisClient.incr('metrics:executions:total');
await ioredisClient.incr(`metrics:executions:daily:${today}`);
Enter fullscreen mode Exit fullscreen mode

8. Redis TTL - 自动数据清理

// Set TTL for temporary data
await ioredisClient.setex(`session:${sessionId}`, 3600, userData);
await ioredisClient.expire(`cache:${key}`, 300);
Enter fullscreen mode Exit fullscreen mode

9. Redis 事务 - 原子操作

// Ensure consistency with multi-command transactions
const multi = ioredisClient.multi();
multi.json.set(`workflow:${id}`, '$', workflowData);
multi.zadd('workflows:by_date', Date.now(), id);
multi.incr('metrics:workflows:total');
await multi.exec();
Enter fullscreen mode Exit fullscreen mode

10. Redis Sets - 用户在线状态跟踪

// Track active users per workflow
await ioredisClient.sadd(`workflow:${workflowId}:users`, userId);
const activeUsers = await ioredisClient.scard(`workflow:${workflowId}:users`);
Enter fullscreen mode Exit fullscreen mode

11. Redis 列表 - 执行队列管理

// Manage execution order
await ioredisClient.lpush('execution:queue', executionId);
const nextExecution = await ioredisClient.rpop('execution:queue');
Enter fullscreen mode Exit fullscreen mode

12. Redis 哈希 - 执行元数据

// Store execution details
await ioredisClient.hset(`execution:${executionId}`, {
  status: 'running',
  startedAt: Date.now(),
  workflowId,
  userId
});
Enter fullscreen mode Exit fullscreen mode

架构与性能

系统架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Vue.js    │────▶│   Node.js   │────▶│    Redis    │
│  Frontend   │     │   Backend   │     │   Database  │
└─────────────┘     └─────────────┘     └─────────────┘
       │                    │                    │
       └────WebSocket───────┘                    │
           (Socket.io)                           │
                                          ┌──────┴────┐
                                          │  Features │
                                          ├───────────┤
                                          │ • JSON    │
                                          │ • Streams │
                                          │ • Pub/Sub │
                                          │ • Search  │
                                          │ • BullMQ  │
                                          └───────────┘
Enter fullscreen mode Exit fullscreen mode

绩效指标

  • 工作流加载时间:<50毫秒(Redis JSON)
  • 协作延迟:<10毫秒(Redis 发布/订阅)
  • 搜索响应时间:<100毫秒(Redis 搜索)
  • 执行开始:<100毫秒(BullMQ)
  • 并发用户数:每个工作流程 100+
  • 吞吐量:每分钟执行 1000 次以上

技术亮点

生产就绪功能

  • 错误处理:全面的错误处理机制,包括重试机制
  • 监控:内置指标和性能跟踪
  • 可扩展性:支持 Redis 集群的水平扩展
  • 安全性:输入验证、速率限制和用户隔离
  • 持久化:使用 Redis AOF 实现数据持久性

开发者体验

  • 简洁架构:模块化设计,关注点清晰分离
  • 现代技术栈:Vue 3、Node.js、Socket.io 和 Redis
  • 轻松安装:只需一条命令即可完成安装,并提供环境模板
  • 完整文档:README 文件、API 文档和部署指南

未来的可能性

随着 Redis 的不断发展,RedisFlow 未来可能会扩展到包括:

  • 向量搜索:人工智能驱动的工作流程推荐
  • Redis ML:智能工作流优化
  • 时间序列:高级分析和监测
  • :复杂工作流依赖关系管理

RedisFlow 证明 Redis 不仅仅是一个缓存——它是一个功能强大的多模型平台,能够为整个应用程序提供支持。RedisFlow 通过创新地利用 12 项以上的 Redis 特性,创建了一种实时协作体验,而这种体验如果使用传统数据库则很难实现。

该项目突破了 Redis 的功能极限,向全球开发者展示了他们可以使用 Redis 作为主要数据库和实时引擎来构建复杂的实时应用程序。

文章来源:https://dev.to/depapp/building-real-time-collaborative-workflows-with-12-redis-features-a51