发布于 2026-01-06 0 阅读
0

★ 隆重推出 laravel-websockets,一个易于使用的 PHP WebSocket 服务器

★ 隆重推出 laravel-websockets,一个易于使用的 PHP WebSocket 服务器

laravel-websockets是一个 Laravel 扩展包,可以完全处理 WebSocket 的服务器端。它彻底取代了Pusher或基于 JavaScript 的laravel-echo-server等服务。它拥有详尽的文档一个可供体验的演示应用程序。过去几周,我和beyondcode 的开发者兼联合创始人Marcel一直在共同开发这个扩展包。在这篇博文中,我们将向大家介绍这个扩展包。

安慰

什么是WebSocket?

简单来说,WebSocket 连接是浏览器和服务器之间建立的持久连接。它支持双向通信:服务器可以向浏览器发送消息,而浏览器(客户端)可以通过同一连接做出响应。这与普通的 Ajax 不同,Ajax 仅支持单向通信:只有客户端可以向服务器请求数据。

WebSocket 主要用于实时应用程序,例如聊天应用程序。这样的聊天应用程序可以这样工作。

  1. 用户 A 将浏览器指向聊天应用程序。浏览器与服务器建立 WebSocket 连接。请记住,服务器会保持连接打开状态。
  2. 用户 B、C、D……都做了同样的事情。服务器现在有多个打开的连接。
  3. 第一个用户通过打开的 WebSocket 连接向服务器发送消息。
  4. 服务器看到通过第一个连接传入的消息,并将该消息发送到它打开的所有其他连接。
  5. 用户 B、C、D 等的浏览器通过 WebSocket 收到传入的消息,并可以对其进行一些操作(如果是聊天应用程序:则显示该消息)。

所有这些操作都是实时进行的,无需轮询。即使消息数量很多,有效负载也会小得多。也无需通过连接发送头部信息等。

什么是 laravel-websockets?

我们的laravel-websockets包可以帮你处理 WebSocket 的服务器端。它包含一个用 PHP 实现的 Web 服务器,可以处理传入的 WebSocket 连接。

laravel-websockets 构建于Ratchet之上,Ratchet 是一个用于处理 WebSocket 的底层包。虽然可以直接使用 Ratchet,但需要一些研究和配置才能使其正常工作。我们的软件包简化了这些步骤:您只需一分钟即可运行一个 WebSocket 服务器。

我和 Marcel 添加了很多实用功能,如果使用 Ratchet 本身来实现这些功能,会非常耗时。它支持多租户,因此您可以搭建一个 WebSocket 服务器并将其用于多个不同的应用程序。我们还添加了一个实时调试仪表板,方便您检查通过 WebSocket 传输的数据。所有这些都由一个实时图表提供支持,该图表可让您深入了解 WebSocket 应用程序的关键指标,例如峰值连接数、已发送的消息数和已接收的 API 消息数。

我们还实现了Pusher 消息协议。这样一来,所有支持 Pusher 的现有软件包和应用程序都能与我们的软件包兼容。Laravel 的广播功能也能完美运行。用于处理客户端 WebSocket 的 JavaScript 库laravel-echo也完全兼容。您可以利用 Pusher 提供的所有功能,例如私有频道在线状态频道,甚至Pusher HTTP API

除了作为 Pusher 的免费替代方案外,该软件包还为 Laravel 软件包开发者带来了巨大的优势。现在,向应用程序或软件包中添加 WebSocket 功能变得更加容易——因为我们的软件包完全消除了安装第三方应用程序或服务器的需求。此外,Pusher 的最大有效负载大小限制也相当保守。而我们的软件包允许您自定义最大有效负载大小

如何使用它

在深入了解其底层工作原理之前,我们先来看看它的实际使用方法。

首先,使用 Composer 引入它。

composer require beyondcode/laravel-websockets
Enter fullscreen mode Exit fullscreen mode

此软件包包含一个迁移文件,用于在运行 WebSocket 服务器时存储统计信息。您可以使用以下命令发布迁移文件:

php artisan vendor:publish --provider="BeyondCode\LaravelWebSockets\WebSocketsServiceProvider" --tag="migrations"
Enter fullscreen mode Exit fullscreen mode

运行迁移命令:

php artisan migrate
Enter fullscreen mode Exit fullscreen mode

接下来,您必须发布配置文件。

php artisan vendor:publish --provider="BeyondCode\LaravelWebSockets\WebSocketsServiceProvider" --tag="config"
Enter fullscreen mode Exit fullscreen mode

这是即将发布的文件。

return [

    /*
     * This package comes with multi tenancy out of the box. Here you can
     * configure the different apps that can use the webSockets server.
     *
     * Optionally you can disable client events so clients cannot send
     * messages to each other via the webSockets.
     */
    'apps' => [
        [
            'id' => env('PUSHER_APP_ID'),
            'name' => env('APP_NAME'),
            'key' => env('PUSHER_APP_KEY'),
            'secret' => env('PUSHER_APP_SECRET'),
            'enable_client_messages' => false,
            'enable_statistics' => true,
        ],
    ],

    /*
     * This class is responsible for finding the apps. The default provider
     * will use the apps defined in this config file.
     *
     * You can create a custom provider by implementing the
     * `AppProvider` interface.
     */
    'app_provider' => BeyondCode\LaravelWebSockets\Apps\ConfigAppProvider::class,

    /*
     * This array contains the hosts of which you want to allow incoming requests.
     * Leave this empty if you want to accept requests from all hosts.
     */
    'allowed_origins' => [
        //
    ],

    /*
     * The maximum request size in kilobytes that is allowed for an incoming WebSocket request.
     */
    'max_request_size_in_kb' => 250,

    /*
     * This path will be used to register the necessary routes for the package.
     */
    'path' => 'laravel-websockets',

    'statistics' => [
        /*
         * This model will be used to store the statistics of the WebSocketsServer.
         * The only requirement is that the model should extend
         * `WebSocketsStatisticsEntry` provided by this package.
         */
        'model' => \BeyondCode\LaravelWebSockets\Statistics\Models\WebSocketsStatisticsEntry::class,

        /*
         * Here you can specify the interval in seconds at which statistics should be logged.
         */
        'interval_in_seconds' => 60,

        /*
         * When the clean-command is executed, all recorded statistics older than
         * the number of days specified here will be deleted.
         */
        'delete_statistics_older_than_days' => 60
    ],

    /*
     * Define the optional SSL context for your WebSocket connections.
     * You can see all available options at: http://php.net/manual/en/context.ssl.php
     */
    'ssl' => [
        /*
         * Path to local certificate file on filesystem. It must be a PEM encoded file which
         * contains your certificate and private key. It can optionally contain the
         * certificate chain of issuers. The private key also may be contained
         * in a separate file specified by local_pk.
         */
        'local_cert' => null,

        /*
         * Path to local private key file on filesystem in case of separate files for
         * certificate (local_cert) and private key.
         */
        'local_pk' => null,

        /*
         * Passphrase for your local_cert file.
         */
        'passphrase' => null
    ],
];
Enter fullscreen mode Exit fullscreen mode

最后一步是填写一些环境变量。请确保你的配置文件中已填写了`<path>` APP_NAME、 `<your-path> ` PUSHER_APP_ID、 `<your-your-path>` 和 `<your-your-path>` 等参数。你可能想知道为什么这里要使用前缀。稍后会详细解释。PUSHER_APP_KEYPUSHER_APP_SECRET.envPUSHER_

完成这些准备工作后,您就可以执行此命令来启动服务器了。

php artisan websockets:serve
Enter fullscreen mode Exit fullscreen mode

现在你可以使用像 laravel-echo 这样的库来连接服务器。我不会详细介绍整个过程。要开始使用客户端,请查看我们的文档laravel-echo 文档以及这个演示应用程序

演示

它的底层工作原理是什么?

如果您对所有这些工作原理的细节不感兴趣,而只想使用它,您可以直接跳到调试仪表板部分。

执行时php artisan websockets:serve你会注意到命令永远不会结束。这是因为该命令内部启动了一个 Ratchet 服务器,该服务器开始接受连接(默认端口为 6001)。

protected function startWebSocketServer()
{
    $this->info("Starting the WebSocket server on port {$this->option('port')}...");

    $routes = WebSocketsRouter::getRoutes();

    /**? Start the server ? */
    (new WebSocketServerFactory())
        ->useRoutes($routes)
        ->setHost($this->option('host'))
        ->setPort($this->option('port'))
        ->setConsoleOutput($this->output)
        ->createServer()
        ->run();
}
Enter fullscreen mode Exit fullscreen mode

我们提供给服务器的其中一项内容是$routes。该变量是 的一个实例。它是在我们的 Router 类\Symfony\Component\Routing\RouteCollection中创建的

因为在我们的代码中StartWebSocketsCommand我们调用了echo方法,所以路由将包含这些路由

public function echo()
{
    $this->get('/app/{appKey}', WebSocketHandler::class);

    $this->post('/apps/{appId}/events', gerEventController::class);
    $this->get('/apps/{appId}/channels', hChannelsController::class);
    $this->get('/apps/{appId}/channels/{channelName}', hChannelController::class);
    $this->get('/apps/{appId}/channels/{channelName}/users', hUsersController::class);

}
Enter fullscreen mode Exit fullscreen mode

现在有趣的部分来了。第一个路由将处理传入的 WebSocket 请求。路由 URL 之所以/app/{appKey}是这个特定的 URL,是因为Pusher 就是使用这个 URL 的。使用这个 URL 至关重要,它能确保所有与 Pusher 兼容的 JavaScript 客户端都能正常工作。

其他的服务器则监听传入的 HTTP 连接。仔细想想,这真是太神奇了:我们的小服务器可以同时监听 WebSocket 和 HTTP 连接。PHP 是不是很棒?

传入的 WebSocket 连接

让我们更深入地了解一下 WebSocket 连接的处理方式。

$this->get('/app/{appKey}', WebSocketHandler::class);
Enter fullscreen mode Exit fullscreen mode

get方法会Router调用另一个getRoute方法。在该方法中,我们将实例化的对象封装WebSocketHandler在一个WsServer类中。WsServer该类由 Ratchet 提供,它提供了处理 WebSocket 细节工作的代码。

// inside the `BeyondCode\LaravelWebSockets\Server\Router` class

public function get(string $uri, $action)
{
    $this->addRoute('GET', $uri, $action);
}

protected function getRoute(string $method, string $uri, $action): Route
{
    /**
     * If the given action is a class that handles WebSockets, then it's not a regular
     * controller but a WebSocketHandler that needs to converted to a WsServer.
     *
     * If the given action is a regular controller we'll just instanciate it.
     */
    $action = is_subclass_of($action, MessageComponentInterface::class)
        ? $this->createWebSocketsServer($action)
        : app($action);

    return new Route($uri, ['_controller' => $action], [], [], null, [], [$method]);
}

protected function createWebSocketsServer(string $action): WsServer
{
    $app = app($action);

    if (WebsocketsLogger::isEnabled()) {
        $app = WebsocketsLogger::decorate($app);
    }

    return new WsServer($app);
}
Enter fullscreen mode Exit fullscreen mode

现在您已经了解了 WebSocket 部分的路由设置方式,让我们来看看它WebSocketHandler实际执行的操作。

你可以把这个WebSocketHandler类看作是一个控制器,但它是针对 WebSocket 而不是 HTTP 的。它包含各种生命周期方法:

  • onOpen当新客户端打开 WebSocket 连接时,将调用此函数。
  • onMessage:当客户端通过 webSocket 发送新消息时将被调用。
  • onClose:当客户端断开连接时(通常是通过导航到另一个页面,或关闭标签页或浏览器窗口),将调用此函数。

onOpen

这是该方法的代码onOpen

public function onOpen(ConnectionInterface $connection)
{
    $this
        ->verifyAppKey($connection)
        ->generateSocketId($connection)
        ->establishConnection($connection);
}
Enter fullscreen mode Exit fullscreen mode

当有新连接建立时,会发生三件事。第一件事是应用程序验证。还记得用于处理 WebSocket 连接的那条路由吗?

$this->get('/app/{appKey}', WebSocketHandler::class);
Enter fullscreen mode Exit fullscreen mode

verifyAppKey函数会检查给定的值是否appKey正确。在默认安装中,App::findByAppKey它会检查配置文件中是否存在定义了具有此键的应用程序websockets.php

protected function verifyAppKey(ConnectionInterface $connection)
{
    $appKey = QueryParameters::create($connection->httpRequest)->get('appKey');

    if (!$app = App::findByKey($appKey)) {
        throw new UnknownAppKey($appKey);
    }

    $connection->app = $app;

    return $this;
}
Enter fullscreen mode Exit fullscreen mode

我们在连接本身上存储一个引用,app以便我们可以在连接的后续事件中使用它(例如,当收到消息时)。

其次,我们将生成一个套接字 ID。

protected function generateSocketId(ConnectionInterface $connection)
{
    $socketId = sprintf("%d.%d", random_int(1, 000000), random_int(1, 1000000000));

    $connection->socketId = $socketId;

    return $this;
}
Enter fullscreen mode Exit fullscreen mode

该 IDsocketId必须遵循特定格式:两个数字用点号分隔。我们将把这个 ID 存储在连接本身中。我们将使用该 ID 来标识此特定连接。请记住,我们的服务器进程永不结束,因此只要连接保持打开状态,该 ID 就会一直保留在连接中。

最后一步opOpen是向浏览器发送连接成功的响应。同样,我们遵循 Pusher 协议,并发送一个pusher:connection_established事件,以兼容所有现有的 Pusher 包。我们还会将此事件记录到控制面板中。

protected function establishConnection(ConnectionInterface $connection)
{
    $connection->send(json_encode([
        'event' => 'pusher:connection_established',
        'data' => json_encode([
            'socket_id' => $connection->socketId,
            'activity_timeout' => 30,
        ])
    ]));

    DashboardLogger::connection($connection);

    return $this;
}
Enter fullscreen mode Exit fullscreen mode

所有这些工作完成后,我们建立了畅通的连接。

现在我们来看看通过此连接发送消息时会发生什么。

onMessage

让我们来看一下中的函数onMessageWebSocketHandler

public function onMessage(ConnectionInterface $connection, MessageInterface $message)
{
    $message = PusherMessageFactory::createForMessage($message, $connection, $this->channelManager);

    $message->respond();
}
Enter fullscreen mode Exit fullscreen mode

在这里,我们接收消息并做出响应。让我们深入了解一下createForMessage的方法PusherMessageFactory

use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Ratchet\ConnectionInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;

class PusherMessageFactory
{
    public static function createForMessage(
        MessageInterface $message,
        ConnectionInterface $connection,
        ChannelManager $channelManager): PusherMessage
    {
        $payload = json_decode($message->getPayload());

        return starts_with($payload->event, 'pusher:')
            ? new PusherChannelProtocolMessage($payload, $connection, $channelManager)
            : new PusherClientMessage($payload, $connection, $channelManager);
    }
}
Enter fullscreen mode Exit fullscreen mode

在上面的类中,我们将根据有效负载中的事件名称选择正确的消息类型。它PusherChannelProtocolMessage主要处理与频道订阅相关的消息。PusherClientMessage它将处理由已订阅频道的客户端发送的消息,稍后会详细介绍。

我们来看看发生了什么PusherChannelProtocolMessage。更具体地说,我们将查看它的respond方法,因为这是中调用的方法。WebSocketHandler

public function respond()
{
    $eventName = camel_case(str_after($this->payload->event, ':'));

    if (method_exists($this, $eventName)) {
        call_user_func([$this, $eventName], $this->connection, $this->payload->data ?? new stdClass());
    }
}
Enter fullscreen mode Exit fullscreen mode

$this->payload->event将包含类似这样的字符串pusher:subscribe。因此,在上面的代码中$eventName,将包含subscribe

我们先来谈谈订阅。Pusher 使用不同的频道channels。你可以把频道想象成收音机上的频道。如果你把收音机调到特定的频道,就能听到该频道发送的消息。根据Pusher 协议,想要订阅频道的客户端必须使用相应的subscribe订阅方法。

call_user_func上述函数中的调用respond尝试调用一个与事件同名的函数。因此,对于该事件subscribesubscribe将会调用该方法。

protected function subscribe(ConnectionInterface $connection, stdClass $payload)
{
    $channel = $this->channelManager->findOrCreate($connection->app->id, $payload->channel);

    $channel->subscribe($connection, $payload);
}
Enter fullscreen mode Exit fullscreen mode

在这里,我们首次与该类ChannelManager建立联系。该类负责跟踪哪些连接订阅了哪些频道。

让我们来看看被叫到的情况findOrCreateChannelManager

public function findOrCreate(string $appId, string $channelName): Channel
{
    if (!isset($this->channels[$appId][$channelName])) {
        $channelClass = $this->determineChannelClass($channelName);

        $this->channels[$appId][$channelName] = new $channelClass($channelName);
    }

    return $this->channels[$appId][$channelName];
}
Enter fullscreen mode Exit fullscreen mode

在这个函数中,我们将查找Channel给定参数的实例appIdchannelName如果实例不存在,我们将创建一个新的实例并将其存储在channels实例变量中。由于我们的服务器进程永不结束,并且对所有传入的连接都相同,因此将其存储在内存中就足够了,无需将其持久化到其他地方。

让我们深入了解determineChannelClass并讨论 Pusher 定义的不同通道类型。

protected function determineChannelClass(string $channelName): string
{
    if (starts_with($channelName, 'private-')) {
        return PrivateChannel::class;
    }

    if (starts_with($channelName, 'presence-')) {
        return PresenceChannel::class;
    }

    return Channel::class;
}
Enter fullscreen mode Exit fullscreen mode

Pusher有三种不同的通道类型:

  • 私有频道:客户端在实际订阅频道之前,需要进行身份验证检查。
  • 在线状态频道:与私人频道相同,但具有额外的功能来确定谁在频道中(如果您正在构建类似聊天室的功能,这将非常有用)。
  • 公共频道:任何人都可以订阅这些频道,无需进行身份验证检查。

这三种通道类型在我们的代码中分别实现为 `new` PrivateChannel、` PresenceChannelnew` 和 `new` Channel。在`new` 中determineChannelClass,我们创建一个新的通道并返回正确类型的通道。

让我们回到前面提到的类中subscribe方法。以下是代码。PusherChannelProtocolMessage

protected function subscribe(ConnectionInterface $connection, stdClass $payload)
{
    $channel = $this->channelManager->findOrCreate($connection->client->appId, $payload->channel);

    $channel->subscribe($connection, $payload);
}
Enter fullscreen mode Exit fullscreen mode

现在我们知道它$channel持有右通道类的一个实例,让我们来看看它的实现。首先,让我们看看常规subscribe类中发生了什么Channel

public function subscribe(ConnectionInterface $connection, stdClass $payload)
{
    $this->saveConnection($connection);

    $connection->send(json_encode([
        'event' => 'pusher_internal:subscription_succeeded',
        'channel' => $this->channelName
    ]));
}

protected function saveConnection(ConnectionInterface $connection)
{
    $hadConnectionsPreviously = $this->hasConnections();

    $this->subscriptions[$connection->socketId] = $connection;

    if (! $hadConnectionsPreviously) {
        DashboardLogger::occupied($connection, $this->channelName);
    }

    DashboardLogger::subscribed($connection, $this->channelName);
}
Enter fullscreen mode Exit fullscreen mode

订阅频道实际上就是存储socketId连接的 ID(还记得我们在方法中设置的 IDonOpen吗?是在subscriptions频道的实例变量中设置的?)。saveConnection同时,还会向连接发送一条消息,告知客户端订阅成功。该pusher_internal:subscription_succeeded事件名称由 Pusher 协议指定

这样,客户端就被订阅了,传递给客户端的消息onMessage也被处理了。

但如果我们不使用常规通道会发生什么呢PrivateChannel?让我们来看一下PrivateChannel

namespace BeyondCode\LaravelWebSockets\WebSockets\Channels;

use Ratchet\ConnectionInterface;
use stdClass;

class PrivateChannel extends Channel
{
    public function subscribe(ConnectionInterface $connection, stdClass $payload)
    {
        $this->verifySignature($connection, $payload);

        parent::subscribe($connection, $payload);
    }
}
Enter fullscreen mode Exit fullscreen mode

因此,该subscribe方法与常规通道中的操作完全相同,但会在实际订阅之前验证签名。由于本文篇幅较长,我不会详细介绍验证过程。如果您想了解更多信息,请查看该方法的实现以及Pusher 文档中关于身份验证的部分

这样,客户端就订阅了一个频道。现在我们来回顾一下当客户端向频道发送消息时会发生什么。

根据推送协议,客户端可以使用包含以 . 开头的事件名称的消息相互发送消息client-。让我们再看一下PusherMessageFactory.

return starts_with($payload->event, 'pusher:')
    ? new PusherChannelProtocolMessage($payload, $connection, $channelManager)
    : new PusherClientMessage($payload, $connection, $channelManager);
Enter fullscreen mode Exit fullscreen mode

我们已经介绍了PusherChannelProtocolMessage,现在让我们来看看respond方法PusherClientMessage

public function respond()
{
    if (!starts_with($this->payload->event, 'client-')) {
        return;
    }

    if (! $this->connection->app->clientMessagesEnabled) {
        return;
    }

    DashboardLogger::clientMessage($this->connection, $this->payload);

    $channel = $this->channelManager->find($this->connection->app->id, $this->payload->channel);

    optional($channel)->broadcastToOthers($this->connection, $this->payload);
}
Enter fullscreen mode Exit fullscreen mode

我们来看一下这段代码。如果消息的事件名称不是以“.”开头client-,则说明它不是客户端到客户端的消息,我们将中止操作。第二个 if 语句检查客户端到客户端消息传递是否已启用(可以针对每个应用程序禁用)。接下来,我们将把消息记录到调试仪表板上。

解决了这个问题,真正的工作就可以开始了。我们要让他们ChannelManager找到最合适的Channel表达方式。如果他们能找到,Channel我们就会采用这种方式broadcast

实现起来并不难。broadcastToOthers下面Channel的代码会遍历订阅该通道的每个连接,并将有效负载发送给这些连接。发送原始消息的客户端不需要接收该消息。我们使用之前设置的 socketId过滤掉发送者的连接。

public function broadcastToOthers(ConnectionInterface $connection, $payload)
{
    $this->broadcastToEveryoneExcept($payload, $connection->socketId);
}

public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
{
    if (is_null($socketId)) {
        return $this->broadcast($payload);
    }

    foreach ($this->subscribedConnections as $connection) {
        if ($connection->socketId !== $socketId) {
            $connection->send(json_encode($payload));
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

这样,我们就向该渠道上的所有客户发送了一条消息。

关闭

第三个生命周期方法WebSocketHandler是 `onEnd()` onClose。当客户端离开时,例如关闭标签页或浏览器窗口时,就会调用此方法。它的实现非常简单。

public function onClose(ConnectionInterface $connection)
{
    $this->channelManager->removeFromAllChannels($connection);
}
Enter fullscreen mode Exit fullscreen mode

removeFromAllChannels在方法内部ChannelManager,我们将移除所有通道的连接。我们还会移除不再有任何连接的通道和应用,以避免内存泄漏。

public function removeFromAllChannels(ConnectionInterface $connection)
{
    if (!isset($connection->app)) {
        return;
    }

    /**
     * Remove the connection from all channels.
     */
    collect(array_get($this->channels, $connection->app->id, []))->each->unsubscribe($connection);

    /**
     * Unset all channels that have no connections so we don't leak memory.
     */
    collect(array_get($this->channels, $connection->app->id, []))
        ->reject->hasConnections()
        ->each(function (Channel $channel, string $channelName) use ($connection) {
            unset($this->channels[$connection->app->id][$channelName]);
        });

    if (count(array_get($this->channels, $connection->app->id, [])) === 0) {
        unset($this->channels[$connection->app->id]);
    };
}
Enter fullscreen mode Exit fullscreen mode

传入的 HTTP 连接

前面我们已经解释过如何php artisan websockets:serve启动 Ratchet 服务器。如果你从头到尾阅读了这篇文章,你应该已经了解 Ratchet 服务器是如何处理 WebSocket 的。现在我们来重点讨论 HTTP 部分。

Laravel 对事件广播提供了出色的支持。你有没有想过 Laravel 广播事件时底层发生了什么?简单来说,它会向指定的端点发送一个带有特定有效负载的 HTTP 请求。

我们的文档明确指出,您应该将hostPusher 配置更改为 '127.0.0.1'(或运行我们软件包的服务器主机名)。这样 Laravel 就会将 HTTP 请求发送到我们的软件包,而不是 Pusher。

还记得我们服务器中的那些路由吗?当 Laravel 广播一个事件时,它会向这个路由发送一个 POST 请求:

$this->post('/apps/{appId}/events', TriggerEventController::class);
Enter fullscreen mode Exit fullscreen mode

我们来看看那段代码。TriggerEventController

class TriggerEventController extends Controller
{
    public function __invoke(Request $request)
    {
        $this->ensureValidSignature($request);

        foreach ($request->json()->get('channels', []) as $channelName) {
            $channel = $this->channelManager->find($request->appId, $channelName);

            optional($channel)->broadcastToEveryoneExcept([
                'channel' => $channelName,
                'event' => $request->json()->get('name'),
                'data' => $request->json()->get('data'),
            ], $request->json()->get('socket_id'));

            DashboardLogger::apiMessage(
                $request->appId,
                $channelName,
                $request->json()->get('name'),
                $request->json()->get('data')
            );
        }

        return $request->json()->all();
    }
}
Enter fullscreen mode Exit fullscreen mode

我们不希望任何人都能调用这个端点。发送请求时,Laravel 会将应用签名的哈希版本以及其他一些参数添加到请求中。在ensureValidSignature方法中,我们会验证该签名是否正确。

如果签名正确,我们将让它ChannelManager找到传入请求的目标通道。这很有意思。这个通道管理器是如何实例化的?它又是如何知道这些通道的呢?我们的服务器是一个永不停止的进程。它同时处理 WebSocket 和 HTTP 请求。通道管理器ChannelManager绑定为单例模式,因此只能存在一个实例。它ChannelManager保存着当前订阅某个通道的所有客户端的连接。因此,在处理这个 HTTP 请求的同时,我们可以通过已连接的 WebSocket 发送消息。在我们看来,这真是太棒了!

ChannelManager我们将对找到的每个通道调用该broadcastToEveryoneExcept方法。以下是实现代码:

public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
{
    if (is_null($socketId)) {
        return $this->broadcast($payload);
    }

    foreach ($this->subscribedConnections as $connection) {
        if ($connection->socketId !== $socketId) {
            $connection->send(json_encode($payload));
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

如前所述,通道 (channel) 保存所有订阅它的连接。如果 Laravel 没有收到特定客户端的广播指令,那么该消息很可能是由服务器自身发起的,并且应该广播给所有订阅的连接。

如果 Laravel 的广播是由特定客户端发起的,那么 Laravel 会将该连接的 socketId 与有效负载一起发送。在这种情况下,我们会将消息发送给除发起消息的客户端之外的所有已连接客户端。

这就是HTTP协议的工作原理!

如果你不想使用 Pusher 协议怎么办?

虽然我们的软件包的主要目的是尽可能简化 Pusher JavaScript 客户端或 Laravel Echo 的使用,但您完全不必局限于 Pusher 协议。在某些情况下,您可能只需要一个简单的、精简的 WebSocket 服务器,并且希望完全控制传入的有效负载及其处理方式,而无需使用“通道”等其他机制。

您可以轻松创建自己的自定义 WebSocketHandler 类。您只需实现 Ratchets 接口即可Ratchet\WebSocket\MessageComponentInterface

实现之后,您将得到一个类似这样的类:

namespace App;

use Ratchet\ConnectionInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
use Ratchet\WebSocket\MessageComponentInterface;

class MyCustomWebSocketHandler implements MessageComponentInterface
{

    public function onOpen(ConnectionInterface $connection)
    {
        // TODO: Implement onOpen() method.
    }

    public function onClose(ConnectionInterface $connection)
    {
        // TODO: Implement onClose() method.
    }

    public function onError(ConnectionInterface $connection, \Exception $e)
    {
        // TODO: Implement onError() method.
    }

    public function onMessage(ConnectionInterface $connection, MessageInterface $msg)
    {
        // TODO: Implement onMessage() method.
    }
}
Enter fullscreen mode Exit fullscreen mode

在类本身中,您可以完全控制 WebSocket 连接的所有生命周期事件,并且可以拦截传入的消息并对其做出反应。

唯一缺少的部分是,您需要告诉我们的 WebSocket 服务器在特定的路由端点加载此处理程序。这可以通过WebSocketsRouter外观模式来实现。

这个类负责将路由注册到实际的 WebSocket 服务器。您可以使用该websocket方法定义自定义 WebSocket 端点。该方法需要两个参数:WebSocket 处理程序所在的路径以及 WebSocket 处理程序类的完全限定类名。

例如,这可以在您的routes/web.php文件中完成。

WebSocketsRouter::webSocket('/my-websocket', \App\MyCustomWebSocketHandler::class);
Enter fullscreen mode Exit fullscreen mode

添加自定义 WebSocket 路由后,请务必重启 WebSocket 服务器以使更改生效。

调试仪表板

Pusher 的众多强大功能之一是“调试控制台”。它提供了一个仪表盘,让您可以实时查看所有 WebSocket 连接、事件和 API 请求。我们也希望将此功能添加到我们的软件包中。因此,我们的软件包也包含一个与 Pusher 功能相同的调试仪表盘。

它看起来是这样的:调试仪表板

WebSocket 控制面板的默认位置是 [此处/laravel-websockets应填写默认位置]。路由会自动注册。如果您想更改控制面板的 URL,可以path在您的配置文件中进行配置config/websockets.php

要访问调试仪表板,您可以在浏览器中访问 Laravel 项目的仪表板 URL。由于您的 WebSocket 服务器支持多个应用程序,您可以选择要连接和检查的应用程序。

按下“连接”按钮,即可建立 WebSocket 连接,并实时查看 WebSocket 服务器上发生的所有事件。

默认情况下,只有当您的应用程序环境设置为 时,才允许访问 WebSocket 控制面板local

不过,您可以通过重写 Laravel Gate 来改变这种行为。LaravelAuthServiceProvider自带的配置文件是一个很好的修改位置。

public function boot()
{
    $this->registerPolicies();

    Gate::define('viewWebSocketsDashboard', function ($user = null) {
        return in_array([
            // 
        ], $user->email);
    });
}
Enter fullscreen mode Exit fullscreen mode

如果您想快速测试某个事件,也可以使用调试仪表板将事件发送到特定频道。

只需输入频道、事件名称并提供有效的 JSON 有效负载,即可将其发送给给定频道中所有已连接的客户端。

调试仪表板上的另一个功能是能够实时查看 WebSocket 服务器的关键指标。

WebSocket 服务器会在后台以固定的时间间隔存储当前峰值连接数、接收到的 WebSocket 消息数和接收到的 API 消息数的快照。默认设置为每 60 秒存储一次快照,但您也可以在配置文件中更改此间隔。

仪表盘上的实时统计数据

它能扩展吗?

这个问题没有简单的答案,因为实际情况可能因人而异。但只要服务器端配置得当,你的 WebSocket 服务器就能轻松应对大量并发连接。

这是一个在 Digital Ocean 最小的服务器实例上进行的基准测试示例,该实例上还运行着其他几个 Laravel 项目。在这个特定的服务器上,最大并发连接数最终达到了约 15,000。

图形

以下是另一个基准测试,该测试是在一台配备 2GB 内存和 2 个 CPU 的 Digital Ocean 服务器上运行的。在此服务器配置下,最大并发连接数接近 60,000。

图形

结对编程很有趣!

我们开发这个软件包的过程非常愉快。这个想法源于我们讨论马塞尔正在开发的 Dusk Dashboard 时产生的灵感。那个仪表盘使用了 WebSocket。我们都觉得肯定有更简便的方法来使用 WebSocket。我们想让类似的东西能够Route::websocket直接运行。

事情进展得很快。Marcel之前用Ratchet搭建过Dusk仪表盘,所以他创建了一个概念验证,证明理论上我们可以用Ratchet替代Pusher提供的所有功能。Freek对代码进行了一些润色,并实现了一些小功能。与此同时,Marcel还添加了炫酷的调试仪表盘,并负责文档的编写。

有几个晚上,我们用 Slack 共享屏幕,一起编程。多租户支持和推送消息的一些功能是通过结对编程实现的。这对我们俩来说都是一次非常棒的经历。

当软件包的功能全部完成后,我们保持联系,每天一起完善软件包和文档。临近尾声时,我们进行了一次大规模的重构,修改了许多命名空间和目录结构,使整个软件包更加清晰易懂。

博客文章驱动开发™

— Freek Van der Herten ( @freekmurze ) 2018 年 12 月 1 日

在撰写这篇博客文章的过程中,我们再次检查了所有代码,并对一些代码进行了优化,甚至添加了一些小功能。我们曾考虑创建一个新组织beyond-spatie来存放这个软件包,但最终认为将其放在一个现有的组织中会更好,因此选择了BeyondCode,也就是 Marcel 的公司。

我们在包装里倾注了很多心血,我们为我们共同完成的工作感到自豪。

最后

如果你读完了以上所有内容,恭喜你!我们知道信息量有点大。虽然后台运行了很多程序,但我们认为这个软件包很容易上手。如果你想了解更多关于这个软件包的信息,请务必阅读我们编写的详尽文档。前往GitHub 上的这个仓库即可查看源代码。

如果您担心在生产环境中使用它,请放心!事实上,该软件包已经在Freek 的监控 SaaS 服务Oh Dear!上运行了一周。该服务大量使用 WebSocket 来实现每个屏幕实时显示信息。所有这些都由我们的 laravel-websockets 软件包提供支持,该软件包每天处理超过 50000 个广播事件。

就连我们自己的文档也使用了由我们软件包支持的 WebSocket 技术。在每页的右上角,你可以看到还有多少其他用户也在阅读该文档。如果你点击那个铃铛图标,它不仅会在你的浏览器中显示动画效果,也会在所有其他读者的页面上显示动画效果。真有趣!

如果您想在生产环境中运行该软件包,请务必查看有关如何使用 SSL 运行它以及一些部署注意事项的部分。

我们俩之前都有丰富的软件包编写经验。如果您喜欢laravel-websockets,也请务必看看我们的其他作品。您可以在 GitHub 上 Beyondcode 的组织页面找到 Marcel 之前编写的软件包列表。Freek 和他的团队创建的软件包则可以在Spatie 网站的开源部分找到。

文章来源:https://dev.to/freekmurze/introducing-laravel-websockets-an-easy-to-use-websocket-server-implemented-in-php-1oj