storm消费kafka实现实时计算

大致架构


* 每个应用实例部署一个日志agent
* agent实时将日志发送到kafka
* storm实时计算日志
* storm计算结果保存到hbase

storm消费kafka

  • 创建实时计算项目并引入storm和kafka相关的依赖
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>
  • 创建消费kafka的spout,直接用storm提供的KafkaSpout即可。
  • 创建处理从kafka读取数据的Bolt,JsonBolt负责解析kafka读取到的json并发送到下个Bolt进一步处理(下一步处理的Bolt不再写,只要继承BaseRichBolt就可以对tuple处理)。
public class JsonBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory
            .getLogger(JsonBolt.class);

    private Fields fields;
    private OutputCollector collector;

    public JsonBolt() {
        this.fields = new Fields("hostIp", "instanceName", "className",
                "methodName", "createTime", "callTime", "errorCode");
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String spanDataJson = tuple.getString(0);
        LOG.info("source data:{}", spanDataJson);
        Map<String, Object> map = (Map<String, Object>) JSONValue
                .parse(spanDataJson);
        Values values = new Values();
        for (int i = 0, size = this.fields.size(); i < size; i++) {
            values.add(map.get(this.fields.get(i)));
        }
        this.collector.emit(tuple, values);
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this.fields);
    }
}
  • 创建拓扑MyTopology,先配置好KafkaSpout的配置SpoutConfig,其中zk的地址端口和根节点,将id为KAFKA_SPOUT_ID的spout通过shuffleGrouping关联到jsonBolt对象。
public class MyTopology {

    private static final String TOPOLOGY_NAME = "SPAN-DATA-TOPOLOGY";
    private static final String KAFKA_SPOUT_ID = "kafka-stream";
    private static final String JsonProject_BOLT_ID = "jsonProject-bolt";

    public static void main(String[] args) throws Exception {
        String zks = "132.122.252.51:2181";
        String topic = "span-data-topic";
        String zkRoot = "/kafka-storm";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot,
                KAFKA_SPOUT_ID);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.zkServers = Arrays.asList(new String[] { "132.122.252.51" });
        spoutConf.zkPort = 2181;
        JsonBolt jsonBolt = new JsonBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConf));
        builder.setBolt(JsonProject_BOLT_ID, jsonBolt).shuffleGrouping(
                KAFKA_SPOUT_ID);

        Config config = new Config();
        config.setNumWorkers(1);
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config,
                    builder.createTopology());
            Utils.waitForSeconds(100);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            StormSubmitter.submitTopology(args[0], config,
                    builder.createTopology());
        }
    }
}
  • 本地测试时直接不带运行参数运行即可,放到集群是需带拓扑名称作为参数。
  • 另外需要注意的是:KafkaSpout默认从上次运行停止时的位置开始继续消费,即不会从头开始消费一遍,因为KafkaSpout默认每2秒钟会提交一次kafka的offset位置到zk上,如果要每次运行都从头开始消费可以通过配置实现。

========广告时间========

鄙人的新书《Tomcat内核设计剖析》已经在京东销售了,有需要的朋友可以到 https://item.jd.com/12185360.html 进行预定。感谢各位朋友。

为什么写《Tomcat内核设计剖析》

=========================

欢迎关注:

时间: 2024-10-31 12:51:04

storm消费kafka实现实时计算的相关文章

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming.Spark SQL.MLlib.GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑.这也得益于Scala编程语言的简洁性.这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算. 我们的应用场景是分析用户使用手机App的行为,描述如下所示: 手机客户端会收集用户的行为事件(我们以点击

基于HBase做Storm 实时计算指标存储

基于 HBase 做 Storm 实时计算指标存储 HBase 实时指标存储是我入职乐视云后对原有的实时系统改造的 一部分.部分分享内容其实还处于实施阶段.架构方案设计的话应该是仁者见仁智者见智,也会有很多考虑不周的地方,欢迎大家批评指正.说不定大家听完分享后好的提议我们会用到工程上,也为后面的实际课程做好准备. HBase 存储设计 Storm 结果如何存储到 HBase HBase 写入性能优化 与传统方案 (Redis/MySQL) 对比 乐视云内部用 Storm 做 CDN,点播,直播流

(课程)基于HBase做Storm 实时计算指标存储

Hi,大家好!我是祝威廉,本来微博也想叫祝威廉的,可惜被人占了,于是改名叫·祝威廉二世.然后总感觉哪里不对.目前在乐视云数据部门里从事实时计算,数据平台.搜索和推荐等多个方向.曾从事基础框架,搜索研发四年,大数据平台架构.推荐三年多,个人时间现专注于集群自动化部署,服务管理,资源自动化调度等方向. 这次探讨的主题是: 基于 HBase 做 Storm 实时计算指标存储 HBase 实时指标存储是我入职乐视云后对原有的实时系统改造的一部分.部分分享内容其实还处于实施阶段.架构方案设计的话应该是仁者

Storm实时计算:流操作入门编程实践

Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易.下面,简单介绍编程实践过程中需要理解的Storm中的几个概念: Topology Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排.容纳一组计算逻辑组件(Spout.Bolt)的对象(Hadoop MapReduce中一个Job包含一组Map Task.Reduce Task),这一组计算组件可以按照DAG图的方式编排起来(通过选择Stre

转 大数据实时处理:百分点实时计算架构和算法

当今时代,数据不再昂贵,但从海量数据中获取价值变得昂贵,而要及时获取价值则更加昂贵,这正是大数据实时计算越来越流行的原因.以百分点公司为例,在高峰期每秒钟会有近万HTTP请求发送到百分点服务器上,这些请求包含了用户行为和个性化推荐请求.如何从这些数据中快速挖掘用户兴趣偏好并作出效果不错的推荐呢?这是百分点推荐引擎面临的首要问题.本文将从系统架构和算法两方面全介绍百分点公司在实时计算方面的经验和心得体会,供读者参考. a) 实时计算架构 图 1百分点大数据平台原理示意图 工欲善其事,必先利其器.一

大数据处理:百分点实时计算架构和算法

当今时代,数据不再昂贵,但从海量数据中获取价值变得昂贵,而要及时获取价值则更加昂贵,这正是大数据实时计算越来越流行的原因.以百分点公司为例,在高峰期每秒钟会有近万HTTP请求发送到百分点服务器上,这些请求包含了用户行为和个性化推荐请求.如何从这些数据中快速挖掘用户兴趣偏好并作出效果不错的推荐呢?这是百分点推荐引擎面临的首要问题.本文将从系统架构和算法两方面全介绍百分点公司在实时计算方面的经验和心得体会,供读者参考. a) 实时计算架构 图 1百分点大数据平台原理示意图 工欲善其事,必先利其器.一

百分点实时计算实践:架构和算法

当今时代,数据不再昂贵,但从海量数据中获取价值变得昂贵,而要及时获取价值则更加昂贵,这正是大数据实时计算越来越流行的原因.以百分点公司为例,在高峰期每秒钟会有近万HTTP请求发送到百分点服务器上,这些请求包含了用户行为和个性化推荐请求.如何从这些数据中快速挖掘用户兴趣偏好并作出效果不错的推荐呢?这是百分点推荐引擎面临的首要问题.本文将从系统架构和算法两方面全介绍百分点公司在实时计算方面的经验和心得体会,供读者参考. a) 实时计算架构 图 1百分点大数据平台原理示意图 工欲善其事,必先利其器.一

如何基于Spark Streaming构建实时计算平台

1.前言 随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台搭建以来,经过两年多不断的技术演进,目前实时集群规模已达上百台,平台涵盖各个SBU与公共部门数百个实时应用,全年JStorm集群稳定性达到100%.目前实时平台主要基于JStorm与Spark Streaming构建而成,相信关注携程实时平台的朋友在去年已经看到一篇关于携程实时平台的分享:

Spark 实时计算整合案例

1.概述 最近有同学问道,除了使用 Storm 充当实时计算的模型外,还有木有其他的方式来实现实时计算的业务.了解到,在使用 Storm 时,需要编写基于编程语言的代码.比如,要实现一个流水指标的统计,需要去编写相应的业务代码,能不能有一种简便的方式来实现这一需求.在解答了该同学的疑惑后,整理了该实现方案的一个案例,供后面的同学学习参考. 2.内容 实现该方案,整体的流程是不变的,我这里只是替换了其计算模型,将 Storm 替换为 Spark,原先的数据收集,存储依然可以保留. 2.1 Spar