发布于 2026-01-06 0 阅读
0

用 Ruby 重建 Redis - 第一章 - 一个基本的 TCP 服务器

用 Ruby 重建 Redis - 第一章 - 一个基本的 TCP 服务器

我们将涵盖的内容

本章将介绍如何使用 Ruby 创建 TCP 服务器,以及如何使用 Linux 和 macOS 系统上netcat提供的实用程序与其交互nc。我们将简要了解并发和并行,以及线程如何
影响服务器的行为。

介绍

本书的目标是从零开始实现一个类似 Redis 的服务器。截至撰写本文时,Redis 支持 9 种不同的数据类型,数十个与这些数据类型相关的命令,以及许多管理类别的功能,例如用于高可用性的 Redis Sentinel。

我将从一个高度简化的 Redis 版本开始,然后慢慢添加更多功能,力求尽可能接近 Redis 目前支持的功能。

我选择 Ruby 作为本书的编程语言,纯粹是因为我喜欢它,而且觉得用它开发很有趣。Ruby 常被认为是一门“慢语言”,但性能在这里并不是我主要考虑的因素。虽然我会确保做出合理的
实现选择,但本书的目标是学习更多关于 Redis、Ruby、网络、操作系统信号等方面的知识,而不是开发
一个可以直接用于生产环境的软件。

关于“从零开始”的说明

“从零开始”这个词可能含义模糊,尤其对于像 Ruby 这样开箱即用功能如此丰富的语言而言更是如此

本书的目标是完全依赖 Ruby 标准库。截至 2020 年 4 月撰写本文时,
Ruby 的最新版本为 2.7.1。

现在,让我们来编写一些代码。

Redis 的主要组件是 ` redis-server` redis-server,它是一个可执行文件,用于启动 TCP 服务器。在使用
Redis 进行实验时,通常会使用 ` redis-cliredis-client`,它是一个可执行文件,用于启动连接到 Redis 服务器的 REPL 客户端。
默认情况下,Redis 在本地运行于 6379 端口。

官方 Ruby 文档中给出了TCPServer 类的以下示例

require 'socket'

server = TCPServer.new 2000 # Server bind to port 2000
loop do
  client = server.accept    # Wait for a client to connect
  client.puts "Hello !"
  client.puts "Time is #{Time.now}"
  client.close
end
Enter fullscreen mode Exit fullscreen mode

接下来是以下示例和评论:“一个更易于使用的服务器(服务于多个客户端)”

require 'socket'

server = TCPServer.new 2000
loop do
  Thread.start(server.accept) do |client|
    client.puts "Hello !"
    client.puts "Time is #{Time.now}"
    client.close
  end
end
Enter fullscreen mode Exit fullscreen mode

我暂时忽略第二个例子,因为并发/并行是一个复杂的话题,我打算在以后的文章中深入探讨。
所以,现在我们逐行分析第一个例子:

首先,我们需要引入'socket'Ruby。Ruby 的 require 语法一直让我觉得很奇怪,尤其是
与 Python 等更明确的语法相比。
浏览了一些 Ruby 源代码后,我猜测这个 require 会将Ruby 源代码所在文件夹
所有 c 文件定义的常量添加到`<class>` 中,从而创建诸如 ` <class>` 、`<class>`、`<class>`、`<class>` ` <class>`` <class>` 之类的类 你可以在 shell 中尝试一下,在 调用 ` require`之前引入这些常量会失败,但在调用 `require` 之后就可以正常工作了:lib/socket/$LOAD_PATH
AddrinfoUnixServer
UNIXSocketUDPSocketTCPSocket
TCPServerSOCKSocket
irbNameError
require 'socket'

irb(main):001:0> TCPSocket
Traceback (most recent call last):
# [truncated]
NameError (uninitialized constant TCPSocket)
irb(main):002:0> UNIXServer
Traceback (most recent call last):
# [truncated]
NameError (uninitialized constant UNIXServer)
irb(main):003:0> require 'socket'
=> true
irb(main):004:0> TCPSocket
=> TCPSocket
irb(main):005:0> UNIXServer
=> UNIXServer
Enter fullscreen mode Exit fullscreen mode

下一行代码创建一个新TCPServer实例,监听端口 2000。new 的文档如下

new([hostname], port) => tcpserver

创建绑定到端口的新服务器套接字。

如果提供了主机名,则套接字将绑定到该主机名。

在内部,`::new` 会调用 `getaddrinfo()` 函数来获取地址。如果 `getaddrinfo()` 返回多个地址,`::new`
会尝试为每个地址创建一个服务器套接字,并返回第一个创建成功的套接字。

文档暗示该参数是可选的,省略该参数则默认hostname运行。 很好,现在我们已经有一个运行中的 TCP 服务器,但它真的在做什么吗?让我们来看看。localhost
localhost:2000

我们可以在 shell 中运行服务器irb

irb(main):001:0> require 'socket'
=> true
irb(main):002:0> TCPServer.new 2000
=> #<TCPServer:fd 10, AF_INET6, ::, 2000> # Note that fd value might be different on your system.
Enter fullscreen mode Exit fullscreen mode

我们会花更多时间深入探讨这些数值的含义,但现在让我们先简单看一下我们
目前掌握的信息:

fd 10

fd代表文件描述符(File Descriptor)。如果您感兴趣,可以使用 `ps aux`lsof工具查看 macOS 上进程使用的所有描述符lsof -p <process id>。我们可以使用pgrep它来获取启动服务器的会话的进程 ID(通常称为 pid)irb。在我的机器上只有一个结果,即 86096。但如果您irb运行了多个会话,可以使用 `ps aux` 获取有关特定进程的更多信息,例如其启动时间戳。这是我的最后一行输出,我们可以看到有一个打开的文件,其
FD 值为 10。

[...]
ruby    86096 pierre   10u   IPv6 0x80d76e9380eb855d      0t0     TCP *:callbook (LISTEN)
Enter fullscreen mode Exit fullscreen mode

2000

这是端口值,我们将其作为参数传递给了构造函数。

AF_INET6&::

我不太确定中间的两个值AF_INET6&是什么,所以我首先尝试 通过查看实例上该方法的来源::来弄清楚这个字符串是如何构建的
inspectTCPServer

irb(main):001:0> TCPServer.new(2000).method(:inspect)
=> #<Method: TCPServer(IPSocket)#inspect()>
Enter fullscreen mode Exit fullscreen mode

这很合理,TCPServer它继承自 `<Object>` TCPSocket,而 `<Object>` 本身又继承自 ` <Object>` IPSocket。` inspect<Object>`IPSocket中的方法被定义为一个 C 函数ipsocket.c。看起来这些值来自 ` addr<Object>` 函数,根据Ruby 文档,该函数“返回包含 `address_family`、`port`、
`hostname` 和 `numeric_address` 的本地地址数组”。稍加搜索便AF_INET6 证实了这一点。`address_family`AF代表地址族,`INET` 代表互联网协议版本 4,`INET6` 代表互联网协议版本 6。`<object>`对 IPv6 地址::特殊含义,相当于 IPv4 地址的 0.0.0.0。


回到我们的服务器,如果您仍然打开着一个终端(之前您TCPServer.new 2000在 shell 中运行了命令irb),现在打开一个
新的终端并运行nc localhost 2000`.` nc,这是该netcat实用程序的命令行界面,根据其页面描述,它用于
man:[...]几乎所有涉及 TCP 或 UDP 的操作”。`.`telnet是另一个类似的工具,但它
默认情况下并未与 macOS 捆绑在一起,不过,它只需一个brew install终端即可获得。

运行程序nc localhost 2000时应该会“卡住”,没有任何反应,你也无法在 shell 中输入更多命令。你可以
使用 Ctrl-C 退出。你可以通过尝试使用一个未使用的端口(
例如 2001)来确认程序是否确实执行了某些操作,程序应该会立即返回,并返回退出代码 1,即错误。如果使用 2001 端口也出现卡住的情况,
可能是因为你的机器上已经有其他程序正在占用 2001 端口。

nc有一个-w标志位可以指定超时时间,您可以使用 `--timeout` 命令查看所有可用标志位nc -h。您可以自行尝试,
运行相同的命令,并设置 `--timeout` 的值-w。运行该命令nc -w 5 localhost 2000将等待 5 秒,然后以
状态码 0 退出。

无论在何种环境下,我认为在处理
诸如 HTTP 请求之类的网络调用时,设置超时时间都是最佳实践。虽然服务器不太可能像我们目前
遇到的情况那样,接受请求后却完全不做任何处理,但
如果例如由于流量激增导致服务器无响应,那么服务器可能需要很长时间才能返回响应。在
这种情况下,客户端在超时后中止请求可能比继续等待更有利。

引用维基百科页面

每个进程终止时都会返回一个退出状态,以整数形式表示。0 表示成功,其他所有值均表示成功。

有些终端会显示退出状态,有些终端只在非零代码时显示,但你始终可以使用命令echo $?来查看最后一个命令的状态。运行命令ls后跟echo $?应该会输出0

如果你看过 C 代码,就会知道main函数签名是 `return` 的,int main()因为除非显式main调用 `return`,否则 `return` 返回的值会被用作退出状态exit
所以这就是为什么你可能会在 C 程序的函数return 0末尾看到main`return` 的原因,它表示“返回成功”。使用 `return`exit(EXIT_SUCCESS)或 `return`似乎也很常见exit(EXIT_FAILURE)

有些代码有特殊含义,例如 127 表示“命令未找到”。

退出状态并非POSIX系统特有的,正如维基百科上所记载的那样。

让我们通过传递详细模式标志来深入了解一下-vnc -v localhost 2000

found 0 associations
found 1 connections:
     1: flags=82<CONNECTED,PREFERRED>
        outif lo0
        src 127.0.0.1 port 53022
        dst 127.0.0.1 port 2000
        rank info not available
        TCP aux info available

Connection to localhost port 2000 [tcp/callbook] succeeded!
Enter fullscreen mode Exit fullscreen mode

所以它确实连接上了,但是什么也没做,因为我们的服务器刚刚启动,但是没有指示它对
传入的连接执行任何操作。

loop在 Ruby 中,`if` 会启动一个无限循环,这没什么特别的,服务器启动后永不停止是很常见的情况。想想
Redis,一旦服务器运行起来,我们希望它永远运行下去,除非被告知停止,否则我们不希望它停止
。对于这样的用例来说,无限循环是完全合理的。

下一行真的很有意思,可能是这段代码中最有趣的一行:server.accept
我们先回到这里irb,因为这样更容易逐一尝试这些方法。

irb(main):001:0> require 'socket'
=> true
irb(main):002:0> s = TCPServer.new 2000
irb(main):003:0> s.accept
Enter fullscreen mode Exit fullscreen mode

accept方法没有返回值。令人遗憾的是,文档非常简略:

接受传入的连接。它返回一个新的 TCPSocket 对象。

它没有告诉我们的是,这实际上是一个“阻塞”方法。从技术上讲,我们可以从
另一个方法的存在推断出这一点TCPServeraccept_nonblock根据文档,该方法是:

在底层文件描述符设置为 O_NONBLOCK 后,使用 accept(2) 接受传入连接。它返回
一个已接受的 TCPSocket 连接。

我们将在后面的章节中更详细地探讨这个问题accept_nonblock。让我们回到第二个 shell,注意,如果您关闭第一个shell,挂起的状态就会结束。这是因为服务器关闭了连接, 客户端也随之停止了。让我们重新运行相同的命令。我们会看到与上面非常相似的输出 ,表明连接成功。 主要区别在于,挂起的调用现在返回了,并返回了一个实例。让我们使用获取 此套接字的引用是指向中返回的最后一个值的引用),并向 我们的客户端发送一些内容:。如果您回到运行的终端,您应该会看到“Hey!”。现在让我们 使用关闭连接,我们可以观察到调用返回了,退出代码为 0,即 成功。nc
localhost 2000
irb
ncnc localhost 2000

acceptirbTCPSocket
socket = __irb
socket.puts "Hey!"nc
socket.closenc

好了,我们已经了解了官方示例TCPServer。该示例在 2000 端口启动一个 TCP 服务器,然后
进入无限循环。它首先等待传入的连接,直到客户端连接后才会执行其他操作。
一旦客户端连接,它首先写入“Hello!”,然后写入当前时间,最后关闭连接,并
重新开始。

如果你还记得,一开始我们简要地看一下文档中的第二个示例,它使用了 ` Thread.start.`。这个示例的优点在于它能够服务多个客户端,因此更易于使用。但
第一个示例确实存在一个主要问题:它一次只能服务一个客户端。如果你把这个
示例放在一个文件中,比如 `.`,server.rb然后用 `.` 运行它ruby server.rb,它会启动一个进程、一个线程,然后开始
执行循环。
第二个示例改进了这种情况,它将阻塞操作的结果传递server.accept给一个新
线程,让新线程来处理客户端。

这里有几点需要注意。首先,Ruby 不支持惰性求值,所以当我们写 `.` 时
Thread.start(server.accept),需要先对参数进行求值,然后才会start调用 `.`,并将
求值结果传递给 `.`。
在我们的例子中,这意味着循环会先启动,然后阻塞直到客户端连接,一旦客户端
连接成功,生成的套接字就会传递给 ` Thread.start.`。

我们举个例子来说明,通过添加一个sleep调用来模拟一个运行缓慢的服务器。仍然在当前状态下irb,运行
以下命令:

loop { socket = server.accept; socket.write "Hello"; sleep 5; socket.close }
Enter fullscreen mode Exit fullscreen mode

这与第一个例子几乎完全相同,区别在于主线程在关闭
套接字之前会休眠五秒钟。回到另一个终端,nc localhost 2000再次运行该命令,你会看到“Hello”几乎
立即被打印出来,然后进程会挂起五秒钟,最后退出。在进程挂起期间,打开另一个终端并
运行相同的命令,nc localhost 2000如果你立即看到“Hello”,可能是因为之前的终端中五秒钟已经过去了
。你可以使用irbCtrl-C 结束无限循环,并将该值增加到 10
秒或更长,并进行实验。

这表明,当服务器忙于处理某个客户端请求时,即使它什么也不做,只是
进入睡眠状态,所有其他传入的客户端实际上都在等待被服务。第二个例子对此进行了改进,如果您运行以下命令
即可看到:irb

loop {
  Thread.start(server.accept) { |socket|
    socket.write "Hello"
    sleep 5
    socket.close
  }
}
Enter fullscreen mode Exit fullscreen mode

运行相同的手动测试,我们应该看到两个nc localhost 2000调用都收到了“Hello”响应,然后它们
都等待五秒钟,直到服务器关闭套接字。
这是因为每个客户端都由不同的线程处理。第一个客户端会触发第一个
server.accept调用返回,从而启动第二个线程(之所以是第二个线程,是因为初始程序
本身也运行在一个线程中)。当第二个线程处于休眠状态时,主线程不再被阻塞,它将
套接字传递给了第二个线程,并重新阻塞server.accept。当第二个客户端连接时,
同样的事情再次发生,启动了一个新线程,并将新创建的套接字传递给它。

我们有一个可以同时服务多个客户端的服务器,这很棒。但是,
这种方法存在一个很大的问题。线程数量有限,如果我们创建越来越多的线程,可能会导致
整个系统运行速度变慢。使用多线程需要非常谨慎,因为它很容易导致
竞态条件。正如前面提到的,并发和并行都是非常复杂的主题,我们将
在后续章节中详细讲解。

结论

现在我们知道如何运行一个基本的 TCP 服务器,它可以向客户端写入字符串,然后关闭连接,仅此
而已。话虽如此,正如我们将在后续章节中看到的,我们可以利用它做很多事情。我们也探讨了单线程方法的局限性,虽然我们可以使用线程来改善这种情况, 但为了保持示例的简洁性并逐步改进,
我们暂时不这样做。

下一章我们将创建一个服务器类,使其能够读取客户端的输入,并响应客户端的请求和请求GETSET但仅限于
最基本的形式,我们不会实现诸如 TTL 或其他选项之类的功能。

附录 - 服务器的 AC 实现

我很好奇底层实现是什么样的,所以我尝试用 C 语言写了一个
服务器示例,该服务器监听 2000 端口,并在客户端连接时将数据写回给客户端。以下是代码。

注意:如果您的机器上没有gcc安装,您可以使用以下命令在 macOS 上安装:
xcode-select --install,如果您使用的是其他操作系统,请自行搜索,这应该是一个相当常见的问题,
在 stackoverflow 等网站上有很多答案。

我在这里分享的代码是
GeeksforGeeks 网站上提供的客户端/服务器代码的简化版本:

服务器:

#include <stdio.h> // For printf
#include <netdb.h> // For bind, listen, AF_INET, SOCK_STREAM, socklen_t, sockaddr_in, INADDR_ANY
#include <stdlib.h> // For exit
#include <string.h> // For bzero
#include <unistd.h> // For close & write
#include <errno.h> // For errno, duh!
#include <arpa/inet.h> // For inet_ntop

#define MAX 80
#define PORT 2000
#define SA struct sockaddr

int main()
{
    socklen_t client_address_length;
    int server_socket_file_descriptor, client_socket_file_descriptor;
    struct sockaddr_in server_address, client_address;

    // socket create and verification
    server_socket_file_descriptor = socket(AF_INET, SOCK_STREAM, 0);
    if (server_socket_file_descriptor == -1) {
        printf("socket creation failed...\n");
        exit(0);
    }
    else {
        printf("Socket successfully created..\n");
    }
    bzero(&server_address, sizeof(server_address));

    // assign IP, PORT
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(PORT);

    // Binding newly created socket to given IP and verification
    if ((bind(server_socket_file_descriptor, (SA*)&server_address, sizeof(server_address))) != 0) {
        printf("socket bind failed... : %d, %d\n", server_socket_file_descriptor, errno);
        exit(0);
    }
    else {
        printf("Socket successfully binded..\n");
    }

    // Now server is ready to listen and verification
    if ((listen(server_socket_file_descriptor, 5)) != 0) {
        printf("Listen failed...\n");
        exit(0);
    }
    else {
        printf("Server listening..\n");
    }
    client_address_length = sizeof(client_address);

    // Accept the data packet from client and verification
    client_socket_file_descriptor = accept(server_socket_file_descriptor, (SA*)&client_address, &client_address_length);
    if (client_socket_file_descriptor < 0) {
        printf("server acccept failed: %d,%d...\n", client_socket_file_descriptor, errno);
        exit(0);
    }
    else {
        printf("server acccept the client...\n");
        char human_readable_address[INET_ADDRSTRLEN];
        inet_ntop(AF_INET, &client_address.sin_addr, human_readable_address, sizeof(human_readable_address));
        printf("Client address: %s\n", human_readable_address);
    }

    char message_buffer[MAX];
    read(client_socket_file_descriptor, message_buffer, sizeof(message_buffer));
    printf("From Client: %s\n", message_buffer);
    bzero(message_buffer, MAX);

    strcpy(message_buffer, "Hello, this is Server!");
    write(client_socket_file_descriptor, message_buffer, sizeof(message_buffer));

    // After chatting close the socket
    printf("Closing server_socket_file_descriptor\n");
    close(server_socket_file_descriptor);
}
Enter fullscreen mode Exit fullscreen mode

客户:

#include <stdio.h> // For printf
#include <netdb.h> // For AF_INET, SOCK_STREAM, sockaddr_in
#include <stdlib.h> // For exit
#include <string.h> // For bzero
#include <sys/socket.h> // For connect
#include <arpa/inet.h> // For inet_addr
#include <unistd.h> // for close

#define MAX 80
#define PORT 2000
#define SA struct sockaddr

int main() {
    int server_socket_file_descriptor;
    struct sockaddr_in server_address;

    // socket create and varification
    server_socket_file_descriptor = socket(AF_INET, SOCK_STREAM, 0);
    if (server_socket_file_descriptor == -1) {
        printf("socket creation failed...\n");
        exit(0);
    }
    else {
        printf("Socket successfully created..\n");
    }

    bzero(&server_address, sizeof(server_address));

    // assign IP, PORT
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = inet_addr("127.0.0.1");
    server_address.sin_port = htons(PORT);

    // connect the client socket to server socket
    if (connect(server_socket_file_descriptor, (SA*)&server_address, sizeof(server_address)) != 0) {
        printf("connection with the server failed...\n");
        exit(0);
    }
    else {
        printf("connected to the server..\n");
    }

    char message_buffer[MAX] = "Hello, this is Client";
    write(server_socket_file_descriptor, message_buffer, sizeof(message_buffer));
    bzero(message_buffer, sizeof(message_buffer));
    read(server_socket_file_descriptor, message_buffer, sizeof(message_buffer));
    printf("From Server: %s", message_buffer);

    // close the socket
    close(server_socket_file_descriptor);
}
Enter fullscreen mode Exit fullscreen mode

和往常一样,我们来运行一些手动测试,但由于这是 C 语言代码,我们首先需要编译它:

$ gcc server.c -o server
$ gcc client.c -o client
Enter fullscreen mode Exit fullscreen mode

我们需要两个 shell,在第一个 shell 中启动服务器./server。它应该会记录以下内容:

Socket successfully created..
Socket successfully binded..
Server listening..
Enter fullscreen mode Exit fullscreen mode

请注意,正如我们之前在 Ruby 创建服务器时看到的那样,它处于“挂起”状态,尚未返回。在另一个
shell 中运行客户端:./client,您应该看到以下输出:

$ ./client
Socket successfully created..
connected to the server..
From Server: Hello, this is Server!
Enter fullscreen mode Exit fullscreen mode

如果你返回到服务器运行所在的另一个 shell,你会看到它现在已经返回,并且
在返回之前还记录了一些其他信息:

server acccept the client...
Client address: 127.0.0.1
From Client: Hello, this is Client
Closing server_socket_file_descriptor
Enter fullscreen mode Exit fullscreen mode

它奏效了!服务器会等待客户端连接,读取客户端发送的内容并写回消息,完成所有这些操作后,服务器就会退出。

逐一讲解客户端和服务端代码有点超出本文范围,而且说实话,我目前也确实没有能力做到这一点。话虽如此,我认为将一个类似的实现可视化,以便大致了解 Ruby 在底层是如何工作的,会很有意思。

文章来源:https://dev.to/pjam/chapter-1-a-basic-tcp-server-4ipe