发布于 2026-01-06 5 阅读
0

使用 Socket.io 实现 GraphQL 实时查询 动机 概念 LiveQueryStore 实现 在 GraphQL schema 中添加 @live 指令 在客户端使用实时查询 与 Apollo Client 的结合使用 与 Urql 的结合使用 未来展望

使用 Socket.io 进行 GraphQL 实时查询

动机

概念

LiveQueryStore 实现

将指令添加@live到您的 GraphQL schema 中

在客户端上消费实时查询

与 Apollo Client 一起使用

与 Urql 一起使用

未来

照片由Unsplash上的Luca Campioni拍摄

注:为了更好地理解,我建议先阅读《订阅和实时查询 - 使用 GraphQL 实现实时查询》

我长期以来一直使用基于 Socket.io 的 GraphQL schema。最终,我将客户端和服务端的协议抽象成一个库,以便于在不同项目中复用。

除了常见的 GraphQL 操作之外,我还添加了对执行实时查询的支持。

TL;DR:

  • @n1ru4l/socket-io-graphql-server:一个用于通过 socket.io 服务器提供 GraphQL schema 的层。支持查询、变更、订阅和实时查询。
  • @n1ru4l/socket-io-graphql-client. 一个用于消费通过 . 提供的 GraphQL schema 的网络接口@n1ru4l/socket-io-graphql-server。可与所有主流 GraphQL 客户端(如 Relay、Apollo Client 或 Urql)一起使用。
  • @n1ru4l/graphql-live-query. 为任何GraphQL schema 添加实时查询的实用程序。
  • @n1ru4l/in-memory-live-query-storeGraphQL实时查询实现。

所有软件包都可以在此仓库中找到:

https://github.com/n1ru4l/graphql-live-queries

为了展示这些库,我创建了一个待办事项示例应用程序,该应用程序使用上述软件包在所有客户端之间同步其状态:

  • Server:待办事项:使用 graphql-js 实现应用程序服务器,@n1ru4l/socket-io-graphql-server以及@n1ru4l/in-memory-live-query-store
  • Client Relay:Todo 应用客户端实现create-react-apprelay以及@n1ru4l/socket-io-graphql-client
  • Client Apollo:Todo 应用客户端实现create-react-app@apollo/client以及@n1ru4l/socket-io-graphql-client
  • Client Urql:Todo 应用客户端实现create-react-appurql以及@n1ru4l/socket-io-graphql-client

动机

看来,所有大型厂商都没有像应该的那样大力推广 GraphQL 的实时性。

由于 Apollo 更专注于其他领域,因此最流行的 Node.js 订阅实现维护状况不佳

目前有一些实时查询实现,但没有一个是不与特定数据库绑定的。

社区中涌现出一些很棒的想法(例如…… graphql-live-subscriptions),但这些想法要么没有得到维护,要么存在一些重大缺陷,例如与接口或联合类型不兼容。

这些实现示例可以作为范例,@n1ru4l/graphql-live-query展示@n1ru4l/in-memory-live-query-store如何在不依赖于任何特定(响应式)数据库或数据结构的情况下实现此功能。随着用户反馈新的用例并开始使用实时查询,该实现有望不断完善。

除此之外,我还创建了这两个@n1ru4l/socket-io-graphql-server@n1ru4l/socket-io-graphql-client因为我在一个需要实时更新的项目中已经大量使用了 GraphQL 而不是 Socket.io。

GraphQL 已经有了实时解决方案

订阅是响应事件的理想工具。例如,当收到新消息时,可以触发声音或显示提示信息。

订阅也常用于更新客户端上已有的查询结果。根据复杂程度,缓存更新代码最终可能会变得非常臃肿。通常,在收到订阅事件后直接重新获取查询结果会更直接。

实时查询魔法

然而,实时查询应该感觉非常流畅,无需任何缓存更新操作,即可使用服务器上的最新数据更新用户界面。这样就将复杂性从客户端转移到了服务器端。

概念

目前,我项目中实时查询的定义是:使用@live指令进行注释的查询操作。

query users @live {
  users(first: 10) {
    id
    login
  }
}
Enter fullscreen mode Exit fullscreen mode

实时查询会发送到服务器(通过 WebSocket 或 HTTP),并存储在服务器中,直到客户端断开连接或通知服务器他不再对实时查询操作感兴趣(因此服务器会将其丢弃)。

在服务器端,一旦实时查询操作选择的数据发生变化,查询就会重新执行。然后,结果会以流式传输的方式发送给客户端。可以通过使上次执行结果中选定的资源失效来安排重新执行。失效操作可以通过使用根字段的模式坐标(例如 `<root_field_name>` Query.todos)或资源标识符(例如 ` Todo:1<resource_identifier>`,id 为 1 的待办事项)来触发。更新: 您可以在此处了解更多关于库如何收集资源标识符的信息

LiveQueryStore 实现

这是一个由该软件包InMemoryLiveQueryStore提供的类,它将内存中所有有关活动查询的信息存储在该类中。@n1ru4l/in-memory-live-query-storeInMemoryLiveQueryStore

注册 LiveQueryStore

为了实现实时查询,必须将返回值的函数@n1ru4l/socket-io-graphql-server传递给该函数。excuteAsyncIteratorregisterSocketIOGraphQLServer

除了参考实现execute中的默认函数外graphql-js,新execute函数还可以返回一个AsyncIterableIterator<ExecutionResult>注意:由于@defer@stream已添加到graphql-js参考实现中,它现在也可以返回AsyncIterators

InMemoryLiveQueryStore属性execute必须用于执行实时查询(但如果操作未被标识为实时查询操作,则也会回退到默认值,从而用于非实时查询操作)executegraphql-js

import socketIO from "socket.io";
import { InMemoryLiveQueryStore } from "@n1ru4l/in-memory-live-query-store";
import { registerSocketIOGraphQLServer } from "@n1ru4l/socket-io-graphql-server";
import { schema } from "./schema";

const liveQueryStore = new InMemoryLiveQueryStore();
const server = socketIO(httpServer);

registerSocketIOGraphQLServer({
  socketServer,
  // getExecutionParameter is invoked for each incoming operation.
  // a different context or even schema based on the connection can be returned
  // and will be used for that operation.
  getExecutionParameter: () => ({
    execute: liveQueryStore.execute,
    graphQLExecutionParameter: {
      schema,
      contextValue: {
        liveQueryStore,
      },
    },
  }),
});
Enter fullscreen mode Exit fullscreen mode

目前有一个正在进行中的 pull request,用于在graphql-js参考实现中添加`for`和 `and`指令AsyncIterableIterator<ExecutionResult>的有效返回结果。这样做的一个附带好处是,也有助于实时查询。execute@defer@stream

通知实时查询存储更改

为了重新执行已注册的实时查询并将新结果流式传输到连接的客户端,我们必须使选择特定数据的实时查询操作失效。

对于给定的查询:

query messages @live {
  messages(first: 10) {
    id
    content
    author {
      id
      name
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

那看起来会是这样的:

// Mutation.createMessage resolver
const createMessage = async (root, args, context) => {
  await context.messageStore.createMessage({
    content: args.content,
    userId: context.viewer.id,
  });
  // notify liveQueryStore via query schema coordinate that all live queries that select Queries.messages must be re-executed and sent to the clients.
  context.liveQueryStore.invalidate("Query.messages");
};

const editMessage = async (root, args, context) => {
  await context.messageStore.updateMessage({
    messageId: args.messageId,
    content: args.content,
  });
  // notify liveQueryStore via resource identifier
  context.liveQueryStore.invalidate(`Message:${args.messageId}`);
}
Enter fullscreen mode Exit fullscreen mode

注:这里或许可以进行某种抽象。想象一下,实时查询存储位于网格入口点,它跟踪正在发生的实时查询和变更,然后根据变更自动触发失效,而不是将其硬编码到解析器中。Apollo最近构建了一个类似但又有所不同的功能

将指令添加@live到您的 GraphQL schema 中

@n1ru4l/graphql-live-query导出一个可以轻松添加到现有模式中的指令。您必须添加此指令,否则 GraphQL 服务器会报错,提示您的 GraphQL 操作中存在未知指令。

import { GraphQLLiveDirective } from "@n1ru4l/graphql-live-query";

export const schema = new gql.GraphQLSchema({
  query,
  mutation,
  subscription,
  directives: [GraphQLLiveDirective],
});
Enter fullscreen mode Exit fullscreen mode

对于使用 GraphQL SDL 驱动的开发流程的用户,必须将以下内容添加到类型定义中。

directive @live on QUERY
Enter fullscreen mode Exit fullscreen mode

在客户端上消费实时查询

@n1ru4l/socket-io-graphql-client软件包可用于对服务器执行(实时)操作QueryMutation实现了基于 Socket.io 协议的底层 GraphQL。Subscription@n1ru4l/socket-io-graphql-server

我还创建了一个 PR,用于支持 graphql-transport-ws 的实时查询

基本客户端创建

import io from "socket.io-client";
import { createSocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";

const socket = io();

const socketIOGraphQLClient = createSocketIOGraphQLClient(socket);
Enter fullscreen mode Exit fullscreen mode

执行 GraphQL 操作

SocketIOGraphQLClient提供了一个execute方法,该方法将返回一个 Observable,可用于订阅响应。

简单的查询或变更操作只会发布一个值。然而,实时查询或订阅会发布多个值,因此 Promise 并非处理此类情况的理想数据结构。

返回的可观察对象与提案规范兼容,并且可以轻松地被 apollo-client 和 relay 等库以及 GraphiQL 等工具使用。

socketIOGraphQLClient.execute({
  operation: /* GraphQL */ `
    query messages {
      id
      content
    }
  `
}, {
  next: console.log,
  error: console.log,
  complete: console.log
});

socketIOGraphQLClient.execute({
  operation: /* GraphQL */ `
    query messages @live {
      id
      content
    }
  `
}, {
  next: console.log,
  error: console.log,
  complete: console.log
});

socketIOGraphQLClient.execute({
  operation: /* GraphQL */ `
    subscription onNewMessage {
      onNewMessage {
        id
        content
      }
    }
  `
}, {
  next: console.log,
  error: console.log,
  complete: console.log
});
Enter fullscreen mode Exit fullscreen mode

GraphiQL 获取器

我们可以轻松地使用我们的 GraphiQL API 来获取和显示结果SocketIOGraphQLClient。我们只需要将自定义的 fetcher 传递给 GraphiQL 组件即可。


const fetcher = ({ query: operation, ...restGraphQLParams }) =>
  ({
    subscribe: (
      sinkOrNext,
      ...args
    ) => {
      const sink: Sink =
        typeof sinkOrNext === "function"
          ? { next: sinkOrNext, error: args[0], complete: args[1] }
          : sinkOrNext;

      const unsubscribe = socketIOGraphQLClient.execute(
        {
          operation,
          ...restGraphQLParams,
        },
        sink
      );

      return { unsubscribe };
    },
  });

const CustomGraphiQL = () => (
  <GraphiQL
    fetcher={({ query: operation, ...execRest }) =>
      socketIOGraphQLClient.execute({ operation, ...execRest })
    }
  />
);
Enter fullscreen mode Exit fullscreen mode

使用继电器实现

Relay 是一个功能强大的客户端缓存管理库。围绕 RelaySocketIOGraphQLClient实例可以轻松构建一个 Relay 环境(其中包含有关缓存以及如何从服务器获取数据的信息)。

import { SocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";
import {
  Environment,
  Network,
  RecordSource,
  Store,
  Observable,
  GraphQLResponse,
  RequestParameters,
  Variables,
} from "relay-runtime";

export const createRelayEnvironment = (
  networkInterface: SocketIOGraphQLClient<GraphQLResponse, Error>
) => {
  const execute = (request: RequestParameters, variables: Variables) => {
    if (!request.text) throw new Error("Missing document.");
    const { text: operation, name } = request;

    return Observable.create<GraphQLResponse>((sink) =>
      networkInterface.execute(
        {
          operation,
          variables,
          operationName: name,
        },
        sink
      )
    );
  };

  const network = Network.create(execute, execute);
  const store = attachNotifyGarbageCollectionBehaviourToStore(
    new Store(new RecordSource())
  );

  return new Environment({
    network,
    store,
  });
};
Enter fullscreen mode Exit fullscreen mode

这样一来,使用实时数据就变得非常简单:

const ChatApplicationMessagesQuery = graphql`
  query ChatApplication_MessagesQuery @live {
    messages(limit: 10) {
      id
      ...ChatApplication_message
    }
  }
`;

const ChatApplicationMessageRenderer = React.memo(
  ({ message }: { message: ChatApplication_message }) => {
    return (
      <div>
        <div>{message.author.name}</div>
        <div>{message.content}</div>
      </div>
    );
  }
);

const ChatApplicationMessage = createFragmentContainer(
  ChatApplicationMessageRenderer,
  {
    message: graphql`
      fragment ChatApplication_message on Message {
        id
        content
        author {
          id
          name
        }
      }
    `,
  }
);

export const ChatApplication: React.FunctionComponent<{
  relayEnvironment: RelayEnvironment;
}> = (props) => {
  return (
    <QueryRenderer<ChatApplication_MessagesQuery>
      environment={props.relayEnvironment}
      query={ChatApplicationMessagesQuery}
      variables={{}}
      render={({ props }) => {
        if (!props) {
          return null;
        }

        return props.messages.map((message) => (
          <ChatApplicationMessage key={message.id} message={message} />
        ));
      }}
    />
  );
};
Enter fullscreen mode Exit fullscreen mode

查看完整的示例应用程序

与 Apollo Client 一起使用

import { SocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";
import {
  ApolloClient,
  InMemoryCache,
  ApolloLink,
  Operation,
  Observable,
  FetchResult,
  Observable,
} from "@apollo/client";
import { print } from "graphql";

class SocketIOGraphQLApolloLink extends ApolloLink {
  private networkLayer: SocketIOGraphQLClient;
  constructor(networkLayer: SocketIOGraphQLClient) {
    super();
    this.networkLayer = networkLayer;
  }

  public request(operation: Operation): Observable<FetchResult> | null {
    return new Observable((sink) =>
      this.networkLayer.execute({
        operationName: operation.operationName,
        operation: print(operation.query),
        variables: operation.variables,
      })
    );
  }
}

export const createApolloClient = (networkInterface: SocketIOGraphQLClient) => {
  return new ApolloClient({
    link: new SocketIOGraphQLApolloLink(networkInterface),
    cache: new InMemoryCache(),
  });
};
Enter fullscreen mode Exit fullscreen mode

查看完整的示例应用程序

与 Urql 一起使用

import { SocketIOGraphQLClient } from "@n1ru4l/socket-io-graphql-client";
import {
  Client,
  dedupExchange,
  cacheExchange,
  subscriptionExchange,
  ExecutionResult,
} from "urql";

export const createUrqlClient = (
  networkInterface: SocketIOGraphQLClient<ExecutionResult>
) => {
  return new Client({
    url: "noop",
    exchanges: [
      dedupExchange,
      cacheExchange,
      subscriptionExchange({
        forwardSubscription: (operation) => ({
          subscribe: (sink) => ({
            unsubscribe: networkInterface.execute(
              {
                operation: operation.query,
                variables: operation.variables,
              },
              sink
            ),
          }),
        }),
        enableAllOperations: true,
      }),
    ],
  });
};
Enter fullscreen mode Exit fullscreen mode

查看完整的示例应用程序

未来

这是实时查询库的第一个实现版本。随着越来越多的人试用并使用它构建项目,API 将变得更加成熟和灵活,以适应不同的使用场景。

可以构建分布式系统的新LiveQueryStore实现(例如基于 Redis PubSub)。

此外,还可以优化网络层,使其仅向客户端传输更新后的补丁指令,以减少有效载荷的大小

其他人可能不会使用 Socket.io。(混合)实现其他服务器库(例如 apollo-server 或 express-graphql)必须自行构建。

注意: GraphQL 实时查询现在可以执行graphql-ws,甚至包括实验性查询。快来体验GraphQL 前沿测试平台吧express-graphql

你有什么想法吗?欢迎通过Twitter联系我,或者在 GitHub 上提交issue,也可以在下方留言 😊。

文章来源:https://dev.to/n1ru4l/graphql-live-queries-with-socket-io-4mh6