边学边做,用 Ruby 构建一个后台处理系统
在今天的文章中,我们将实现一个简单的后台处理系统,纯粹是为了好玩!在这个过程中,我们或许还能了解一些关于像Sidekiq这样流行的后台处理系统的内部原理。需要注意的是,这个系统并非用于生产环境。
假设我们的应用程序中有一个任务,需要加载一个或多个网站并提取它们的标题。由于我们无法控制这些网站的性能,我们希望在主线程(或者如果是 Web 应用程序,则在当前请求之外)后台执行此任务。
封装任务
在深入后台处理之前,我们先来构建一个服务对象来执行当前任务。我们将使用OpenURI和Nokogiri来提取 title 标签的内容。
require 'open-uri'
require 'nokogiri'
class TitleExtractorService
def call(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
调用该服务会打印给定 URL 的标题。
TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
这部分功能运行正常,但我们不妨稍微改进一下语法,让它看起来更像其他后台处理系统。通过创建一个Magique::Worker模块,我们可以为服务对象添加一些语法糖。
module Magique
module Worker
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def perform_now(*args)
new.perform(*args)
end
end
def perform(*)
raise NotImplementedError
end
end
end
该模块perform向 worker 实例添加了一个方法,perform_now向 worker 类添加了一个方法,以改进调用。
让我们把这个模块包含到我们的服务对象中。顺便一提,我们也把它重命名为,TitleExtractorWorker并将call方法更改为perform。
class TitleExtractorWorker
include Magique::Worker
def perform(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
调用结果仍然相同,但现在情况更清楚了。
TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
实现异步处理
现在标题提取功能已经实现,我们可以获取所有以往 Ruby Magic 文章的标题。为此,假设我们有一个RUBYMAGIC常量,其中包含所有以往文章的 URL 列表。
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_now(url)
end
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
我们可以获取以往文章的标题,但提取所有标题需要一些时间。这是因为我们需要等待每个请求完成后才能进行下一个请求。
让我们通过perform_async在工作模块中引入一个方法来改进这一点。为了提高速度,它会为每个 URL 创建一个新线程。
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Thread.new { new.perform(*args) }
end
end
end
end
修改调用方式后TitleExtractorWorker.perform_async(url),我们几乎可以立即获取所有标题。然而,这也意味着我们会同时打开超过 20 个与 Ruby Magic 博客的连接。(抱歉打扰到大家的博客了! 😅)
如果您正在按照自己的实现进行操作,并在长时间运行的进程(例如 Web 服务器)之外进行测试,请不要忘记loop { sleep 1 }在脚本末尾添加类似以下内容,以确保进程不会立即终止。
任务排队
如果每次调用都创建一个新线程,最终我们会遇到资源限制(包括我们这边和我们访问的网站的资源限制)。为了避免对服务器造成负面影响,我们应该将实现方式改为异步,但又不会让人感觉像是在发起拒绝服务攻击。
解决此问题的常用方法是使用生产者/消费者模式。一个或多个生产者将任务推送到队列中,而一个或多个消费者从队列中取出任务并进行处理。
队列本质上就是一个元素列表。理论上,一个简单的数组就能满足需求。然而,由于我们处理的是并发问题,因此需要确保同一时间只有一个生产者或消费者可以访问队列。如果我们不注意这一点,就会出现混乱——就像两个人同时试图挤过一扇门一样。
这个问题被称为生产者-消费者问题,有多种解决方案。幸运的是,这是一个非常常见的问题,Ruby 自带一个完善的Queue实现,我们可以直接使用,无需担心线程同步的问题。
为了使用它,我们需要确保生产者和消费者都能访问队列。为此,我们向Magique模块添加一个类方法,并为其分配一个实例Queue。
module Magique
def self.backend
@backend
end
def self.backend=(backend)
@backend = backend
end
end
Magique.backend = Queue.new
接下来,我们修改perform_async实现方式,将任务推送到队列中,而不是创建新线程。任务以哈希表的形式表示,其中包含对工作类的引用以及传递给perform_async方法的参数。
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Magique.backend.push(worker: self, args: args)
end
end
end
end
至此,生产者一方的内容就讲完了。接下来,我们来看看消费者一方。
每个消费者都是一个独立的线程,它从队列中获取任务并执行。与线程执行完一个任务后停止不同,消费者会继续从队列中获取另一个任务并执行,以此类推。以下是一个名为 `<consumer_name>` 的消费者的基本实现Magique::Processor。每个处理器都会创建一个无限循环的新线程。在每次迭代中,它都会尝试从队列中获取一个新任务,创建一个新的工作类实例,并使用perform给定的参数调用其 `get` 方法。
module Magique
class Processor
def self.start(concurrency = 1)
concurrency.times { |n| new("Processor #{n}") }
end
def initialize(name)
thread = Thread.new do
loop do
payload = Magique.backend.pop
worker_class = payload[:worker]
worker_class.new.perform(*payload[:args])
end
end
thread.name = name
end
end
end
除了处理循环之外,我们还添加了一个名为 `startStart` 的便捷方法Magique::Processor.start。这允许我们同时启动多个处理器。虽然给线程命名并非绝对必要,但它可以帮助我们确认程序是否按预期运行。
让我们调整输出,TitleExtractorWorker使其包含当前线程的名称。
puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"
为了测试我们的后台处理设置,我们首先需要启动一组处理器,然后再将任务加入队列。
Magique.backend = Queue.new
Magique::Processor.start(5)
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
运行此程序后,我们仍然可以获取所有文章的标题。虽然它不如为每个任务使用单独的线程那么快,但仍然比最初没有后台处理的实现方式要快。由于添加了处理器名称,我们还可以确认所有处理器都在处理队列。通过调整并发处理器的数量,可以在处理速度和现有资源限制之间找到平衡点。
扩展到多个流程和机器
目前,我们后台处理系统的实现效果尚可。不过,它仍然局限于单个进程。资源密集型任务仍然会影响整个进程的性能。最后,我们来探讨一下如何将工作负载分配到多个进程,甚至多台机器上。
队列是生产者和消费者之间唯一的连接。目前,它使用的是内存队列。让我们从 Sidekiq 中汲取更多灵感,使用Redis来实现队列。
Redis 支持列表,允许我们推送和获取任务。此外,Redis Ruby gem 是线程安全的,并且修改列表的 Redis 命令是原子性的。这些特性使得我们可以将其用于异步后台处理系统,而不会遇到同步问题。
让我们创建一个基于 Redis 的队列,实现与我们之前使用的相同的push`and`方法。shiftQueue
require 'json'
require 'redis'
module Magique
module Backend
class Redis
def initialize(connection = ::Redis.new)
@connection = connection
end
def push(job)
@connection.lpush('magique:queue', JSON.dump(job))
end
def shift
_queue, job = @connection.brpop('magique:queue')
payload = JSON.parse(job, symbolize_names: true)
payload[:worker] = Object.const_get(payload[:worker])
payload
end
end
end
end
由于 Redis 对 Ruby 对象一无所知,因此我们必须先将任务序列化为 JSON,然后再使用lpush向列表开头添加元素的命令将其存储到数据库中。
要从队列中获取任务,我们使用 ` brpopget_task()` 命令,该命令会获取列表中的最后一个元素。如果列表为空,它将阻塞,直到有新元素可用。这是一种在没有任务可用时暂停处理器的好方法。最后,从 Redis 中获取任务后,我们需要使用 `get_task()` 命令根据 worker 的名称查找实际的 Ruby 类Object.const_get。
最后一步,我们将流程拆分成多个进程。在生产者端,我们只需要将后端更改为我们新实现的 Redis 队列即可。
# ...
Magique.backend = Magique::Backend::Redis.new
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
从消费者的角度来看,我们可以这样写几句话:
# ...
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
loop { sleep 1 }
执行时,消费者进程会等待新任务到达队列。一旦我们启动生产者进程并将任务推送到队列,我们就可以看到这些任务立即被处理。
请理性享用,切勿用于生产用途。
虽然我们尽量避免将其模拟成生产环境中实际使用的场景(所以千万不要这样做!),但我们还是采取了一些步骤来构建后台处理器。首先,我们让进程作为后台服务运行。然后,我们将其改为异步模式,并用它Queue来解决生产者-消费者问题。之后,我们使用 Redis 而不是内存实现,将该进程扩展到多个进程或多台机器上。
如前所述,这只是后台处理系统的简化实现。其中缺少许多功能,也没有明确处理。这些功能包括(但不限于)错误处理、多队列、调度、连接池和信号处理。
尽管如此,我们撰写本文的过程十分愉快,也希望您喜欢这次对后台处理系统运作机制的深入了解。或许您还能从中有所收获。
特约撰稿人Benedikt Deicke是一位软件工程师,也是Userlist.io的首席技术官。他业余时间正在撰写一本关于使用Ruby on Rails 构建 SaaS 应用的书籍。您可以通过Twitter联系 Benedikt 。
文章来源:https://dev.to/appsignal/learning-by-building-a-background-processing-system-in-ruby-3onb