了解反应器并构建一个迷你 Sidekiq
什么是Ractor?
让我们来打造一个迷你版的Sidekiq吧!
结论
在本文中,您将了解更多关于 Ractor 的信息,以及如何使用它们来构建您自己的sidekiq(一个 Ruby 的后台处理框架)克隆版本。
什么是Ractor?
Ruby 3.0 引入了Reactor类。这是 Ruby 中类似 Actor 的并发抽象层,其目标是在不考虑线程安全问题的前提下,提供 Ruby 的并行执行特性。
根据维基百科:
计算机科学中的 Actor 模型是一种并发计算的数学模型,它将 Actor 视为并发计算的通用原语。
演员能够:
- 创造更多演员
- 接收消息
- 发送消息
- 采取本地化决策
请注意,Ractor 的实现尚不稳定,请勿将其用于生产代码。如果您仍然需要一些理由来决定是否(暂时)在生产环境中使用它,请查看使用时显示的警告:
警告:Ractor 仍处于实验阶段,其行为可能会在未来的 Ruby 版本中发生变化!此外,它还存在许多实现问题。
创建一台拖拉机
创建 ractor 非常简单,只需使用Ractor.new:
ractor = Ractor.new { puts 'Hello Ractor!' }
收到消息
接收消息有两种方式,取决于您是否拥有发送消息的拖拉机的参考信息。
Ractor.receive如果您不知道是谁发送的消息,请使用此方法:
message = Ractor.receive
Ractor#take如果您有发送消息的拖拉机的参考信息,请使用该参考信息:
message = ractor.take
由于这些方法调用会阻塞,直到收到消息为止,因此请避免期望从永远不会发送消息的 ractor 收到消息,否则您的程序将永远卡住。
另外,请注意,您发送的对象必须是可共享的。
发送消息
如果您知道收件人是谁:
ractor.send(message)
# or
ractor << message # `<<` is an alias to `send`
如果你不这样做:
Ractor.yield(message)
Ractor.yield也会被屏蔽,直到有演员收到你的消息为止。
采取地方性决策
在传递给 `Ractor::TVar` 的代码块中,你可以执行几乎任何操作,除非需要访问共享对象Ractor.new。如果你尝试使用在代码块外部定义的变量,将会引发异常。例如,为了在多个 Ractor 之间共享变量,你可以使用 ` Ractor::TVar` 这个 gem。
我稍后会用到的另一个有趣的方法是这样Ractor.select(*actors)的。它接受多个 ractor 作为输入,并返回第一个发送数据的 ractor 及其输出:
slow_ractor = Ractor.new { sleep 2; Ractor.yield(:too_late) }
fast_ractor = Ractor.new { Ractor.yield(:fast) }
ractor, output = Ractor.select(slow_ractor, fast_ractor)
# output == :fast && ractor == fast_ractor
让我们来打造一个迷你版的Sidekiq吧!
请注意,这个简易的概念验证版本仅允许您使用 10 个 ractor 池来实现作业的并行执行。它无法处理错误流控制、统计信息、队列以及其他所有使 Sidekiq 成为一个超级实用项目的功能。
我们将构建一个简单的设计:
- A
WorkerPool,负责管理我们的拖拉机队 - 一个
Job基类,我们所有的专用作业都将继承自它。
目标是让人们能够轻松地实现自己的任务,而无需处理所有的池逻辑。
工人池
class WorkerPool
attr_reader :ractors
def initialize
@ractors = 10.times.map { spawn_worker }
end
def spawn_worker
Ractor.new do
Ractor.yield(:ready)
loop { Ractor.yield Job.run(Ractor.receive) }
end
end
def self.run(parameters)
ractor, _ignored_result =
Ractor.select(*(@instance ||= new).ractors)
ractor << parameters
end
end
这里有个技巧。使用时Ractor.yield(:ready),我们只是确保池中的 ractors 有东西要发送,以便初始操作能够Ractor.select正常工作(记住,它是阻塞的)。
职业基础类
class Job
def self.process(*args)
WorkerPool.run({ class: self, args: args })
end
def self.run(hash)
case hash
in { class: klass, args: args }
klass.new.process(*args)
end
end
end
请注意,你提供的任何论据都必须是可共享的。
执行一项具体工作
假设我们想要创建一个异步作业来打印一些内容:
class PrintJob < Job
def process(message)
puts message
end
end
现在异步使用它非常简单:
PrintJob.process('Hello World!')
结论
Ractor 为 Ruby 中的并行执行引入了一种新颖而有趣的模型。
如果您对拖拉机这个话题感兴趣,我建议您看看以下这些有趣的资源:
- https://github.com/ruby/ruby/blob/master/doc/ractor.md
- https://docs.ruby-lang.org/en/3.0.0/Ractor.html
如果您喜欢这篇文章,请查看我们精彩的每周技术简报。我们会定期分享顶尖的 Ruby 和 JavaScript 内容!
照片由马克·汤普森拍摄
文章来源:https://dev.to/doctolib/learn-about-racctors-and-build-a-mini-sidekiq-3ba2