发布于 2026-01-06 1 阅读
0

边学边做,用 Ruby 构建一个后台处理系统

边学边做,用 Ruby 构建一个后台处理系统

在今天的文章中,我们将实现一个简单的后台处理系统,纯粹是为了好玩!在这个过程中,我们或许还能了解一些关于像Sidekiq这样流行的后台处理系统的内部原理。需要注意的是,这个系统并非用于生产环境。

假设我们的应用程序中有一个任务,需要加载一个或多个网站并提取它们的标题。由于我们无法控制这些网站的性能,我们希望在主线程(或者如果是 Web 应用程序,则在当前请求之外)后台执行此任务。

封装任务

在深入后台处理之前,我们先来构建一个服务对象来执行当前任务。我们将使用OpenURINokogiri来提取 title 标签的内容。

require 'open-uri' 
require 'nokogiri' 

class TitleExtractorService
  def call(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}" 
  end
end

调用该服务会打印给定 URL 的标题。

TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

这部分功能运行正常,但我们不妨稍微改进一下语法,让它看起来更像其他后台处理系统。通过创建一个Magique::Worker模块,我们可以为服务对象添加一些语法糖。

module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end

    def perform(*)
      raise NotImplementedError
    end
  end
end

该模块perform向 worker 实例添加了一个方法,perform_now向 worker 类添加了一个方法,以改进调用。

让我们把这个模块包含到我们的服务对象中。顺便一提,我们也把它重命名为,TitleExtractorWorker并将call方法更改为perform

class TitleExtractorWorker
  include Magique::Worker

  def perform(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

调用结果仍然相同,但现在情况更清楚了。

TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

实现异步处理

现在标题提取功能已经实现,我们可以获取所有以往 Ruby Magic 文章的标题。为此,假设我们有一个RUBYMAGIC常量,其中包含所有以往文章的 URL 列表。

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end

# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

我们可以获取以往文章的标题,但提取所有标题需要一些时间。这是因为我们需要等待每个请求完成后才能进行下一个请求。

让我们通过perform_async在工作模块中引入一个方法来改进这一点。为了提高速度,它会为每个 URL 创建一个新线程。

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

修改调用方式后TitleExtractorWorker.perform_async(url),我们几乎可以立即获取所有标题。然而,这也意味着我们会同时打开超过 20 个与 Ruby Magic 博客的连接。(抱歉打扰到大家的博客了! 😅)

如果您正在按照自己的实现进行操作,并在长时间运行的进程(例如 Web 服务器)之外进行测试,请不要忘记loop { sleep 1 }在脚本末尾添加类似以下内容,以确保进程不会立即终止。

任务排队

如果每次调用都创建一个新线程,最终我们会遇到资源限制(包括我们这边和我们访问的网站的资源限制)。为了避免对服务器造成负面影响,我们应该将实现方式改为异步,但又不会让人感觉像是在发起拒绝服务攻击。

解决此问题的常用方法是使用生产者/消费者模式。一个或多个生产者将任务推送到队列中,而一个或多个消费者从队列中取出任务并进行处理。

队列本质上就是一个元素列表。理论上,一个简单的数组就能满足需求。然而,由于我们处理的是并发问题,因此需要确保同一时间只有一个生产者或消费者可以访问队列。如果我们不注意这一点,就会出现混乱——就像两个人同时试图挤过一扇门一样。

这个问题被称为生产者-消费者问题,有多种解决方案。幸运的是,这是一个非常常见的问题,Ruby 自带一个完善的Queue实现,我们可以直接使用,无需担心线程同步的问题。

为了使用它,我们需要确保生产者和消费者都能访问队列。为此,我们向Magique模块添加一个类方法,并为其分配一个实例Queue

module Magique
  def self.backend
    @backend
  end

  def self.backend=(backend)
    @backend = backend
  end
end

Magique.backend = Queue.new

接下来,我们修改perform_async实现方式,将任务推送到队列中,而不是创建新线程。任务以哈希表的形式表示,其中包含对工作类的引用以及传递给perform_async方法的参数。

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end

至此,生产者一方的内容就讲完了。接下来,我们来看看消费者一方。

每个消费者都是一个独立的线程,它从队列中获取任务并执行。与线程执行完一个任务后停止不同,消费者会继续从队列中获取另一个任务并执行,以此类推。以下是一个名为 `<consumer_name>` 的消费者的基本实现Magique::Processor。每个处理器都会创建一个无限循环的新线程。在每次迭代中,它都会尝试从队列中获取一个新任务,创建一个新的工作类实例,并使用perform给定的参数调用其 `get` 方法。

module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end

    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end

      thread.name = name
    end
  end
end

除了处理循环之外,我们还添加了一个名为 `startStart` 的便捷方法Magique::Processor.start。这允许我们同时启动多个处理器。虽然给线程命名并非绝对必要,但它可以帮助我们确认程序是否按预期运行。

让我们调整输出,TitleExtractorWorker使其包含当前线程的名称。

puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"

为了测试我们的后台处理设置,我们首先需要启动一组处理器,然后再将任务加入队列。

Magique.backend = Queue.new
Magique::Processor.start(5)

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

运行此程序后,我们仍然可以获取所有文章的标题。虽然它不如为每个任务使用单独的线程那么快,但仍然比最初没有后台处理的实现方式要快。由于添加了处理器名称,我们还可以确认所有处理器都在处理队列。通过调整并发处理器的数量,可以在处理速度和现有资源限制之间找到平衡点。

扩展到多个流程和机器

目前,我们后台处理系统的实现效果尚可。不过,它仍然局限于单个进程。资源密集型任务仍然会影响整个进程的性能。最后,我们来探讨一下如何将工作负载分配到多个进程,甚至多台机器上。

队列是生产者和消费者之间唯一的连接。目前,它使用的是内存队列。让我们从 Sidekiq 中汲取更多灵感,使用Redis来实现队列。

Redis 支持列表,允许我们推送和获取任务。此外,Redis Ruby gem 是线程安全的,并且修改列表的 Redis 命令是原子性的。这些特性使得我们可以将其用于异步后台处理系统,而不会遇到同步问题。

让我们创建一个基于 Redis 的队列,实现与我们之前使用的相同的push`and`方法shiftQueue

require 'json'
require 'redis'

module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end

      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end

      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end
    end
  end
end

由于 Redis 对 Ruby 对象一无所知,因此我们必须先将任务序列化为 JSON,然后再使用lpush向列表开头添加元素的命令将其存储到数据库中。

要从队列中获取任务,我们使用 ` brpopget_task()` 命令,该命令会获取列表中的最后一个元素。如果列表为空,它将阻塞,直到有新元素可用。这是一种在没有任务可用时暂停处理器的好方法。最后,从 Redis 中获取任务后,我们需要使用 `get_task()` 命令根据 worker 的名称查找实际的 Ruby 类Object.const_get

最后一步,我们将流程拆分成多个进程。在生产者端,我们只需要将后端更改为我们新实现的 Redis 队列即可。

# ...

Magique.backend = Magique::Backend::Redis.new

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

从消费者的角度来看,我们可以这样写几句话:

# ...

Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)

loop { sleep 1 }

执行时,消费者进程会等待新任务到达队列。一旦我们启动生产者进程并将任务推送到队列,我们​​就可以看到这些任务立即被处理。

请理性享用,切勿用于生产用途。

虽然我们尽量避免将其模拟成生产环境中实际使用的场景(所以千万不要这样做!),但我们还是采取了一些步骤来构建后台处理器。首先,我们让进程作为后台服务运行。然后,我们将其改为异步模式,并用它Queue来解决生产者-消费者问题。之后,我们使用 Redis 而不是内存实现,将该进程扩展到多个进程或多台机器上。

如前所述,这只是后台处理系统的简化实现。其中缺少许多功能,也没有明确处理。这些功能包括(但不限于)错误处理、多队列、调度、连接池和信号处理。

尽管如此,我们撰写本文的过程十分愉快,也希望您喜欢这次对后台处理系统运作机制的深入了解。或许您还能从中有所收获。

特约撰稿人Benedikt Deicke是一位软件工程师,也是Userlist.io的首席技术官。他业余时间正在撰写一本关于使用Ruby on Rails 构建 SaaS 应用的书籍。您可以通过Twitter联系 Benedikt

文章来源:https://dev.to/appsignal/learning-by-building-a-background-processing-system-in-ruby-3onb