发布于 2026-01-06 0 阅读
0

现代编程语言中的并发性:Rust

现代编程语言中的并发性:Rust

原文发表于deepu.tech

这是一个系列文章,我将探讨现代编程语言中的并发性,并基于Rust 书籍中的示例,使用 Rust、Go、JavaScript (NodeJS)、TypeScript (Deno)、Kotlin 和 Java 等流行语言构建并测试一个并发 Web 服务器,以比较这些语言/平台之间的并发性和性能差异。本系列文章的章节如下,我将尽量每周更新。

  1. 介绍
  2. Rust 中的并发 Web 服务器
  3. 用 Golang 实现并发 Web 服务器
  4. 使用 NodeJS 的 JavaScript 并发 Web 服务器
  5. 使用 Deno 的 TypeScript 并发 Web 服务器
  6. 使用 JVM 的 Java 并发 Web 服务器
  7. 基准测试的比较与结论

Rust 中的并发

安全高效地处理并发编程是 Rust 的另一个主要目标。

-- Rust 文档

高效且内存安全的并发是 Rust 的主要目标之一,这并非空谈。Rust 语言为并发编程提供了强大的特性,结合其一流的内存安全模型,使其成为并发应用场景的理想选择。Rust 的核心理念是让用户在前期(编译时)花费更多时间修复问题,而不是在生产环境(运行时)中耗费时间。因此,对于 Rust 新手来说,这看似意味着需要花费更多时间编写代码,但实际上,它能避免许多在内存安全性欠佳的语言中常见的问题,从而在后期节省大量精力。Rust 团队将这种特性称为“无畏并发”

与 Rust 中的其他所有事物一样,其理念是花费更多的时间在前期(即编译时)修复问题,而不是在生产环境中(即运行时)修复问题。

还有其他语言,例如 Go,它们提供了更简单且性能相当的并发解决方案,但由于 Rust 的灵活性,它们的功能不如 Rust 强大。Rust 基本上提供了并发、并行和异步编程所需的构建模块,您可以根据需要扩展或实现不同的解决方案,也可以使用社区提供的解决方案。这使得您可以针对具体用例选择最佳解决方案,而不是一味地使用同一种方案。

Rust 既支持多线程并发或并行编程,也支持异步编程。这意味着,正如我们在上一章中看到的,我们可以混合搭配这些模型,从而针对任何用例获得最佳性能。

多线程

Rust 标准库提供了用于创建和管理操作系统线程的构建模块,并提供了使用通道(类似于 Go)实现消息传递并发以及使用互斥锁和智能指针实现共享状态并发所需的实现。Rust 的类型系统和所有权模型有助于避免常见的并发问题,例如数据竞争、锁等。

异步处理

严格来说,异步编程并非并发编程的一部分,但在实践中,它在许多应用场景中与并发编程相辅相成,能够提升性能并提高资源利用效率。最新版本的 Rust 提供了异步编程所需的构建模块和语言特性,并内置了async/.await相应的语法。但需要注意的是,使用异步编程模型会增加整体复杂性,而且该生态系统仍在不断发展。虽然 Rust 提供了所需的语言特性,但标准库并未提供任何必要的实现,因此您需要使用外部 crate 才能Futures有效地使用异步编程模型。

基准测试

现在我们已经对 Rust 的并发特性有了一些基本的了解,接下来让我们用 Rust 构建一个简单的并发 Web 服务器。由于 Rust 提供了多种实现方式,我们将构建三个示例应用程序并进行比较。本文使用的 Rust 版本为撰写本文时的最新版本 (1.48.0)。

多线程并发 Web 服务器

这个例子更接近Rust官方书籍中的例子,为了简洁起见,我省略了 import 语句。你可以在GitHub 上找到完整的例子(链接在此)ThreadPool结构体与 Rust 书籍中的完全相同。本例中我们没有使用任何外部依赖。

fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); // bind listener
    let pool = ThreadPool::new(100); // same number as max concurrent requests

    let mut count = 0; // count used to introduce delays

    // listen to all incoming request streams
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        count = count + 1;
        pool.execute(move || {
            handle_connection(stream, count); // spawning each connection in a new thread
        });
    }
}

fn handle_connection(mut stream: TcpStream, count: i64) {
    // Read the first 1024 bytes of data from the stream
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    // add 2 second delay to every 10th request
    if (count % 10) == 0 {
        println!("Adding delay. Count: {}", count);
        thread::sleep(Duration::from_secs(2));
    }

    let header = "
HTTP/1.0 200 OK
Connection: keep-alive
Content-Length: 174
Content-Type: text/html; charset=utf-8
    ";
    let contents = fs::read_to_string("hello.html").unwrap();

    let response = format!("{}\r\n\r\n{}", header, contents);

    stream.write(response.as_bytes()).unwrap(); // write response
    stream.flush().unwrap();
}
Enter fullscreen mode Exit fullscreen mode

如您所见,我们将 TCP 监听器绑定到 8080 端口,并监听所有传入的请求。每个请求都在一个新线程中处理ThreadPool

让我们使用 ApacheBench 进行基准测试。我们将发出 10000 个请求,并发请求数为 100 个。

ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Document Path:          /
Document Length:        176 bytes

Concurrency Level:      100
Time taken for tests:   20.173 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      2830000 bytes
HTML transferred:       1760000 bytes
Requests per second:    495.72 [#/sec] (mean)
Time per request:       201.726 [ms] (mean)
Time per request:       2.017 [ms] (mean, across all concurrent requests)
Transfer rate:          137.00 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.9      0       7
Processing:     0  201 600.0      0    2014
Waiting:        0  200 600.0      0    2013
Total:          0  201 600.0      0    2017

Percentage of the requests served within a certain time (ms)
  50%      0
  66%      1
  75%      1
  80%      3
  90%   2000
  95%   2001
  98%   2001
  99%   2002
 100%   2017 (longest request)
Enter fullscreen mode Exit fullscreen mode

正如你所见,请求处理程序每​​处理 10 个请求就会休眠 2 秒。因此,如果我们设置一个实际可行的线程池数量(例如 8),它将限制我们(8 x 10) / 2 = 40每秒最多可以处理 100 个请求。所以,我们在这里设置线程池为 100,以匹配最大并发请求数。设置更高的值不会有任何区别。我想你已经看出问题所在了。线程池本身成为了瓶颈。在实际应用中,你可能无法设置这么多线程,因为操作系统可能无法提供这么多线程,从而导致资源使用量增加和瓶颈。在这个简单的用例中,由于每个线程都能非常快速地生成和处理请求,因此我们不会遇到问题。

那么,让我们看看能否找到另一种没有这种瓶颈的解决方案。

异步并发 Web 服务器

这个例子更接近Rust 异步文档中的示例,为了简洁起见,我省略了 import 语句。你可以在GitHub 上找到完整的示例(链接在此) 。`<async> TcpListener`、TcpStream`<async>` 和 ` task<async>` 来自 ` async_stdcrate`,并且async-std是本例中使用的唯一外部依赖项。

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); // bind listener
    let mut count = 0; // count used to introduce delays

    loop {
        count = count + 1;
        // Listen for an incoming connection.
        let (stream, _) = listener.accept().await.unwrap();
        // spawn a new task to handle the connection
        task::spawn(handle_connection(stream, count));
    }
}

async fn handle_connection(mut stream: TcpStream, count: i64) {
    // Read the first 1024 bytes of data from the stream
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    // add 2 second delay to every 10th request
    if (count % 10) == 0 {
        println!("Adding delay. Count: {}", count);
        task::sleep(Duration::from_secs(2)).await;
    }

    let header = "
HTTP/1.0 200 OK
Connection: keep-alive
Content-Length: 174
Content-Type: text/html; charset=utf-8
    ";
    let contents = fs::read_to_string("hello.html").unwrap();

    let response = format!("{}\r\n\r\n{}", header, contents);

    stream.write(response.as_bytes()).await.unwrap(); // write response
    stream.flush().await.unwrap();
}
Enter fullscreen mode Exit fullscreen mode

如您所见,我们将一个异步 TCP 监听器绑定到 8080 端口,并监听所有传入的请求。每个请求都在一个新任务中处理async_std。我们这里没有使用任何线程池,所有传入的请求都是异步处理的,因此不存在连接数瓶颈。

让我们使用 ApacheBench 进行基准测试。我们将发出 10000 个请求,并发请求数为 100 个。

ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Document Path:          /
Document Length:        176 bytes

Concurrency Level:      100
Time taken for tests:   20.186 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      2830000 bytes
HTML transferred:       1760000 bytes
Requests per second:    495.38 [#/sec] (mean)
Time per request:       201.863 [ms] (mean)
Time per request:       2.019 [ms] (mean, across all concurrent requests)
Transfer rate:          136.91 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   0.8      0       6
Processing:     0  201 600.0      0    2010
Waiting:        0  201 600.0      0    2010
Total:          0  201 600.0      1    2014
WARNING: The median and mean for the initial connection time are not within a normal deviation
        These results are probably not that reliable.

Percentage of the requests served within a certain time (ms)
  50%      1
  66%      1
  75%      2
  80%      3
  90%   2000
  95%   2001
  98%   2001
  99%   2003
 100%   2014 (longest request)
Enter fullscreen mode Exit fullscreen mode

我们在这里得到了几乎相同的结果。因此,对于这个特定的用例,这个版本似乎比多线程版本效率更高。可以使用其他 crate 构建类似的解决方案,例如 `<crate_name>` smolhyper`<crate_name> tokio`、`<crate_name>` 等等。你可以在这个仓库中找到其中一些。

我们来看看能否将两者结合起来,创建一个异步多线程版本。

异步多线程并发 Web 服务器

本示例使用了异步操作ThreadPool。为了简洁起见,我省略了导入语句。您可以在GitHub 上找到完整的示例(链接在此)。该ThreadPool结构体来自futurescrate,也是本示例中使用的唯一外部依赖项。

fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); // bind listener

    let mut pool_builder = ThreadPoolBuilder::new();
    pool_builder.pool_size(100);
    let pool = pool_builder.create().expect("couldn't create threadpool");
    let mut count = 0; // count used to introduce delays

    // Listen for an incoming connection.
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        count = count + 1;
        let count_n = Box::new(count);

        // spawning each connection in a new thread asynchronously
        pool.spawn_ok(async {
            handle_connection(stream, count_n).await;
        });
    }
}

async fn handle_connection(mut stream: TcpStream, count: Box<i64>) {
    // Read the first 1024 bytes of data from the stream
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    // add 2 second delay to every 10th request
    if (*count % 10) == 0 {
        println!("Adding delay. Count: {}", count);
        thread::sleep(Duration::from_secs(2));
    }

    let header = "
    HTTP/1.0 200 OK
    Connection: keep-alive
    Content-Length: 174
    Content-Type: text/html; charset=utf-8
        ";

    let contents = fs::read_to_string("hello.html").unwrap();

    let response = format!("{}\r\n\r\n{}", header, contents);

    stream.write(response.as_bytes()).unwrap(); // write response
    stream.flush().unwrap();
}
Enter fullscreen mode Exit fullscreen mode

这与第一个Threadpool例子非常相似,区别在于异步调用。遗憾的是,在这种情况下,我们也遇到了线程池瓶颈,因此我们将线程池大小设置为 100,以匹配最大并发请求数。

让我们使用 ApacheBench 进行基准测试。我们将发出 10000 个请求,并发请求数为 100 个。

ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Document Path:          /
Document Length:        176 bytes

Concurrency Level:      100
Time taken for tests:   20.161 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      3030000 bytes
HTML transferred:       1760000 bytes
Requests per second:    496.00 [#/sec] (mean)
Time per request:       201.615 [ms] (mean)
Time per request:       2.016 [ms] (mean, across all concurrent requests)
Transfer rate:          146.76 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.8      0       5
Processing:     0  201 600.0      0    2007
Waiting:        0  200 600.0      0    2007
Total:          0  201 600.0      0    2010

Percentage of the requests served within a certain time (ms)
  50%      0
  66%      1
  75%      2
  80%      2
  90%   2000
  95%   2000
  98%   2001
  99%   2002
 100%   2010 (longest request)
Enter fullscreen mode Exit fullscreen mode

与之前的解决方案相比,它似乎速度略快几毫秒。

使用 Tokio 的异步多线程并发 Web 服务器

这是使用Tokio实现的另一个异步多线程 Web 服务器版本,由Remco Bloemen贡献。为了简洁起见,我省略了导入语句。您可以在GitHub 上找到完整的示例(链接在此)

#[tokio::main()] // Tokio uses a threadpool sized for number of cpus by default
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();  // bind listener
    let mut count = 0; // count used to introduce delays

    // Listen for an incoming connection.
    loop {
        count = count + 1;
        let (socket, _) = listener.accept().await.unwrap();
        // spawning each connection in a new tokio thread asynchronously
        tokio::spawn(async move { handle_connection(socket, Box::new(count)).await });
    }
}

async fn handle_connection(mut stream: TcpStream, count: Box<i64>) {
    // Read the first 1024 bytes of data from the stream
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    // add 2 second delay to every 10th request
    if (*count % 10) == 0 {
        println!("Adding delay. Count: {}", count);
        sleep(Duration::from_secs(2)).await;
    }

    let header = "
    HTTP/1.0 200 OK
    Connection: keep-alive
    Content-Length: 174
    Content-Type: text/html; charset=utf-8
        ";

    let contents = read_to_string("hello.html").await.unwrap();

    let response = format!("{}\r\n\r\n{}", header, contents);

    stream.write_all(response.as_bytes()).await.unwrap(); // write response
}
Enter fullscreen mode Exit fullscreen mode

这与之前的例子非常相似,但使用的线程池数量更少,并且采用了异步调用。在这种情况下,我们不会遇到之前线程池示例中的瓶颈问题。

让我们使用 ApacheBench 进行基准测试。我们将发出 10000 个请求,并发请求数为 100 个。

ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Document Path:          /
Document Length:        176 bytes

Concurrency Level:      100
Time taken for tests:   20.569 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      3030000 bytes
HTML transferred:       1760000 bytes
Requests per second:    486.17 [#/sec] (mean)
Time per request:       205.688 [ms] (mean)
Time per request:       2.057 [ms] (mean, across all concurrent requests)
Transfer rate:          143.86 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   2.4      0      22
Processing:     0  202 600.3      1    2013
Waiting:        0  202 600.3      1    2012
Total:          0  203 600.3      2    2029

Percentage of the requests served within a certain time (ms)
  50%      2
  66%      3
  75%      5
  80%      7
  90%   2000
  95%   2003
  98%   2006
  99%   2008
 100%   2029 (longest request)
Enter fullscreen mode Exit fullscreen mode

与之前的解决方案相比,速度似乎稍慢了几毫秒。

结论

正如我在本系列文章的第一部分中所解释的,这种简单的基准测试并不能准确代表所有并发用例。它只是一个针对特定用例的简单测试,即一个简单的并发 Web 服务器,它只负责提供文件服务。其目的是为了比较不同解决方案之间的差异,并理解 Rust 中并发的工作原理。而对于这个特定的用例,异步解决方案似乎是最佳选择。

敬请期待下一篇文章,我们将探讨 Golang 中的并发性,并在 Go 中构建相同的用例。


参考


如果您喜欢这篇文章,请点赞或留言。

你可以在TwitterLinkedIn上关注我。

封面图片来源: Jacob Mejicanos拍摄,来自Unsplash

文章来源:https://dev.to/deepu105/concurrency-in-modern-programming-languages-rust-19co