Storm实时计算:流操作入门编程实践

Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易。下面,简单介绍编程实践过程中需要理解的Storm中的几个概念:

  • Topology

Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排、容纳一组计算逻辑组件(Spout、Bolt)的对象(Hadoop MapReduce中一个Job包含一组Map Task、Reduce Task),这一组计算组件可以按照DAG图的方式编排起来(通过选择Stream Groupings来控制数据流分发流向),从而组合成一个计算逻辑更加负责的对象,那就是Topology。一个Topology运行以后就不能停止,它会无限地运行下去,除非手动干预(显式执行bin/storm kill )或意外故障(如停机、整个Storm集群挂掉)让它终止。

  • Spout

Storm中Spout是一个Topology的消息生产的源头,Spout应该是一个持续不断生产消息的组件,例如,它可以是一个Socket Server在监听外部Client连接并发送消息,可以是一个消息队列(MQ)的消费者、可以是用来接收Flume Agent的Sink所发送消息的服务,等等。Spout生产的消息在Storm中被抽象为Tuple,在整个Topology的多个计算组件之间都是根据需要抽象构建的Tuple消息来进行连接,从而形成流。

  • Bolt

Storm中消息的处理逻辑被封装到Bolt组件中,任何处理逻辑都可以在Bolt里面执行,处理过程和普通计算应用程序没什么区别,只是需要根据Storm的计算语义来合理设置一下组件之间消息流的声明、分发、连接即可。Bolt可以接收来自一个或多个Spout的Tuple消息,也可以来自多个其它Bolt的Tuple消息,也可能是Spout和其它Bolt组合发送的Tuple消息。

  • Stream Grouping

Storm中用来定义各个计算组件(Spout、Bolt)之间流的连接、分组、分发关系。Storm定义了如下7种分发策略:Shuffle Grouping(随机分组)、Fields Grouping(按字段分组)、All Grouping(广播分组)、Global Grouping(全局分组)、Non Grouping(不分组)、Direct Grouping(直接分组)、Local or Shuffle Grouping(本地/随机分组),各种策略的具体含义可以参考Storm官方文档、比较容易理解。

下面,作为入门实践,我们简单介绍几种开发中常用的流操作处理方式的实现:

Storm组件简单串行

这种方式是最简单最直观的,只要我们将Storm的组件(Spout、Bolt)串行起来即可实现,只需要了解编写这些组件的基本方法即可。在实际应用中,如果我们需要从某一个数据源连续地接收消息,然后顺序地处理每一个请求,就可以使用这种串行方式来处理。如果说处理单元的逻辑非常复杂,那么就需要处理逻辑进行分离,属于同一类操作的逻辑封装到一个处理组件中,做到各个组件之间弱耦合(除了定义Field的schema外,只通过发送消息来连接各个组件)。
下面,我实现一个简单的WordCount的例子,各个组件之间的连接方式,如下图所示:

ProduceRecordSpout类是一个Spout组件,用来产生消息,我们这里模拟发送一些英文句子,实际应用中可以指定任何数据源,如数据库、消息中间件、Socket连接、RPC调用等等。ProduceRecordSpout类代码如下所示:

01 public static class ProduceRecordSpout extends BaseRichSpout {
02
03 private static final long serialVersionUID = 1L;
04 private static final Log LOG = LogFactory.getLog(ProduceRecordSpout.class);
05 private SpoutOutputCollector collector;
06 private Random random;
07 private String[] records;
08
09 public ProduceRecordSpout(String[] records) {
10 this.records = records;
11 }
12
13 @Override
14 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
15 this.collector = collector;
16 random = new Random();
17 }
18
19 @Override
20 public void nextTuple() {
21 Utils.sleep(500);
22 String record = records[random.nextInt(records.length)];
23 List<Object> values = new Values(record);
24 collector.emit(values, values);
25 LOG.info("Record emitted: record=" + record);
26 }
27
28 @Override
29 public void declareOutputFields(OutputFieldsDeclarer declarer) {
30 declarer.declare(new Fields("record"));
31 }
32 }

构造一个ProduceRecordSpout对象时,传入一个字符串数组,然后随机地选择其中一个句子,emit到下游(Downstream)的WordSplitterBolt组件,只声明了一个Field,WordSplitterBolt组件可以根据声明的Field,接收到emit的消息,WordSplitterBolt类代码实现如下所示:

01 public static class WordSplitterBolt extends BaseRichBolt {
02
03 private static final long serialVersionUID = 1L;
04 private static final Log LOG = LogFactory.getLog(WordSplitterBolt.class);
05 private OutputCollector collector;
06
07 @Override
08 public void prepare(Map stormConf, TopologyContext context,
09 OutputCollector collector) {
10 this.collector = collector;
11 }
12
13 @Override
14 public void execute(Tuple input) {
15 String record = input.getString(0);
16 if(record != null && !record.trim().isEmpty()) {
17 for(String word : record.split("\\s+")) {
18 collector.emit(input, new Values(word, 1));
19 LOG.info("Emitted: word=" + word);
20 collector.ack(input);
21 }
22 }
23 }
24
25 @Override
26 public void declareOutputFields(OutputFieldsDeclarer declarer) {
27 declarer.declare(new Fields("word", "count"));
28 }
29
30 }

在execute方法中,传入的参数是一个Tuple,该Tuple就包含了上游(Upstream)组件ProduceRecordSpout所emit的数据,直接取出数据进行处理。上面代码中,我们将取出的数据,按照空格进行的split,得到一个一个的单词,然后在emit到下一个组件,声明的输出schema为2个Field:word和count,当然这里面count的值都为1。
进行统计词频的组件为WordCounterBolt,实现代码如下所示:

01 public static class WordCounterBolt extends BaseRichBolt {
02
03 private static final long serialVersionUID = 1L;
04 private static final Log LOG = LogFactory.getLog(WordCounterBolt.class);
05 private OutputCollector collector;
06 private final Map<String, AtomicInteger> counterMap = Maps.newHashMap();
07
08 @Override
09 public void prepare(Map stormConf, TopologyContext context,
10 OutputCollector collector) {
11 this.collector = collector;
12 }
13
14 @Override
15 public void execute(Tuple input) {
16 String word = input.getString(0);
17 int count = input.getIntegerByField("count"); // 通过Field名称取出对应字段的数据
18 AtomicInteger ai = counterMap.get(word);
19 if(ai == null) {
20 ai = new AtomicInteger(0);
21 counterMap.put(word, ai);
22 }
23 ai.addAndGet(count);
24 LOG.info("DEBUG: word=" + word + ", count=" + ai.get());
25 collector.ack(input);
26 }
27
28 @Override
29 public void declareOutputFields(OutputFieldsDeclarer declarer) {
30 }
31
32 @Override
33 public void cleanup() {
34 // print count results
35 LOG.info("Word count results:");
36 for(Entry<String, AtomicInteger> entry : counterMap.entrySet()) {
37 LOG.info("\tword=" + entry.getKey() + ", count=" + entry.getValue().get());
38 }
39 }
40
41 }

上面代码通过一个Map来对每个单词出现的频率进行累加计数,比较简单。因为该组件是Topology的最后一个组件,所以不需要在declareOutputFields方法中声明Field的Schema,而是在cleanup方法中输出最终的结果,只有在该组件结束任务退出时才会调用cleanup方法输出。
最后,需要基于上面的3个组件来创建一个Topology实例,提交到Storm集群去运行,配置代码如下所示:

01 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
02 // configure & build topology
03 TopologyBuilder builder = new TopologyBuilder();
04 String[] records = new String[] {
05 "A Storm cluster is superficially similar to a Hadoop cluster",
06 "All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster",
07 "The core abstraction in Storm is the stream"
08 };
09 builder
10 .setSpout("spout-producer", new ProduceRecordSpout(records), 1)
11 .setNumTasks(3);
12 builder
13 .setBolt("bolt-splitter", new WordSplitterBolt(), 2)
14 .shuffleGrouping("spout-producer")
15 .setNumTasks(2);
16 builder.setBolt("bolt-counter", new WordCounterBolt(), 1)
17 .fieldsGrouping("bolt-splitter", new Fields("word"))
18 .setNumTasks(2);
19
20 // submit topology
21 Config conf = new Config();
22 String name = WordCountTopology.class.getSimpleName();
23 if (args != null && args.length > 0) {
24 String nimbus = args[0];
25 conf.put(Config.NIMBUS_HOST, nimbus);
26 conf.setNumWorkers(2);
27 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
28 } else {
29 LocalCluster cluster = new LocalCluster();
30 cluster.submitTopology(name, conf, builder.createTopology());
31 Thread.sleep(60000);
32 cluster.shutdown();
33 }
34 }

上面通过TopologyBuilder来配置组成一个Topology的多个组件(Spout或Bolt),然后通过调用createTopology()方法创建一个Topology实例。上面方法中,对应着2种运行模式:如果没有传递任何参数,则是使用LocalCluster来运行,适合本地调试代码;如果传递一个Topology名称作为参数,则是在真实的Storm集群上运行,需要对实现的Topology代码进行编译打包,通过StormSubmitter提交到集群上作为服务运行。

Storm组合多种流操作

Storm支持流聚合操作,将多个组件emit的数据,汇聚到同一个处理组件来统一处理,可以实现对多个Spout组件通过流聚合到一个Bolt组件(Sout到Bolt的多对一、多对多操作),也可以实现对多个Bolt通过流聚合到另一个Bolt组件(Bolt到Bolt的多对一、多对多操作)。实际,这里面有两种主要的操作,一种是类似工作流中的fork,另一种是类似工作流中的join。下面,我们实现一个例子来演示如何使用,实时流处理逻辑如下图所示:



上图所描述的实时流处理流程,我们期望能够按照如下流程进行处理:

  • 存在3类数据:数字字符串(NUM)、字母字符串(STR)、特殊符号字符串(SIG)
  • 每个ProduceRecordSpout负责处理上面提到的3类数据
  • 所有数据都是字符串,字符串中含有空格,3种类型的ProduceRecordSpout所emit的数据都需要被相同的逻辑处理:根据空格来拆分字符串
  • 一个用来分发单词的组件DistributeWordByTypeBolt能够接收到所有的单词(包含类型信息),统一将每类单词分别分发到指定的一个用来存储数据的组件
  • SaveDataBolt用来存储处理过的单词,对于不同类型单词具有不同的存储逻辑,需要设置3类SaveDataBolt

将Spout分为3类,每一个Spout发射不同类型的字符串,这里定义了一个Type常量类来区分这三种类型:

1 interface Type {
2 String NUMBER = "NUMBER";
3 String STRING = "STRING";
4 String SIGN = "SIGN";
5 }

首先看一下,我们实现的Topology是如何进行创建的,代码如下所示:

01 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
02
03 // configure & build topology
04 TopologyBuilder builder = new TopologyBuilder();
05
06 // configure 3 spouts
07 builder.setSpout("spout-number", new ProduceRecordSpout(Type.NUMBER, new String[] {"111 222 333", "80966 31"}), 1);
08 builder.setSpout("spout-string", new ProduceRecordSpout(Type.STRING, new String[] {"abc ddd fasko", "hello the word"}), 1);
09 builder.setSpout("spout-sign", new ProduceRecordSpout(Type.SIGN, new String[] {"++ -*% *** @@", "{+-} ^#######"}), 1);
10
11 // configure splitter bolt
12 builder.setBolt("bolt-splitter", new SplitRecordBolt(), 2)
13 .shuffleGrouping("spout-number")
14 .shuffleGrouping("spout-string")
15 .shuffleGrouping("spout-sign");
16
17 // configure distributor bolt
18 builder.setBolt("bolt-distributor", new DistributeWordByTypeBolt(), 6)
19 .fieldsGrouping("bolt-splitter", new Fields("type"));
20
21 // configure 3 saver bolts
22 builder.setBolt("bolt-number-saver", new SaveDataBolt(Type.NUMBER), 3)
23 .shuffleGrouping("bolt-distributor", "stream-number-saver");
24 builder.setBolt("bolt-string-saver", new SaveDataBolt(Type.STRING), 3)
25 .shuffleGrouping("bolt-distributor", "stream-string-saver");
26 builder.setBolt("bolt-sign-saver", new SaveDataBolt(Type.SIGN), 3)
27 .shuffleGrouping("bolt-distributor", "stream-sign-saver");
28
29 // submit topology
30 Config conf = new Config();
31 String name = MultiStreamsWordDistributionTopology.class.getSimpleName();
32 if (args != null && args.length > 0) {
33 String nimbus = args[0];
34 conf.put(Config.NIMBUS_HOST, nimbus);
35 conf.setNumWorkers(3);
36 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
37 } else {
38 LocalCluster cluster = new LocalCluster();
39 cluster.submitTopology(name, conf, builder.createTopology());
40 Thread.sleep(60 * 60 * 1000);
41 cluster.shutdown();
42 }
43 }

一个SplitRecordBolt组件从3个不同类型的ProduceRecordSpout接收数据,这是一个多Spout流聚合。SplitRecordBolt将处理后的数据发送给DistributeWordByTypeBolt组件,然后根据收到的数据的类型进行一个分发处理,这里用了fieldsGrouping操作,也就是SplitRecordBolt发送的数据会按照类型发送到不同的DistributeWordByTypeBolt任务(Task),每个Task收到的一定是同一个类型的数据,如果直接使用shuffleGrouping操作也没有问题,只不过每个Task可能收到任何类型的数据,在DistributeWordByTypeBolt内部进行流向控制。DistributeWordByTypeBolt组件中定义了多个stream,根据类型来分组发送给不同类型的SaveDataBolt组件。
下面看每个组件的实现:

  • ProduceRecordSpout组件

通过我们定义的一个ProduceRecordSpout类,可以创建3个不同的ProduceRecordSpout实例,每个实例负责生产特定类型的数据,实现代码如下所示:

01 public static class ProduceRecordSpout extends BaseRichSpout {
02
03 private static final long serialVersionUID = 1L;
04 private static final Log LOG = LogFactory.getLog(ProduceRecordSpout.class);
05 private SpoutOutputCollector collector;
06 private Random rand;
07 private String[] recordLines;
08 private String type;
09
10 public ProduceRecordSpout(String type, String[] lines) {
11 this.type = type;
12 recordLines = lines;
13 }
14
15 @Override
16 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
17 this.collector = collector;
18 rand = new Random();
19 }
20
21
22 @Override
23 public void nextTuple() {
24 Utils.sleep(500);
25 String record = recordLines[rand.nextInt(recordLines.length)];
26 List<Object> values = new Values(type, record);
27 collector.emit(values, values);
28 LOG.info("Record emitted: type=" + type + ", record=" + record);
29 }
30
31 @Override
32 public void declareOutputFields(OutputFieldsDeclarer declarer) {
33 declarer.declare(new Fields("type", "record"));
34 }
35 }

这比较简单,根据传递的参数来创建上图中的3个Spout实例。

  • SplitRecordBolt组件

由于前面3个ProduceRecordSpout产生的数据,在开始时的处理逻辑是相同的,所以可以将3个ProduceRecordSpout聚合到一个包含通用逻辑的SplitRecordBolt组件,实现如下所示:

01 public static class SplitRecordBolt extends BaseRichBolt {
02
03 private static final long serialVersionUID = 1L;
04 private static final Log LOG = LogFactory.getLog(SplitRecordBolt.class);
05 private OutputCollector collector;
06
07 @Override
08 public void prepare(Map stormConf, TopologyContext context,
09 OutputCollector collector) {
10 this.collector = collector;
11 }
12
13 @Override
14 public void execute(Tuple input) {
15 String type = input.getString(0);
16 String line = input.getString(1);
17 if(line != null && !line.trim().isEmpty()) {
18 for(String word : line.split("\\s+")) {
19 collector.emit(input, new Values(type, word));
20 LOG.info("Word emitted: type=" + type + ", word=" + word);
21 // ack tuple
22 collector.ack(input);
23 }
24 }
25 }
26
27 @Override
28 public void declareOutputFields(OutputFieldsDeclarer declarer) {
29 declarer.declare(new Fields("type", "word"));
30 }
31 }

无论接收到的Tuple是什么类型(STRING、NUMBER、SIGN)的数据,都进行split,然后在emit的时候,仍然将类型信息传递给下一个Bolt组件。

  • DistributeWordByTypeBolt组件

DistributeWordByTypeBolt组件只是用来分发Tuple,通过定义Stream,将接收到的Tuple发送到指定的下游Bolt组件进行处理。通过SplitRecordBolt组件emit的Tuple包含了类型信息,所以在DistributeWordByTypeBolt中根据类型来进行分发,代码实现如下:

01 public static class DistributeWordByTypeBolt extends BaseRichBolt {
02
03 private static final long serialVersionUID = 1L;
04 private static final Log LOG = LogFactory.getLog(DistributeWordByTypeBolt.class);
05 private OutputCollector collector;
06
07 @Override
08 public void prepare(Map stormConf, TopologyContext context,
09 OutputCollector collector) {
10 this.collector = collector;
11 Map<GlobalStreamId, Grouping> sources = context.getThisSources();
12 LOG.info("sources==> " + sources);
13 }
14
15 @Override
16 public void execute(Tuple input) {
17 String type = input.getString(0);
18 String word = input.getString(1);
19 switch(type) {
20 case Type.NUMBER:
21 emit("stream-number-saver", type, input, word);
22 break;
23 case Type.STRING:
24 emit("stream-string-saver", type, input, word);
25 break;
26 case Type.SIGN:
27 emit("stream-sign-saver", type, input, word);
28 break;
29 default:
30 // if unknown type, record is discarded.
31 // as needed, you can define a bolt to subscribe the stream 'stream-discarder'.
32 emit("stream-discarder", type, input, word);
33 }
34 // ack tuple
35 collector.ack(input);
36 }
37
38 private void emit(String streamId, String type, Tuple input, String word) {
39 collector.emit(streamId, input, new Values(type, word));
40 LOG.info("Distribution, typed word emitted: type=" + type + ", word=" + word);
41 }
42
43 @Override
44 public void declareOutputFields(OutputFieldsDeclarer declarer) {
45 declarer.declareStream("stream-number-saver", new Fields("type", "word"));
46 declarer.declareStream("stream-string-saver", new Fields("type", "word"));
47 declarer.declareStream("stream-sign-saver", new Fields("type", "word"));
48 declarer.declareStream("stream-discarder", new Fields("type", "word"));
49 }
50 }

实际上,下游的3个Bolt组件(SaveDataBolt)在订阅该流组件(DistributeWordByTypeBolt)的时候,方式相同,只是分发的逻辑交由DistributeWordByTypeBolt来统一控制。
我们在配置该Bolt组件时,使用了fieldsGrouping分组方式,实际每个DistributeWordByTypeBolt只会收到同一种类型的Tuple,这里也可以使用shuffleGrouping分组方式,这种分组方式会有不同类型的Tuple被emit到同一个DistributeWordByTypeBolt组件上。
另外,该Bolt组件中我们还定义了一个名称为stream-discarder的stream,在Topology中并没有满足该stream的条件,可以根据实际情况选择是否实现它。

  • SaveDataBolt组件

最后这个Bolt用来模拟保存处理过的数据内容,代码如下:

01 public static class SaveDataBolt extends BaseRichBolt {
02
03 private static final long serialVersionUID = 1L;
04 private static final Log LOG = LogFactory.getLog(SaveDataBolt.class);
05 private OutputCollector collector;
06
07 private String type;
08
09 public SaveDataBolt(String type) {
10 this.type = type;
11 }
12
13 @Override
14 public void prepare(Map stormConf, TopologyContext context,
15 OutputCollector collector) {
16 this.collector = collector;
17 }
18
19 @Override
20 public void execute(Tuple input) {
21 // just print the received tuple for being waited to persist
22 LOG.info("[" + type + "] " +
23 "SourceComponent=" + input.getSourceComponent() +
24 ", SourceStreamId=" + input.getSourceStreamId() +
25 ", type=" + input.getString(0) +
26 ", value=" + input.getString(1));
27 }
28
29 @Override
30 public void declareOutputFields(OutputFieldsDeclarer declarer) {
31 // do nothing
32 }
33
34 }

在实际应用中,你可能需要将处理过的数据保存到数据库中,就可以在该Bolt中实现存储数据的逻辑。

总结

Storm中最核心的计算组件的抽象就是Spout、Bolt,以及Stream Grouping,其它高级的功能,像Trident、DRPC,他们或者基于这些基础组件以及Streaming Grouping分发策略来实现的,屏蔽了底层的分发计算处理逻辑以更高层的编程抽象面向开发者,减轻了开发人员对底层复杂机制的处理;或者是为了方便使用Storm计算服务而增加的计算机制衍生物,如批量事务处理、RPC等。

时间: 2024-10-31 12:51:07

Storm实时计算:流操作入门编程实践的相关文章

实时计算 流数据处理系统简单分析

一. 实时计算的概念 实时计算一般都是针对海量数据进行的,一般要求为秒级.实时计算主要分为两块:数据的实时入库.数据的实时计算. 主要应用的场景: 1) 数据源是实时的不间断的,要求用户的响应时间也是实时的(比如对于大型网站的流式数据:网站的访问PV/UV.用户访问了什么内容.搜索了什么内容等,实时的数据计算和分析可以动态实时地刷新用户访问数据,展示网站实时流量的变化情况,分析每天各小时的流量和用户分布情况) 2) 数据量大且无法或没必要预算,但要求对用户的响应时间是实时的.比如说: 昨天来自每

基于HBase做Storm 实时计算指标存储

基于 HBase 做 Storm 实时计算指标存储 HBase 实时指标存储是我入职乐视云后对原有的实时系统改造的 一部分.部分分享内容其实还处于实施阶段.架构方案设计的话应该是仁者见仁智者见智,也会有很多考虑不周的地方,欢迎大家批评指正.说不定大家听完分享后好的提议我们会用到工程上,也为后面的实际课程做好准备. HBase 存储设计 Storm 结果如何存储到 HBase HBase 写入性能优化 与传统方案 (Redis/MySQL) 对比 乐视云内部用 Storm 做 CDN,点播,直播流

(课程)基于HBase做Storm 实时计算指标存储

Hi,大家好!我是祝威廉,本来微博也想叫祝威廉的,可惜被人占了,于是改名叫·祝威廉二世.然后总感觉哪里不对.目前在乐视云数据部门里从事实时计算,数据平台.搜索和推荐等多个方向.曾从事基础框架,搜索研发四年,大数据平台架构.推荐三年多,个人时间现专注于集群自动化部署,服务管理,资源自动化调度等方向. 这次探讨的主题是: 基于 HBase 做 Storm 实时计算指标存储 HBase 实时指标存储是我入职乐视云后对原有的实时系统改造的一部分.部分分享内容其实还处于实施阶段.架构方案设计的话应该是仁者

Akka入门编程实践

Akka是使用Scala语言开发一个编程库,基于事件驱动的架构实现异步处理,它能够简化编写分布式应用程序.Akka中最核心的概念是Actor模型,它为编写分布式/并行计算应用程序提供了高层次抽象,在实际编程实践中,开发人员可以从对复杂网络通信细节的处理.多线程应用场景下对锁的管理中解脱出来. Akka能够给应用程序带来的几个重要的特性是: 容错性 可伸缩性 异步性 事件驱动架构(EDA) 远程透明性 Actor是Akka中最核心的组件,以至于我们在编写基于Akka的应用程序时,大部分时间都会和A

storm消费kafka实现实时计算

大致架构 * 每个应用实例部署一个日志agent * agent实时将日志发送到kafka * storm实时计算日志 * storm计算结果保存到hbase storm消费kafka 创建实时计算项目并引入storm和kafka相关的依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming.Spark SQL.MLlib.GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑.这也得益于Scala编程语言的简洁性.这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算. 我们的应用场景是分析用户使用手机App的行为,描述如下所示: 手机客户端会收集用户的行为事件(我们以点击

百分点实时计算实践:架构和算法

当今时代,数据不再昂贵,但从海量数据中获取价值变得昂贵,而要及时获取价值则更加昂贵,这正是大数据实时计算越来越流行的原因.以百分点公司为例,在高峰期每秒钟会有近万HTTP请求发送到百分点服务器上,这些请求包含了用户行为和个性化推荐请求.如何从这些数据中快速挖掘用户兴趣偏好并作出效果不错的推荐呢?这是百分点推荐引擎面临的首要问题.本文将从系统架构和算法两方面全介绍百分点公司在实时计算方面的经验和心得体会,供读者参考. a) 实时计算架构 图 1百分点大数据平台原理示意图 工欲善其事,必先利其器.一

阿里流计算平台开发实例之电商双11实时计算

由于之前没写过博客之类的文章,所以这次写也是心中揣揣,也是由于这个项目间没有找到相关的一些文档,当时就想着完成后写一个出来,如果有写的不周到的地方,请联系我改正,谢谢. 一. 项目案例 用户商业模式含盖电商零售与加盟店批发零售,本次主要业务需求在于淘宝双11期间能实时计算用户所关注的一些指标数据,如:订单数.订单金额.商品SKU数.订单来源地.商品排名等等. 基于这些指标需求,除了要达到实时的要求以外,还需要具备适当的展现图设计,本次使用的是阿里云的DATAV,提供饼状图占比分析.商品与类目数据

《Storm分布式实时计算模式》——2.3 在Linux上安装Storm

2.3 在Linux上安装Storm Storm是设计运行在Unix兼容的操作系统上.但在0.9.1版本,它也支持在Windows机器上部署. 为了简化部署,我们使用Ubuntu 12.04LTS的发行版作为安装服务器.将会使用服务器版本,默认不包括图形界面接口,因为我们用不到..在实体机和虚拟机上安装ubuntu都是非常方便的.出于学习和开发的目的,你会发现在虚拟机里进行部署更加方便,尤其是手头没有那么多实体机的情况. OSX.Linux.Windows都有着对应的虚拟机软件.我们建议从下面集