发布于 2026-01-06 1 阅读
0

在 Docker 上玩转 Apache Storm——像个大佬一样

在 Docker 上玩转 Apache Storm——像个大佬一样

本文并非《风暴》的终极指南,也无意成为终极指南。《风暴》的内容非常丰富,一篇长文恐怕难以全面展现。当然,任何补充、反馈或建设性批评都将不胜感激。
好了,言归正传,让我们来看看本文将涵盖哪些内容:

  • Storm的必要性,它存在的意义,它是什么,它不是什么。
  • 从高空俯瞰其工作原理。
  • Storm拓扑结构在代码(Java)中的大致形式
  • 在 Docker 上搭建和使用生产环境适用的 Storm 集群。
  • 关于消息处理可靠性的几点说明。

我还假设您至少对Docker和容器化技术有一定的了解。

持续不断的数据流无处不在,随着物联网设备数量的不断增长,这种情况愈发普遍。当然,这些数据会被存储、处理和分析,以提供预测性和可操作的结果。但即使使用Hadoop(尽管 MapReduce 功能强大)或Spark(弥补 MapReduce 局限性的方案),分析 PB 级的数据也需要很长时间。其次,很多时候我们并不需要推断长期趋势。对于几个月来收集的 PB 级数据,在任何给定时刻,我们可能并不需要考虑所有数据,而只需要一个实时快照。也许我们不需要知道五年内最热门的话题标签,而只需要知道当前的热门话题标签。Storm 正是为此而设计的:它能够以极快的速度接收来自各种来源的大量数据,对其进行分析,并将实时更新发布到用户界面或其他位置,而无需自身存储任何数据

工作原理

Storm 的架构可以比作连接一系列检查点的道路网络。流量从某个检查点(称为Spout)开始,经过其他检查点(称为Bolt)。流量当然是指Spout从数据源(例如公共 API)检索并路由到各个Bolt的数据流。在 Bolt 中,数据会被过滤、清理、聚合、分析,然后发送到用户界面供用户查看,或发送到其他目标。Spout 和 Bolt 构成的网络称为拓扑结构,数据以元组(包含不同类型的值的列表)的形式流动。


来源:https ://dzone.com/articles/apache-storm-architecture

需要重点讨论的是数据流量的方向。通常情况下,我们会使用一个或多个 Spout 从 API、Kafka 主题或其他队列系统读取数据。数据随后单向流向一个或多个 Bolt,这些 Bolt 又可能将数据转发给其他 Bolt,以此类推。Bolt 可以将分析后的数据发布到 UI 或其他 Bolt。但流量几乎总是单向的,类似于有向无环图 (DAG)。虽然理论上可以形成循环,但我们不太可能需要如此复杂的拓扑结构。

安装 Storm 版本需要多个步骤,您可以自行在本地机器上操作。但稍后我会使用 Docker 容器部署 Storm 集群,镜像会自动完成所有必要的配置。

一些代码

虽然 Storm 确实支持其他语言,但大多数拓扑都是用 Java 编写的,因为这是我们所能获得的最有效的选择。

一个非常简单的、只能随机输出数字的喷嘴可能看起来像这样:

public class RandomDigitSpout extends BaseRichSpout 
{
  // To output tuples from spout to the next stage bolt
  SpoutOutputCollector collector;  

  public void nextTuple() 
  {
    int randomDigit = ThreadLocalRandom.current().nextInt(0, 10);

    // Emit the digit to the next stage bolt
    collector.emit(new Values(randomDigit));
  }

  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
  {
    // Tell Storm the schema of the output tuple for this spout.
    // It consists of a single column called 'random-digit'.
    outputFieldsDeclarer.declare(new Fields("random-digit"));
  }
}
Enter fullscreen mode Exit fullscreen mode

还有一个简单的螺栓,它接收随机数流,然后只发出偶数:

public class EvenDigitBolt extends BaseRichBolt 
{
  // To output tuples from this bolt to the next bolt.
  OutputCollector collector;

  public void execute(Tuple tuple) 
  {
    // Get the 1st column 'random-digit' from the tuple
    int randomDigit = tuple.getInt(0);

    if (randomDigit % 2 == 0) {
      collector.emit(new Values(randomDigit));
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) 
  {
    // Tell Storm the schema of the output tuple for this bolt.
    // It consists of a single column called 'even-digit'
    declarer.declare(new Fields("even-digit"));
  }
}
Enter fullscreen mode Exit fullscreen mode

另一个简单的螺栓,它将接收来自的过滤数据流EvenDigitBolt,并将每个偶数乘以 10 后向前发射:

public class MultiplyByTenBolt extends BaseRichBolt 
{
  OutputCollector collector;

  public void execute(Tuple tuple) 
  {
    // Get 'even-digit' from the tuple.
    int evenDigit = tuple.getInt(0);

    collector.emit(new Values(evenDigit * 10));
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) 
  {
    declarer.declare(new Fields("even-digit-multiplied-by-ten"));
  }
}
Enter fullscreen mode Exit fullscreen mode

将它们组合起来形成我们的拓扑结构:

package packagename
// ...

public class OurSimpleTopology { 

  public static void main(String[] args) throws Exception
  {
    // Create the topology
    TopologyBuilder builder = new TopologyBuilder();

    // Attach the random digit spout to the topology.
    // Use just 1 thread for the spout.
    builder.setSpout("random-digit-spout", new RandomDigitSpout());

    // Connect the even digit bolt to our spout. 
    // The bolt will use 2 threads and the digits will be randomly
    // shuffled/distributed among the 2 threads.
    // The third parameter is formally called the parallelism hint.
    builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
           .shuffleGrouping("random-digit-spout");

    // Connect the multiply-by-10 bolt to our even digit bolt.
    // This bolt will use 4 threads, among which data from the
    // even digit bolt will be shuffled/distributed randomly.
    builder.setBolt("multiplied-by-ten-bolt", new MultiplyByTenBolt(), 4)
           .shuffleGrouping("even-digit-bolt");

    // Create a configuration object.
    Config conf = new Config();

    // The number of independent JVM processes this topology will use.
    conf.setNumWorkers(2);

    // Submit our topology with the configuration.
    StormSubmitter.submitTopology("our-simple-topology", conf, builder.createTopology());
  }
}
Enter fullscreen mode Exit fullscreen mode

风暴拓扑中的并行性

完全理解 Storm 中的并行性可能令人望而生畏,至少就我的经验而言是这样。一个拓扑结构至少需要一个进程来运行(这是显而易见的)。在这个进程中,我们可以使用线程来并行执行 Spout 和 Bolt。在我们的示例中,RandomDigitSpout只会启动一个线程,而该线程输出的数据将被分配到两个线程中EvenDigitBolt。但是,这种分配方式(称为流分组)可能至关重要。例如,您可能有一个来自两个城市的温度记录流,其中 Spout 输出的元组如下所示:

// City name, temperature, time of recording

("Atlanta",       94, "2018-05-11 23:14")
("New York City", 75, "2018-05-11 23:15")
("New York City", 76, "2018-05-11 23:16")
("Atlanta",       96, "2018-05-11 23:15")
("New York City", 77, "2018-05-11 23:17")
("Atlanta",       95, "2018-05-11 23:16")
("New York City", 76, "2018-05-11 23:18")
Enter fullscreen mode Exit fullscreen mode

假设我们只连接一个螺栓,其作用是计算每个城市平均温度的变化。如果我们可以合理预期在任何给定的时间间隔内,从两个城市获得的数据元组数量大致相等,那么为这个螺栓分配两个线程就很有意义,可以将亚特兰大的数据发送到一个线程,将纽约的数据发送到另一个线程。字段分组可以满足我们的需求,它可以根据分组中指定的字段值将数据分配给不同的线程:

// The tuples with the same city name will go to the same thread.
builder.setBolt("avg-temp-bolt", new AvgTempBolt(), 2)
       .fieldsGrouping("temp-spout", new Fields("city_name"));
Enter fullscreen mode Exit fullscreen mode

当然,还有其他类型的分组。不过,在大多数情况下,分组可能并不重要,您可以直接打乱数据顺序,然后随机将其分配到各个线程中(打乱分组)。
现在,这里还有另一个重要的组成部分:运行我们拓扑结构的进程数。我们指定的线程总数将平均分配给各个进程。因此,在我们的随机数字拓扑示例中,我们有 1 个 Spout 线程、2 个偶数位线程和 4 个乘以 10 的线程(共 7 个线程)。每个进程将负责运行 2 个乘以 10 的线程、1 个偶数位线程,其中一个进程还将运行 1 个 Spout 线程。


当然,这两个工作进程会有各自的主线程,这些主线程又会启动 spout 和 bolt 线程。所以总共会有 9 个线程。这些线程统称为执行器

需要注意的是,如果将 Spout 的并行提示设置为大于 1(即使用多个执行器),则可能会多次发出相同的数据。例如,假设 Spout 从公共 Twitter 流 API 读取数据并使用两个执行器。这意味着接收 Spout 数据的 Bolt 将收到两次相同的推文。只有在 Spout 发出元组之后,数据并行才会发挥作用,即根据指定的流分组将元组分配给各个 Bolt。

在单个节点上运行多个工作进程意义不大。不过,稍后我们会使用一个真正的分布式多节点集群,看看工作进程是如何分配到不同节点上的。

构建我们的拓扑结构

以下是我建议的目录结构:

yourproject/
            pom.xml  
            src/
                jvm/
                    packagename/
                           RandomDigitSpout.java
                           EvenDigitBolt.java
                           MultiplyByTenBolt.java
                           OurSimpleTopology.java
Enter fullscreen mode Exit fullscreen mode

Maven通常用于构建 Storm 拓扑,它需要一个pom.xml文件(POM 文件)来定义各种配置细节、项目依赖项等。深入探讨POM 文件的细节在这里可能有点过于复杂。

  • 首先,我们将运行mvn clean内部命令yourproject来清除可能存在的任何已编译文件,确保从头开始编译每个模块。
  • 然后,mvn package我们需要编译代码并将其打包成一个可执行的 JAR 文件,放在新建的target文件夹中。第一次执行此操作可能需要几分钟时间,尤其是在您的拓扑结构有很多依赖项的情况下。
  • 提交我们的拓扑结构:storm jar target/packagename-{version number}.jar packagename.OurSimpleTopology

希望到目前为止,Storm 的概念和代码之间的差距已经有所缩小。然而,任何真正意义上的 Storm 部署都不会是运行在单个服务器上的单一拓扑实例。

风暴群是什么样子的

为了充分利用 Storm 的可扩展性容错性,任何生产级拓扑结构都将提交给机器集群。

Storm 发行版安装在主节点(Nimbus)和所有从节点(Supervisor)上。
节点运行 Storm Nimbus守护进程和 Storm 用户界面。从节点运行 Storm Supervisor守护进程。一个位于独立节点上的ZooKeeper守护进程用于主节点和从节点之间的协调。顺便一提,ZooKeeper 仅用于集群管理,绝不用于任何消息传递。它不会像 Spout 和 Bolt 那样互相发送数据。Nimbus 守护进程通过 ZooKeeper 查找可用的 Supervisor,Supervisor 守护进程则向这些 Supervisor 注册自身。此外,它还执行其他管理任务,其中一些任务稍后会详细说明。

Storm UI 是一个用于管理集群状态的 Web 界面。我们稍后会详细介绍它。

我们的拓扑结构会提交给主节点上的 Nimbus 守护进程,然后分发给运行在从节点/主管节点上的工作进程。由于采用了 Zookeeper,初始运行多少个从节点/主管节点都无关紧要,因为您可以随时无缝添加更多节点,Storm 会自动将它们集成到集群中。

每当我们启动一个 Supervisor 节点时,它都会分配一定数量的工作进程(我们可以配置这些进程),这些进程随后可供提交的拓扑使用。因此,在上图中,总共分配了 5 个工作进程。请记住这行代码: 这意味着拓扑将尝试使用总共 5 个工作进程。由于我们的两个 Supervisor 节点总共分配了 5 个工作进程,因此每个分配的工作进程将运行一个拓扑实例。如果我们执行了以下操作: 那么将有一个工作进程处于空闲/未使用状态。如果指定的工作进程数量为 6,而分配的工作进程总数为 5,那么由于限制,只有 5 个实际的拓扑工作进程会运行。
conf.setNumWorkers(5)

conf.setNumWorkers(4)

在使用 Docker 进行所有设置之前,关于容错性,有几点需要牢记:

  • 如果任何从节点上的任何工作进程崩溃,Supervisor守护进程会重启该进程。如果重启多次失败,该工作进程将被重新分配到另一台机器上。
  • 如果整个从节点崩溃,它所承担的工作将交给另一个主管/从节点。
  • 如果 Nimbus 服务器宕机,工作节点不会受到影响。但是,在 Nimbus 服务器恢复之前,即使工作节点所在的节点发生故障,它们也不会被重新分配到其他从节点。
  • Nimbus 和 Supervisor 本身是无状态的,但通过 Zookeeper,会存储一些状态信息,以便在节点崩溃或守护进程意外终止时,可以从中断的地方继续运行。
  • Nimbus、Supervisor 和 Zookeeper 守护进程都是快速失败的。这意味着它们本身对意外错误的容忍度很低,一旦遇到错误就会立即关闭。因此,它们必须由监控程序进行监督运行,该程序会持续监控它们并在崩溃时自动重启。Supervisord可能是最常用的选择(不要与 Storm Supervisor 守护进程混淆)。

注意:在大多数 Storm 集群中,Nimbus 本身并非以单实例形式部署,而是以集群形式部署。如果未采用这种容错机制,一旦唯一的 Nimbus 实例发生故障,我们将无法提交新的拓扑、优雅地终止正在运行的拓扑、在 Supervisor 节点崩溃时将工作重新分配给其他节点等等。为简单起见,我们的示例集群将使用单实例。同样,Zookeeper 通常也以集群形式部署,但我们这里也只使用一个实例。

集群容器化

启动单个容器及其相关操作可能比较繁琐,所以我更倾向于使用Docker Compose。初始阶段,我们将使用一个 Zookeeper 节点、一个 Nimbus 节点和一个 Supervisor 节点。它们将被定义为 Compose 服务,每个服务最初对应一个容器。之后,我将使用Compose 扩展功能添加另一个 Supervisor 节点(容器)。以下是我们的完整代码和项目结构:

zookeeper/
         Dockerfile
storm-nimbus/
         Dockerfile
         storm.yaml
         code/
             pom.xml
             src/
                 jvm/
                     coincident_hashtags/
                                ExclamationTopology.java  
storm-supervisor/
         Dockerfile
         storm.yaml
docker-compose.yml
Enter fullscreen mode Exit fullscreen mode

以及我们的docker-compose.yml

version: '3.2'

services:
    zookeeper:
        build: ./zookeeper
        # Keep it running.  
        tty: true

    storm-nimbus:
        build: ./storm-nimbus
        # Run this service after 'zookeeper' and make 'zookeeper' reference.
        links:
            - zookeeper
        tty: true
        # Map port 8080 of the host machine to 8080 of the container.
        # To access the Storm UI from our host machine.
        ports:
            - 8080:8080
        volumes:
            - './storm-nimbus:/theproject'

    storm-supervisor:
        build: ./storm-supervisor
        links:
            - zookeeper
            - storm-nimbus
        tty: true

# Host volume used to store our code on the master node (Nimbus).
volumes:
    storm-nimbus:
Enter fullscreen mode Exit fullscreen mode

欢迎查看 Dockerfile 文件。它们主要用于在相关容器中安装依赖项(Java 8、Storm、Maven、Zookeeper 等)。
这些storm.yaml文件会覆盖 Storm 安装的某些默认配置。NimbusADD storm.yaml /conf和 Supervisor Dockerfile 中的相应行会将它们放置在 Storm 可以读取的容器中
storm-nimbus/storm.yaml

# The Nimbus needs to know where the Zookeeper is. This specifies the list of the
# hosts in the Zookeeper cluster. We're using just one node, of course.
# 'zookeeper' is the Docker Compose network reference.
storm.zookeeper.servers:
  - "zookeeper"
Enter fullscreen mode Exit fullscreen mode

storm-supervisor/storm.yaml

# Telling the Supervisor where the Zookeeper is.
storm.zookeeper.servers:
  - "zookeeper"

# The worker nodes need to know which machine(s) are the candidate of master
# in order to download the topology jars.
nimbus.seeds : ["storm-nimbus"]

# For each Supervisor, we configure how many workers run on that machine. 
# Each worker uses a single port for receiving messages, and this setting 
# defines which ports are open for use. We define four ports here, so Storm will 
# allocate up to four workers to run on this node.
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
Enter fullscreen mode Exit fullscreen mode

这些选项足以满足我们集群的需求。感兴趣的用户可以点击此处查看所有默认配置

docker-compose up在项目根目录下运行。

所有镜像构建完成且所有服务启动完毕后,打开一个新的终端,输入命令docker ps,您将看到类似这样的内容:

启动光环

让我们使用容器名称通过 SSH 连接到 Nimbus 容器, 然后启动 Nimbus 守护进程:
docker exec -it coincidenthashtagswithapachestorm_storm-nimbus_1 bash

storm nimbus

启动风暴用户界面

同样地,打开另一个终端,再次通过 SSH 连接到 Nimbus,然后使用以下命令启动 UI storm ui

在浏览器中访问localhost:8080该网站,您将看到我们集群的概览:

集群概览中的空闲槽位”表示所有 Supervisor 节点上有多少工作节点可用,并等待拓扑结构来使用它们。“已用槽位”表示当前有多少工作节点正在处理某个拓扑结构。由于我们尚未启动任何 Supervisor 节点,因此这两个数值均为零。稍后我们将讨论ExecutorTasks。此外,正如我们所见,目前还没有任何拓扑结构提交。

启动监督节点

通过 SSH 连接到 Supervisor 容器并启动 Supervisor 守护进程:
docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_1 bash
storm supervisor

现在我们来刷新一下用户界面:

注意:集群中的任何更改可能需要几秒钟才能反映在用户界面上。

我们新运行了一个 Supervisor,它自带四个分配的工作节点。这四个工作节点是通过在storm.yamlSupervisor 节点配置中指定四个端口而分配的。当然,它们都是空闲的(四个空闲槽位)。让我们向 Nimbus 提交一个拓扑结构,然后让它们开始工作。

向 Nimbus 提交拓扑结构

在新的终端中通过 SSH 连接到 Nimbus 服务器。我已经编写了Dockerfile,以便我们进入工作(着陆)目录/theproject。该目录下code包含我们的拓扑结构。我们的拓扑结构非常简单。它使用一个 Spout 来生成随机单词,并使用一个 Bolt 来在单词末尾添加三个感叹号 (!!!)。两个 Bolt 会连续添加,因此在流的末尾,我们会得到带有六个感叹号的单词。它还指定需要三个工作进程 ( conf.setNumWorkers(3) )。

public static void main(String[] args) throws Exception
{
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);

    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();

    // Turn on  debugging mode
    conf.setDebug(true);

    conf.setNumWorkers(3);

    StormSubmitter.submitTopology("exclamation-topology", conf, builder.createTopology());
}
Enter fullscreen mode Exit fullscreen mode
  1. cd code
  2. mvn clean
  3. mvn package
  4. storm jar target/coincident-hashtags-1.2.1.jar coincident_hashtags.ExclamationTopology

拓扑提交成功后,刷新用户界面:

拓扑结构提交后,Zookeeper 立刻收到了通知。Zookeeper 随后通知 Supervisor 从 Nimbus 下载代码。现在我们可以看到拓扑结构,其中三个工作线程正在运行,只剩下一个空闲的。
此外,还有 10 个单词输出线程、3 个 exclaim1 线程、2 个 exclaim 线程以及来自工作线程的 3 个主线程,总共 18 个执行器。您可能已经注意到一些新东西:任务

任务到底是什么?

这是 Storm 并行处理机制中的另一个概念。不过别担心,任务只是执行器使用的 Spout 或 Bolt 的一个实例;真正执行处理的是 Spout 或 Bolt。默认情况下,任务数量等于执行器数量。在极少数情况下,你可能需要每个执行器实例化更多任务。

// Each of the two executors (threads) of this bolt will instantiate
// two objects of this bolt (total 4 bolt objects/tasks).
builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
       .setNumTasks(4) 
       .shuffleGrouping("random-digit-spout");
Enter fullscreen mode Exit fullscreen mode


这是我的一个不足之处,但我实在想不出有什么合适的用例需要每个执行器执行多个任务。或许如果我们自己添加一些并行机制,比如在 Bolt 内部创建一个新线程来处理耗时较长的任务,那么主执行器线程就不会阻塞,可以继续使用其他 Bolt 进行处理。然而,这样做会使我们的拓扑结构变得难以理解。如果有人知道多任务带来的性能提升能够抵消其增加的复杂性,请留言分享。

总之,回到刚才的话题,我们来看一下拓扑概览。点击“拓扑概览”下方的名称,向下滚动到“工作节点资源” 我们可以清楚地看到执行器(线程)在 3 个工作节点上的分配情况。当然,这 3 个工作节点都运行在我们运行的同一个 Supervisor 节点上。

现在,让我们假设要进行规模化扩展!

添加另一位主管

从项目根目录,我们添加另一个 Supervisor 节点/容器。
docker-compose scale storm-supervisor=2

通过 SSH 连接到新容器:
docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_2 bash

启动:
storm supervisor

刷新界面后,你会看到我们已成功添加了一个新的 Supervisor 和四个 Worker(总共 8 个 Worker/插槽)。为了充分发挥新 Supervisor 的优势,让我们增加拓扑中的 Worker 数量。

  • 先干掉正在奔跑的那个:storm kill exclamation-topology
  • 将此行更改为:conf.setNumWorkers(6)
  • 请修改项目版本号pom.xml。尝试使用更合适的版本控制方案,例如语义化版本控制。我个人还是会使用 1.2.1 版本。
  • 重建拓扑结构:mvn package
  • 重新提交:storm jar target/coincident-hashtags-1.2.1.jar coincident_hashtags.ExclamationTopology

重新加载用户界面: 现在您可以看到新的 Supervisor 和 8 个可用工作线程中的 6 个正在运行。同样重要的是,这 6 个正在运行的工作线程已平均分配给两个 Supervisor。再次点击拓扑名称并向下滚动。 我们可以看到两个唯一的 Supervisor ID,它们分别运行在不同的节点上,并且所有执行器都均匀地分布在它们之间。这很棒。但 Storm 还提供了另一种在拓扑运行时进行此操作的巧妙方法,称为重新平衡。在 Nimbus 上,我们可以运行:(将工作线程数从 3 个增加到 6 个) 或者,要更改特定组件的执行器数量:




storm rebalance exclamation-topology -n 6

storm rebalance exclamation-topology -e even-digit-bolt=3

可靠消息处理

我们尚未解决的一个问题是,如果一个 Bolt 处理元组失败会发生什么。Storm 提供了一种机制,允许发起 Spout(具体来说是任务)重放失败的元组。这种处理保证并非自然而然发生,而是一种有意为之的设计选择,并且确实会增加延迟。Spout
将元组发送给 Bolt,Bolt 再将基于输入元组的子元组发送给其他 Bolt,以此类推。最初的一个元组会引发一整棵元组树。如果原始元组的任何子元组(姑且这么说)失败,那么任何补救措施(回滚等)都可能需要在多个 Bolt 上执行。这可能会变得非常复杂,因此 Storm 的做法是允许从源头(Spout)重新发送原始元组。因此,Bolt 执行的任何基于传入元组的操作都应该是幂等的。当元组树中的每个元组都已被处理完毕时,该元组才被视为“完全处理”,并且每个元组都必须由 Bolt 显式确认。然而,这还不是全部。还有另一件事需要显式完成:维护原始元组与其子元组之间的链接。Storm 随后就能追踪子元组的来源,从而重放原始元组。这称为锚定而我们的感叹号 Bolt 中已经实现了这一点

// ExclamationBolt

// 'tuple' is the original one received from the test word spout.
// It's been anchored to/with the tuple going out.
_collector.emit(tuple, new Values(exclamatedWord.toString()));

// Explicitly acknowledge that the tuple has been processed.
_collector.ack(tuple);
Enter fullscreen mode Exit fullscreen mode

如果 Spout 上的方法已实现,则调用该方法ack会导致调用它。例如,假设你正在从某个队列中读取元组数据,并且只有在元组完全处理完毕后才能将其从队列中取出。那么,你就可以在该方法中执行此操作。你也可以在不进行锚定的情况下发出元组,从而放弃可靠性。ackack_collector.emit(new Values(exclamatedWord.toString()))

元组失败有两种情况:
i) Bolt 进程崩溃,元组超时;或者由于其他原因超时。默认超时时间为 30 秒,可以通过以下方式更改:config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60)
ii)fail在 Bolt 进程中显式调用元组的方法_collector.fail(tuple)。您可以在发生异常时执行此操作。

在这两种情况下,fail如果 Spout 上的方法已实现,则会调用该方法。如果我们希望重放元组,则必须在fail方法中显式调用 `replay()` emit,就像在 `replay()` 中一样nextTuple()。跟踪元组时,每个元组都必须被ack`ed` 或fail`ed`。否则,拓扑最终会耗尽内存。
同样重要的是,编写自定义 Spout 和 Bolt 时,所有这些操作都必须自行完成。但 Storm 核心可以提供帮助。例如,实现了`BaseBasicBolt` 的Bolt会自动执行确认操作。或者,内置的适用于Kafka等常用数据源的 Spout会在确认和失败后处理排队和重放逻辑。

临别赠言

设计 Storm 拓扑或集群始终是一个不断调整各种参数并最终找到最佳结果的过程。一些技巧可以帮助你完成这个过程,例如使用配置文件读取并行提示、工作节点数量等信息,这样就无需反复编辑和重新编译代码。逻辑地定义你的 Bolt,每个 Bolt 对应一个不可分割的任务,并保持它们的轻量级和高效性。同样,Spout 的nextTuple()方法也应该进行优化。
有效利用 Storm UI。默认情况下,它不会显示完整的信息,只会显示 5% 的已发出元组。要监控所有元组,请使用 `mvn` 命令config.setStatsSampleRate(1.0d)。通过 UI 密切关注各个 Bolt 和拓扑的AcksLatency值,这才是你在调整参数时需要关注的重点。

文章来源:https://dev.to/usamaashraf/playing-with-apache-storm-on-docker---like-a-boss-4bgb