Storm Topology的并发度

Understanding the parallelism of a Storm topology

https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology

 

概念

一个Topology可以包含一个或多个worker(并行的跑在不同的machine上), 所以worker process就是执行一个topology的子集, 并且worker只能对应于一个topology

一个worker可用包含一个或多个executor, 每个component (spout或bolt)至少对应于一个executor, 所以可以说executor执行一个compenent的子集, 同时一个executor只能对应于一个component

Task就是具体的处理逻辑对象, 一个executor线程可以执行一个或多个tasks 
但一般默认每个executor只执行一个task, 所以我们往往认为task就是执行线程, 其实不然 
task代表最大并发度, 一个component的task数是不会改变的, 但是一个componet的executer数目是会发生变化的 
当task数大于executor数时, executor数代表实际并发数 

worker process executes a subset of a topology. 
A worker process belongs to a specific topology and may run one or more executors for one or more components (spouts or bolts) of this topology. 
A running topology consists of many such processes running on many machines within a Storm cluster.

An executor is a thread that is spawned by a worker process. It may run one or more tasks for the same component (spout or bolt).

task performs the actual data processing — each spout or bolt that you implement in your code executes as many tasks across the cluster. 
The number of tasks for a component is always the same throughout the lifetime of a topology, but the number of executors (threads) for a component can change over time. This means that the following condition holds true: #threads ≤ #tasks
By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread.

 

Configuring the parallelism of a topology, 并发度的配置

The following sections give an overview of the various configuration options and how to set them in your code. There is more than one way of setting these options though, and the table lists only some of them.

Storm currently has the following order of precedence for configuration settings:

defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration

 

对于并发度的配置, 在storm里面可以在多个地方进行配置, 优先级如上面所示... 
具体包含,

worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大于machines的数目

executor的数目, component的并发线程数,只能在代码中配置(通过setBolt和setSpout的参数), 例如, setBolt("green-bolt", new GreenBolt(), 2)

tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置

Number of worker processes

  • Description: How many worker processes to create for the topology across machines in the cluster.
  • Configuration option: TOPOLOGY_WORKERS
  • How to set in your code (examples):

Number of executors (threads)

  • Description: How many executors to spawn per component.
  • Configuration option: ?
  • How to set in your code (examples):

Number of tasks

Here is an example code snippet to show these settings in practice:

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout);

In the above code we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks. Storm will run two tasks per executor (thread). If you do not explicitly configure the number of tasks, Storm will run by default one task per executor.

 

Example of a running topology

The following illustration shows how a simple topology would look like in operation. 
The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt
The components are linked such that BlueSpout sends its output to GreenBolt, which in turns sends its own output toYellowBolt.

 

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)                   //set tasks number to 4
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );

图和代码, 很清晰, 通过setBolt和setSpout一共定义2+2+6=10个executor threads 
并且同setNumWorkers设置2个workers, 所以storm会平均在每个worker上run 5个executors 
而对于green-bolt, 定义了4个tasks, 所以每个executor中有2个tasks

 

How to change the parallelism of a running topology, 动态的改变并发度

Storm支持在不restart topology的情况下, 动态的改变(增减)worker processes的数目和executors的数目, 称为rebalancing. 
通过Storm web UI, 或者通过storm rebalance命令, 见下面的例子

A nifty feature of Storm is that you can increase or decrease the number of worker processes and/or executors without being required to restart the cluster or the topology. The act of doing so is called rebalancing.

You have two options to rebalance a topology:

  1. Use the Storm web UI to rebalance the topology.
  2. Use the CLI tool storm rebalance as described below.

Here is an example of using the CLI tool:

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

本文章摘自博客园,原文发布日期:2013-05-04

时间: 2024-09-20 05:54:13

Storm Topology的并发度的相关文章

《Storm分布式实时计算模式》——第1章 分布式单词计数1.1 Storm topology的组成部分——stream、spout和bolt

第1章 分布式单词计数 本章将介绍使用Storm建立一个分布式流式计算应用时涉及的核心概念.我们通过建立一个简单的计数器程序实现这个目的.计数器将持续输入的一句句话作为输入流,统计其中单词出现的次数.单词计数这个例子浅显易懂,引入了多种数据结构.技术和设计模式.这些都是实现更复杂计算所必须的基础. 本章首先概要介绍Storm的数据结构,然后实现一个完整Storm程序所需的各个组成部分.读完本章,读者将会了解Storm计算的基本结构.搭建开发环境的方法.Storm程序的开发和调试技术. 本章包括以

UNIX环境高级编程:线程属性之并发度

并发度控制着用户级线程可以映射的内核线程或进程的数目.如果操作系统的实现在内核级的线程和用户级的线程之间保持一对一的映射,那么改变并发度并不会有什么效果,因为所有的用户级线程都可能被调度到.但是,如果操作系统的实现让用户级线程到内核级线程或进程之间的映射关系是多对一的话,那么在给定时间内增加可运行的用户级线程数,可能会改善性能. pthread_setconcurrency函数可以用于提示系统,表明希望的并发度. #include <pthread.h> int pthread_getconc

性能测试中“并发度”的意义

之前的文章中曾出现过"并发度"这个概念,这个词不知道是不是我原创,它意在表达"并发"的可能性,是压力的一种度量.一些同学可能还没有理解这个概念的意义,下面我们看看它是怎么来-- 看过之前文章的同学应该知道,我将"并发"这个容易产生误解的词拆分成了"相对并发"和"绝对并发".为什么这么做呢?那是因为"绝对并发"说的是同一时刻发生的事情,这通常是我们无法观测和衡量的.而"相对并发&

Storm 简介

https://github.com/nathanmarz/storm/wiki/Documentation   安装和配置 Storm的安装比较简单, 下载storm的release版本, 解压, 并且把bin/目录加到环境变量PATH里面去, 就ok了. 参考配置storm开发环境  当然为了运行Storm, 需要装一些其他的依赖的包, 可以参考Twitter Storm 安装实战 Storm支持单机调试模式, 所以现在如果你已经有包含topology的jar包, 就可以直接运行单机模式来进

《Storm分布式实时计算模式》——1.4 Storm的并发机制

1.4 Storm的并发机制 在Storm的间接中提到过,Storm计算支持在多台机器上水平扩容,通过将计算切分为多个独立的tasks在集群上并发执行来实现.在Storm中,一个task可以简单地理解为在集群某节点上运行的一个spout或者bolt实例.为了理解storm的并发机制是如何运行的,我们先来解释下在集群中运行的topology的四个主要组成部分: Nodes(服务器):指配置在一个Storm集群中的服务器,会执行topology的一部分运算.一个Storm集群可以包括一个或者多个工作

《Storm分布式实时计算模式》——1.5 理解数据流分组

1.5 理解数据流分组 看了前面的例子,你会纳闷为什么没有增加ReportBolt的并发度.答案是,这样做没有任何意义.为了理解其中的原因,需要了解Storm中数据流分组的概念.数据流分组定义了一个数据流中的tuple如何分发给topology中不同bolt的task.举例说明,在并发版本的单词计数topology中,SplitSentenceBolt类指派了四个task.数据流分组决定了指定的一个tuple会分发到哪个task上.Storm定义了七种内置数据流分组的方式: Shuffle gr

《Storm分布式实时计算模式》——1.3 实现单词计数topology

1.3 实现单词计数topology 前面介绍了Storm的基础概念,我们已经准备好实现一个简单的应用.现在开始着手开发一个Storm topology,并且在本地模式执行.Storm本地模式会在一个JVM实例中模拟出一个Storm集群.大大简化了用户在开发环境或者IDE中进行开发和调试.后续章节将会演示如何将本地模式下开发好的topology部署到真实的Storm集群环境.1.3.1 配置开发环境 新建一个Storm项目其实就是将Storm及其依赖的类库添加到Java classpath中.在

三:Storm设计一个Topology用来统计单词的TopN的实例

Storm的单词统计设计 一:Storm的wordCount和Hadoop的wordCount实例对比   二:Storm的wordCount的方案实例设计   三:建立maven项目,添加maven相关依赖包(1)输入:search.maven.org网址,在其中找到storm的核心依赖(2)将核心依赖添加到pom.xml文件中          <dependency>            <groupId>com.github.aloomaio</groupId>

《Storm实时数据处理》一1.4 创建“Hello World”Topology

1.4 创建"Hello World"Topology "Hello World" Topology和其他所有的"Hello World"应用程序一样,并没有什么实际用途,其目的在于说明一些基本概念."Hello World" Topology结构将演示如何创建一个包含简单的Spout和Bolt的项目,如何构建项目,并在本地集群模式下运行项目. 1.4.1 实战 Step01 新建一个项目目录,并初始化你的Git代码仓库. S