发布于 2026-01-06 2 阅读
0

使用 React、GraphQL 订阅和 Redis PubSub 构建实时仪表板 一个使用 React、GraphQL 订阅和 Redis PubSub 的轻量级实时仪表板 DEV 的全球展示挑战赛 由 Mux 呈现:展示你的项目!

使用 React、GraphQL 订阅和 Redis PubSub 构建实时仪表盘

一个使用 React、GraphQL 订阅和 Redis PubSub 构建的轻量级实时仪表盘

由 Mux 主办的 DEV 全球展示挑战赛:展示你的项目!

本文将介绍如何使用ReactGraphQL 订阅Redis PubSub创建一个简单、可扩展且实时更新的仪表盘。实时仪表盘用于监控基础设施(服务器、网络、服务)、应用程序流量(事务量、用户数)、告警(应用程序健康状况、关键问题通知、宕机时间)等。大多数情况下,仪表盘由一个或多个数据源驱动。

开发者利用一些开源应用程序来创建丰富且实用的仪表盘。例如,Kibana用于可视化与ELK Stack集成的应用程序日志。Grafana提供了一个平台,用于在PrometheusGraphiteOpenTSDB等时间序列数据库之上构建各种可视化图表。但是,截至目前,它们仅支持拉取式模型。也就是说,当用户打开浏览器时,应用程序会查询数据源来渲染仪表盘。与推送式模型相比,拉取式模型是目前应用最广泛的模型

何时可以使用推送模式?

假设您有一个包含20 个面板的仪表盘,需要实时查询来自多个数据源的数据。用户设置的刷新频率为5 秒。如果平均有100 个用户同时打开仪表盘,那么每 5 秒就会产生20 x 100 = 2000 个请求!如果您拥有良好的底层时间序列数据库基础设施,这尚可承受。否则,多个高负载查询会占用大量内存,导致结果检索延迟。这个问题可以通过引入智能缓存方案或使用WebSocket 的简单推送模型来解决。对于多个用户在同一时间或略有不同的时间查询相同数据的情况,这种方法非常有效(且简单)。

以下是推送模式工作原理的简要流程图:

  • 服务器和客户端之间通过 WebSocket 建立连接。
  • 服务器定期向客户端发送所需数据。
  • 如果连接断开,客户端可以重试(甚至可以无限期地重试)。
  • 在任何给定时间点,所有客户端都显示相同的数据。

我们正在建造什么?

这是我们将要构建的简单实时仪表盘的预览。它包含 4 个面板——CPU 利用率、流量信息、数据中心分布和警报。

实时仪表盘预览

GraphQL订阅

GraphQL是一种用于 API 的查询语言,也是一个运行时环境,用于使用现有数据来执行这些查询。如果您不熟悉 GraphQL,请访问graphql.org获取更多信息。

除了查询变更之外,GraphQL 还引入了另一个规范——订阅

正如服务器支持的 mutation 列表描述了客户端可以执行的所有操作一样,服务器支持的 subscription 列表描述了它可以订阅的所有事件。正如客户端可以使用 GraphQL select 语句告诉服务器在执行 mutation 后需要重新获取哪些数据一样,客户端也可以使用 GraphQL select 语句告诉服务器希望通过 subscription 推送哪些数据。—— GraphQL 博客

例如,客户端可以使用以下订阅语法订阅 CPU 数据。

subscription CPU {
  cpu {
    percentage
  }
}
Enter fullscreen mode Exit fullscreen mode

服务器可以定期发布数据,

pubsub.publish(CPU, { cpu: { percentage: 65 } });
Enter fullscreen mode Exit fullscreen mode

Redis 发布订阅

自 2.0 版本起,Redis支持发布/订阅模式,使用PUBLISHSUBSCRIBEUNSUBSCRIBE命令。更多信息请参阅Redis 文档

消息可以通过频道发布。要"hello listeners"通过频道发送消息myradio,请使用PUBLISH以下命令。

PUBLISH myradio "hello listeners"
Enter fullscreen mode Exit fullscreen mode

但是,如果没有人收听,频道就毫无用处!打开另一个标签页redis-cli,订阅该频道myradio

SUBSCRIBE myradio
Enter fullscreen mode Exit fullscreen mode

现在,再次发送发布命令并观察另一个终端。

终端中的 Redis PubSub

结合 GraphQL 订阅和 Redis 发布/订阅

可以使用Apollo的包graphql-subscriptions来实现 GraphQL 订阅规范。

使用 Redis 作为客户端到服务器事件发布中介可以实现横向扩展。graphql -redis-subscriptions包可以作为 PubSubEngine 接口插入到系统中graphql-subscriptions

示例实现

有关完整实现,请参阅github.com/nowke/realtime-dashboard-demo/

GitHub 标志 nowke /实时仪表盘演示

一个基于 React、GraphQL 订阅和 Redis PubSub 的轻量级、可扩展的实时仪表盘

一个使用 React、GraphQL 订阅和 Redis PubSub 构建的轻量级实时仪表盘

操作指南:https://dev.to/nowke/building-real-time-dashboard-using-react-graphql-subscriptions-and-redis-pubsub-2gip

预览

预览

设置

先决条件

  • 安装NodeJS (LTS)
  • 安装Redis(服务器端和客户端)
    • MacOS -brew install redis
  • 安装Yarn
    • MacOS -brew install yarn

克隆存储库

git clone https://github.com/nowke/realtime-dashboard-demo.git

(a)设置服务器

启动 Redis 服务器

redis-server

安装依赖项

cd server
yarn install

启动服务器

yarn start

服务器将在http://localhost:4000/运行。将显示以下用户界面。

GraphQL UI

(b)设置工作进程

安装依赖项

cd worker
yarn install

启动工人

yarn start

将会打印以下日志:

Starting worker
Scheduled Jobs for CPU, Traffic, distribution, messages
Fetched new results for MESSAGES
Fetched new results for CPU
Fetched new results for DISTRIBUTION
Fetched new results for CPU
Fetched new results for MESSAGES
Fetched new results for TRAFFIC
...

(c)设置客户端

安装依赖项

cd client
yarn install

启动客户端

yarn start

客户端将运行……

示例代码由 3 个部分组成,

  • 服务器
  • 客户端(用户浏览器)连接到服务器
  • 工作进程 - 通过向服务器发布事件来模拟真实事件

服务器

安装所需软件包

yarn add graphql apollo-server graphql-redis-subscriptions graphql-subscriptions ioredis moment
Enter fullscreen mode Exit fullscreen mode

请确保在系统redis-server中运行并设置 PubSub 。它用于发布消息。localhostPORT 6379graphql-redis-subscriptions

server/pubsub.js

const { RedisPubSub } = require("graphql-redis-subscriptions");

const pubsub = new RedisPubSub();
module.exports = pubsub;
Enter fullscreen mode Exit fullscreen mode

定义 GraphQL schema。

  • 查询- 用于从 Redis 获取初始结果。
  • 变异- 用于发布新消息。
  • 订阅- 用于客户端和服务器之间的实时数据交换。
const { gql } = require("apollo-server");

const schema = gql`
  type Dps {
    timestamp: Int!
    value: Float!
  }

  type Traffic {
    total: Int!
    dps: [Dps]
  }

  type CPU {
    percentage: Float!
  }

  type Distribution {
    region: String!
    percentage: Float!
  }

  type Message {
    title: String!
    description: String!
    color: String!
  }

  type Query {
    cpu: CPU
    traffic: Traffic
    distribution: [Distribution]
    messages: [Message]
  }

  type Mutation {
    cpu: CPU
    traffic: Traffic
    distribution: [Distribution]
    messages: [Message]
  }

  type Subscription {
    cpu: CPU
    traffic: Traffic
    distribution: [Distribution]
    messages: [Message]
  }
`;

module.exports = schema;
Enter fullscreen mode Exit fullscreen mode

我们提供了辅助函数来生成所有 4 个面板的虚拟数据——请参考server/utils/generator.js。使用这些数据生成器,编写一个包装函数publishRandomData

const pubsub = require("./pubsub");
const { set } = require("./utils/redis");

const COMPONENTS = {
  CPU: "cpu",
  TRAFFIC: "traffic",
  DISTRIBUTION: "distribution",
  MESSAGES: "messages"
};

const publishRandomData = async (generator, component) => {
  const data = generator();
  pubsub.publish(component, { [component]: data });
  await set(component, data);
  return data;
};
Enter fullscreen mode Exit fullscreen mode

publishRandomData可以通过以下方式调用该函数来获取 CPU 使用率。

const getCPU = () => 50;
await publishRandomData(getCPU, "CPU")
Enter fullscreen mode Exit fullscreen mode

为之前定义的模式定义解析器函数(下面给出 CPU 的示例)。

server/resolvers.js

const { get } = require("./utils/redis");

module.exports = {
  Query: {
    cpu: () => get(COMPONENTS.CPU)
  },
  Mutation: {
    cpu: () => publishRandomData(cpuData, COMPONENTS.CPU),
  },
  Subscription: {
    cpu: {
      subscribe: () => pubsub.asyncIterator(COMPONENTS.CPU)
    },
  }
}
Enter fullscreen mode Exit fullscreen mode

启动服务器

server/index.js

const { ApolloServer } = require("apollo-server");

const typeDefs = require("./schema");
const resolvers = require("./resolvers");

// Server
const server = new ApolloServer({ typeDefs, resolvers });

server.listen().then(({ url }) => {
  console.log(`🚀  Server ready at ${url}`);
});
Enter fullscreen mode Exit fullscreen mode
$ yarn start
yarn run v1.13.0
$ nodemon index.js
...
🚀  Server ready at http://localhost:4000/
Enter fullscreen mode Exit fullscreen mode

访问localhost:4000打开 GraphQL playground。

订阅 CPU 百分比输入Tab 1并点击播放按钮

subscription {
  cpu {
    percentage
  }
}
Enter fullscreen mode Exit fullscreen mode

运行针对 CPU 的 mutationTab 2来发布一个随机百分比值。该值将作为事件被接收Tab 1。多次尝试该 mutation 以获取不同的值。

mutation {
  cpu {
    percentage
  }
}
Enter fullscreen mode Exit fullscreen mode

运行 CPU 查询Tab 3。返回的是最后发布的值——这是因为最近的值缓存在 Redis 中。

query {
  cpu {
    percentage
  }
}
Enter fullscreen mode Exit fullscreen mode
{
  "data": {
    "cpu": {
      "percentage": 25
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

客户

创建一个新的 React 应用程序,用于create-react-app客户端

yarn create react-app client
Enter fullscreen mode Exit fullscreen mode

安装所需的依赖项。

yarn add apollo-boost apollo-client apollo-link-ws graphql react-apollo subscriptions-transport-ws
Enter fullscreen mode Exit fullscreen mode

设置 Apollo HTTP 客户端和 WebSocket 客户端,因为两种连接类型都需要。HTTP 服务器将运行在 [此处应填写服务器地址] http://localhost:4000,WebSocket 订阅服务器将运行在 [此处应填写服务器地址] ws://localhost:4000/graphql

client/src/App.js

import React, { Component } from "react";
import { ApolloClient } from "apollo-client";
import { InMemoryCache } from "apollo-cache-inmemory";
import { ApolloProvider } from "react-apollo";
import { split } from "apollo-link";
import { HttpLink } from "apollo-link-http";
import { WebSocketLink } from "apollo-link-ws";
import { getMainDefinition } from "apollo-utilities";

import './App.css'
import Home from "./Pages/Home";

// Create an http link:
const httpLink = new HttpLink({
  uri: "http://localhost:4000"
});

// Create a WebSocket link:
const wsLink = new WebSocketLink({
  uri: `ws://localhost:4000/graphql`,
  options: {
    reconnect: true
  }
});

// using the ability to split links, you can send data to each link
// depending on what kind of operation is being sent
const link = split(
  // split based on operation type
  ({ query }) => {
    const { kind, operation } = getMainDefinition(query);
    return kind === "OperationDefinition" && operation === "subscription";
  },
  wsLink,
  httpLink
);

const client = new ApolloClient({
  link,
  cache: new InMemoryCache()
});

class App extends Component {
  render() {
    return (
      <ApolloProvider client={client}>
        <Home />
      </ApolloProvider>
    );
  }
}

export default App;
Enter fullscreen mode Exit fullscreen mode

Home组件被封装起来ApolloProvider,从而可以运行查询和订阅。

让我们来设计 CPU 使用率组件 - CpuUsage.js

定义查询和订阅

import gql from "graphql-tag";

const QUERY = gql`
  query CPU {
    cpu {
      percentage
    }
  }
`;

const SUBSCRIPTION = gql`
  subscription CPU {
    cpu {
      percentage
    }
  }
`;
Enter fullscreen mode Exit fullscreen mode

要求如下。

  • query初始加载时,数据应通过(来自 Redis 键值存储)进行渲染。
  • 加载完成后,组件应该渲染来自订阅(来自 Redis PubSub 通道)的值。

这可以通过组件subscribeToMore提供的 prop来实现,详情请参阅https://www.apollographql.com/docs/react/advanced/subscriptions.html#subscribe-to-moreQueryreact-apollo

import React, { Component } from "react";
import { Query } from "react-apollo";

const CpuUsageContainer = () => (
  <Query query={QUERY}>
    {({ subscribeToMore, ...result }) => (
      <CpuUsage
        {...result}
        subscribeToNewData={() =>
          subscribeToMore({
            document: SUBSCRIPTION,
            updateQuery: (prev, { subscriptionData }) => {
              if (!subscriptionData.data) return prev;
              return subscriptionData.data;
            }
          })
        }
      />
    )}
  </Query>
)
Enter fullscreen mode Exit fullscreen mode

显示组件中的 CPU 使用率CpuUsage

class CpuUsage extends Component {
  componentDidMount() {
    this.props.subscribeToNewData();
  }

  render() {
    const { data, error, loading } = this.props;
    if (loading) return <p> Loading ... </p>;
    if (error) return <p>Error!</p>;
    return (
      <p> CPU Usage: {data.cpu.percentage}% </p>
    )
  }
}
Enter fullscreen mode Exit fullscreen mode

请参阅CpuUsage.js文件以获取包含饼图的完整类定义。

CPU 使用率

工人

可以使用简单的调度脚本,通过定期调用 4 个面板的 mutation 来模拟真实事件。node -schedule包可用于创建异步调度器。

安装依赖项

yarn add node-schedule request request-promise
Enter fullscreen mode Exit fullscreen mode

为每个面板定义突变

const queries = {
  CPU: `
    mutation {
      cpu {
        percentage
      }
    }
    `,
  TRAFFIC: `
    mutation {
      traffic {
        total
        dps {
          timestamp
          value
        }
      }
    }
    `,
  DISTRIBUTION: `
    mutation {
      distribution {
        region
        percentage
      }
    }
    `,
  MESSAGES: `
    mutation {
      messages {
        title
        description
        color
      }
    }
    `
};
Enter fullscreen mode Exit fullscreen mode

例如,添加一个 CPU 调度器,schedule.scheduleJob每 3 秒执行一次任务。

const schedule = require("node-schedule");

schedule.scheduleJob("*/3 * * * * *", async () => {
  await makeHttpRequest("CPU"); // Call mutation for CPU panel
  console.log("Fetched new results for CPU");
});
Enter fullscreen mode Exit fullscreen mode

请参考worker/worker.js获取完整脚本

运行工作进程

$ yarn start
yarn run v1.13.0
$ node worker.js
Starting worker
Scheduled Jobs for CPU, Traffic, distribution, messages
Fetched new results for TRAFFIC
Fetched new results for MESSAGES
Fetched new results for CPU
Fetched new results for DISTRIBUTION
Fetched new results for CPU
Fetched new results for MESSAGES
Fetched new results for TRAFFIC
...
...
Enter fullscreen mode Exit fullscreen mode

实时仪表盘预览

规模化

为了实现高可用性,服务器程序将部署在多个实例上,并通过负载均衡器连接。

假设有 4 台服务器S1,分别为 A S2、BS3S4C。当用户打开浏览器(客户端)时,可以通过负载均衡器连接到其中任何一台服务器。所有这些服务器都连接到一个 Redis 集群R

如果使用 nginx,可以通过更改配置来路由 websocket 请求。详情请参阅www.nginx.com/blog/websocket-nginx/ 。

架构图

下图表示一个配置,其中 4 个客户端通过负载均衡器连接到 4 台服务器。

架构图

分析来自Worker的请求流

请求分析

  1. Worker向其中一个服务器(通过负载均衡器)发出POST请求(即变更) 。S1
  2. S1向 Redis 集群发送PUBLISH包含数据的命令cpu
  3. 由于所有服务器都订阅了 Redis 中的同一个通道,因此它们(S1、、S2)都会收到数据S3S4cpu
  4. 服务器通过WebSocket将数据发布给所有客户端(C1,,,C2C3C4
文章来源:https://dev.to/nowke/building-real-time-dashboard-using-react-graphql-subscriptions-and-redis-pubsub-2gip