Apache Storm 官方文档 —— 常用模式

原文链接    译者:魏勇

本文列出了 Storm 拓扑中使用的一些常见模式,包括:

  1. 数据流的 join
  2. 批处理
  3. BasicBolt
  4. 内存缓存与域分组的结合
  5. Top N 流式计算
  6. TimeCacheMap
  7. CoordinatedBolt 与 KeyedFairBolt

Joins

数据流的 join 一般指的是通过共有的域来聚合两个或多个数据流的过程。与一般的数据库中 join 操作要求有限的输入与清晰的语义不同,数据流 join 的输入往往是无限的数据集,而且并不具备明确的语义。

join 的类型一般是由应用的需求决定的。有些应用需要将两个流在某个固定时间内的所有 tuple 进行 join,另外一些应用却可能要求对每个 join 域的 join 操作过程的两侧只保留一个 tuple,而其他的应用也许还有一些其他需求。不过这些 join 类型一般都会有一个基本的模式,那就是将多个输入流进行分区。Storm 可以很容易地使用域分组的方法将多个输入流聚集到一个联结 bolt 中,比如下面这样:

builder.setBolt("join", new MyJoiner(), parallelism)
  .fieldsGrouping("1", new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping("2", new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping("3", new Fields("joinfield1", "joinfield2"));

当然,上面的代码只是个例子,实际上不同的流完全可以具有不同的输入域。

批处理

通常由于效率或者其他方面的原因,你需要使用将 tuple 们组合成 batch 来处理,而不是一个个分别处理它们。比如,在做数据库更新操作或者流聚合操作时,你就会需要这样的批处理形式。

要确保数据处理的可靠性,正确的方式是在 bolt 进行批处理之前将 tuple 们缓存在一个实例变量中。在完成批处理操作之后,你就可以一起 ack 所有的缓存的 tuple 了。

如果这个批处理 bolt 还需要继续向下游发送 tuple,你可能还需要使用多锚定(multi-anchoring)来确保可靠性。具体怎么做取决于应用的需求。想要了解更多关于可靠性的工作机制的内容请参考消息的可靠性保障一文。

BasicBolt

Bolt 处理 tuple 的一种基本模式是在 execute 方法中读取输入 tuple、发送出基于输入 tuple 的新 tuple,然后在方法末尾对 tuple 进行应答(ack)。符合这种模式的 bolt 一般是一种函数或者过滤器。对于这种基本的处理模式,Storm 提供了IBasicBolt 接口来自动实现这个过程。更多内容请参考消息的可靠性保障一文。

内存缓存与域分组的结合

在 Storm 的 bolt 中保存一定的缓存也是一种比较常见的方式。尤其是在于域分组结合的时候,缓存的作用特别显著。例如,假如你有一个用于将短链接(short URLs,例如 bit.ly, t.co,等等)转化成长链接(龙 URLs)的 bolt。你可以通过一个将短链接映射到长链接的 LRU 缓存来提高系统的性能,避免反复的 HTTP 请求操作。假如现在有一个名为 “urls” 的组件用于发送短链接,另外有一个 “expand” 组件用于将短链接扩展为长链接,并且在 “expand” 内部保留一个缓存。让我们来看看下面两段代码有什么不同:

builder.setBolt("expand", new ExpandUrl(), parallelism)
  .shuffleGrouping(1);
builder.setBolt("expand", new ExpandUrl(), parallelism)
  .fieldsGrouping("urls", new Fields("url"));

由于域分组可以使得相同的 URL 永远被发往同一个 task,第二段代码会比第一段代码高效得多。这样可以避免在不同的 task 的缓存中的复制动作,并且看上去短 URL 可以更好地在命中缓存。

Top N

Storm 中一种常见的连续计算模式是计算数据流中某种形式的 Top N 结果。假如现在有一个可以以 [“value”, “count”] 的形式发送 tuple 的 bolt,并且你需要一个可以根据 count 计算结果输出前 N 个 tuple 的 bolt。实现这个操作的最简单的方法就是使用一个对数据流进行全局分组的 bolt,并且在内存中维护一个包含 top N 结果的列表。

这种方法并不适用于大规模数据流,因为整个数据流都会发往同一个 task,会造成该 task 的内存负载过高。更好的做法是将数据流分区,同时对每个分区计算 top N 结果,然后将这些结果汇总来得到最终的全局 top N 结果。下面是这个模式的代码:

builder.setBolt("rank", new RankObjects(), parallelism)
  .fieldsGrouping("objects", new Fields("value"));
builder.setBolt("merge", new MergeObjects())
  .globalGrouping("rank");

这个方法之所以可行是因为第一个 bolt 的域分组操作确保了每个小分区在语义上的正确性。你可以在 storm-starter 里看到使用这个模式的一个例子。

当然,如果待处理的数据集存在较严重的数据倾斜,那么还是应该使用 partialKeyGrouping 来代替 fieldsGrouping,因为 partialKeyGrouping 可以通过两个下游 bolt 分散每个 key 的负载。

builder.setBolt("count", new CountObjects(), parallelism)
  .partialKeyGrouping("objects", new Fields("value"));
builder.setBolt("rank" new AggregateCountsAndRank(), parallelism)
  .fieldsGrouping("count", new Fields("key"))
builder.setBolt("merge", new MergeRanksObjects())
  .globalGrouping("rank");

这个拓扑中需要一个中间层来聚合来自上游 bolt 数据流的分区计数结果,但这一层仅仅会做一个简单的聚合处理,这样 bolt 就不会受到由于数据倾斜带来的负载压力。你可以在 storm-starter 中看到使用这个模式的一个例子。

支持 LRU 的 TimeCacheMap

有时候你可能会需要一个能够保留“活跃的”数据并且能够使得超时的“非活跃的”数据自动失效的缓存。TimeCacheMap 是一个可以高效地实现此功能的数据结构。它还提供了一个钩子用于实现在数据失效后的回调操作。

用于分布式 RPC 的 CoordinatedBolt 与 KeyedFairBolt

在构建 Storm 上层的分布式 RPC 应用时,通常会用到两种常用的模式。现在这两种模式已经被封装为 CoordinatedBoltKeyedFairBolt,并且已经加入了 Storm 标准库中。

CoordinatedBolt 将你的处理逻辑 bolt 包装起来,并且在你的 bolt 收到了指定请求的所有 tuple 之后发出通知。CoordinatedBolt 中大量使用了直接数据流组来实现此功能。

KeyedFairBolt 同样包装了你的处理逻辑 bolt,并且可以让你的拓扑同时处理多个 DRPC 调用,而不是每次只执行一个。

时间: 2024-09-25 00:56:00

Apache Storm 官方文档 —— 常用模式的相关文章

Apache Storm 官方文档 —— 本地模式

原文链接    译者:魏勇 本地模式是一种在本地进程中模拟 Storm 集群的工作模式,对于开发和测试拓扑很有帮助.在本地模式下运行拓扑与在集群模式下运行拓扑的方式很相似. 创建一个进程内的"集群"只需要使用 LocalCluster 类即可,例如: import backtype.storm.LocalCluster; LocalCluster cluster = new LocalCluster(); 随后,你就可以使用 LocalCluster 中的 submitTopology

Apache Storm 官方文档中文版

原文链接    译者:魏勇 About 本项目是 Apache Storm 官方文档的中文翻译版,致力于为有实时流计算项目需求和对 Apache Storm 感兴趣的同学提供有价值的中文资料,希望能够对大家的工作和学习有所帮助. 虽然 Storm 的正式推出已经有好几个年头了,发行版也已经到了 0.10.x,但是目前网络上靠谱的学习资料仍然不多,很多比较有价值的资料都过时了(甚至官方网站自己的资料都没有及时更新,这大概也是发展太快的社区的通病),而较新的资料大多比较零碎,在关键内容的描述上也有些

Apache Storm 官方文档 —— 内部技术实现

这部分的 wiki 是为了说明 Storm 是怎样实现的.在阅读本章之前你需要先了解怎样使用 Storm. 代码库架构 拓扑的生命周期1 消息传递的实现1 Ack 框架的实现 Metrics 事务型拓扑的工作机制1 单元测试2 时间模拟 完整的拓扑 集群跟踪 说明 1 该文内容已过期.2 该文官方文档暂未提供. 转载自 并发编程网 - ifeve.com

Apache Storm 官方文档 —— 配置开发环境

本文详细讲解了配置 Storm 开发环境的相关信息.简单地说,配置过程包含以下几个步骤: 下载 Storm 发行版,将其解压缩并复制到你的 PATH 环境变量的 bin 目录中(也可以根据需要自定义安装目录 -- 译者注): 如果需要在远程集群中运行拓扑,则需要在 ~/.storm/storm.yaml 文件中配置好集群的相关信息. 上述几步的详细内容如下. 什么是开发环境? Storm 包含两种操作模式:本地模式与远程模式(即集群模式 -- 译者注).在本地模式下,你可以在本地机器上的一个进程

Apache Storm 官方文档 —— 使用 Maven 构建 Storm 应用

在开发拓扑的时候,你需要在 classpath 中包含 Storm 的相关 jar 包.你可以将各个 jar 包直接包含到你的项目的 classpath 中,也可以使用 Maven 将 Storm 添加到依赖项中.Storm 已经集成到 Maven 的中心仓库中.你可以在项目的 pom.xml 中添加以下依赖来将 Storm 包含进项目中: <dependency> <groupId>org.apache.storm</groupId> <artifactId&g

Apache Storm 官方文档 —— Storm 集群安装配置

原文链接    译者:魏勇 本文详细介绍了 Storm 集群的安装配置方法.如果需要在 AWS 上安装 Storm,你应该先了解一下 storm-deploy 项目.storm-deploy 可以自动完成 E2 上 Storm 集群的准备.配置.安装的全部过程,同时还设置好了 Ganglia,方便监控 CPU.磁盘以及网络的使用信息. 如果你在使用 Storm 集群时遇到问题,请先查看"问题与解决"一文中是否已有相应的解决方案.如果检索不到有效的解决方法,请向社区的邮件列表发送关于问题

Apache Storm 官方文档 —— 在生产环境中运行拓扑

在生产环境集群中运行拓扑的方式与本地模式非常相似,主要包括以下几个步骤: 1) 定义拓扑(如果使用 Java 进行开发就可以使用 TopologyBuilder) 2) 使用 StormSubmitter 向集群提交拓扑.StormSubmitter 接收拓扑名称.拓扑配置信息以及拓扑对象本身作为参数,如下所示: Config conf = new Config(); conf.setNumWorkers(20); conf.setMaxSpoutPending(5000); StormSubm

Apache Storm 官方文档 —— Trident API 概述

Trident 的核心数据模型是"流"(Stream),不过与普通的拓扑不同的是,这里的流是作为一连串 batch 来处理的.流是分布在集群中的不同节点上运行的,并且对流的操作也是在流的各个 partition 上并行运行的. Trident 中有 5 类操作: 针对每个小分区(partition)的本地操作,这类操作不会产生网络数据传输: 针对一个数据流的重新分区操作,这类操作不会改变数据流中的内容,但是会产生一定的网络传输: 通过网络数据传输进行的聚合操作: 针对数据流的分组操作:

Apache Storm 官方文档 —— 源码组织结构

原文链接    译者:魏勇 Strom 的代码有三个层次: 第一,Storm 在一开始就是按照兼容多语言的目的来设计的.Nimbus 是一个 Thrift 服务,拓扑也被定义为 Thrift 架构.Thrift 的使用使得 Storm 可以用于任何一种语言. 第二,所有的 Storm 接口都设计为 Java 接口.所以,尽管 Storm 核心代码中有大量的 Clojure 实现,所有的访问都必须经过 Java API.这就意味着 Storm 的每个特性都可以通过 Java 来实现. 第三,Sto