ElasticSearch Aggregations 分析

承接上篇文章ElasticSearch Rest/RPC 接口解析,这篇文章我们重点分析让ES步入数据分析领域的Aggregation相关的功能和设计。

前言

我记得有一次到一家公司做内部分享,然后有研发问我,即席分析这块,他们用ES遇到一些问题。我当时直接就否了,我说ES还是个全文检索引擎,如果要做分析,还是应该用Impala,Phenix等这种主打分析的产品。随着ES的发展,我现在对它的看法,也有了比较大的变化。而且我认为ES+Spark SQL组合可以很好的增强即席分析能够处理的数据规模,并且能够实现复杂的逻辑,获得较好的易用性。

需要说明的是,我对这块现阶段的理解也还是比较浅。问题肯定有不少,欢迎指正。

Aggregations的基础

Lucene 有三个比较核心的概念:

  1. 倒排索引
  2. fieldData/docValue
  3. Collector

倒排索引不用我讲了,就是term -> doclist的映射。 

fieldData/docValue 你可以简单理解为列式存储,索引文件的所有文档的某个字段会被单独存储起来。 对于这块,Lucene 经历了两阶段的发展。第一阶段是fieldData ,查询时从倒排索引反向构成doc-term。这里面有两个问题:

  • 数据需要全部加载到内存
  • 第一次构建会很慢

这两个问题其实会衍生出很多问题:最严重的自然是内存问题。所以lucene后面搞了DocValue,在构建索引的时候就生成这个文件。DocValue可以充分利用操作系统的缓存功能,如果操作系统cache住了,则速度和内存访问是一样的。

另外就是Collector的概念,ES的各个Aggregator 实现都是基于Collector做的。我觉得你可以简单的理解为一个迭代器就好,所有的候选集都会调用Collector.collect(doc)方法,这里collect == iterate 可能会更容易理解些。

ES 能把聚合做快,得益于这两个数据结构,一个迭代器。我们大部分聚合功能,其实都是在fieldData/docValue 上工作的。

Aggregations 分类

Aggregations种类分为:

  1. Metrics
  2. Bucket

Metrics 是简单的对过滤出来的数据集进行avg,max等操作,是一个单一的数值。

Bucket 你则可以理解为将过滤出来的数据集按条件分成多个小数据集,然后Metrics会分别作用在这些小数据集上。

对于最后聚合出来的结果,其实我们还希望能进一步做处理,所以有了Pipline Aggregations,其实就是组合一堆的Aggregations 对已经聚合出来的结果再做处理。

Aggregations 类设计

下面是一个聚合的例子:

{
    "aggregations": {
        "user": {
            "terms": {
                "field": "user",
                "size": 10,
                "order": {
                    "_count": "desc"
                }
            }
        }
    }
}

其语义类似这个sql 语句: select count(*) as user_count group by user order by user_count  desc。

对于Aggregations 的解析,基本是顺着下面的路径分析:

TermsParser ->
        TermsAggregatorFactory ->
                  GlobalOrdinalsStringTermsAggregator

在实际的一次query里,要做如下几个阶段:

  1. Query Phase 此时 会调用GlobalOrdinalsStringTermsAggregator的Collector 根据user 的不同进行计数。
  2. RescorePhase
  3. SuggestPhase
  4. AggregationPhase  在该阶段会会执行实际的aggregation build, aggregator.buildAggregation(0),也就是一个特定Shard(分片)的聚合结果
  5. MergePhase。这一步是由接受到请求的ES来完成,具体负责执行Merge(Reduce)操作SearchPhaseController.merge。这一步因为会从不同的分片拿到数据再做Reduce,也是一个内存消耗点。所以很多人会专门搞出几台ES来做这个工作,其实就是ES的client模式,不存数据,只做接口响应。

在这里我们我们可以抽取出几个比较核心的概念:

  1. AggregatorFactory (生成对应的Aggregator)
  2. Aggregation (聚合的结果输出)
  3. Aggregator (聚合逻辑实现)

另外值得注意的,PipeLine Aggregator 我前面提到了,其实是对已经生成的Aggregations重新做加工,这个工作是只能单机完成的,会放在请求的接收端执行。

Aggregation Bucket的实现

前面的例子提到,在Query 阶段,其实就会调用Aggregator 的collect 方法,对所有符合查询条件的文档集都会计算一遍,这里我们涉及到几个对象:

  1. doc id
  2. field (docValue)
  3. IntArray 对象

collect 过程中会得到 doc id,然后拿着docId 到 docValue里去拿到field的值(一般而言字符串也会被编码成Int类型的),然后放到IntArray 进行计数。如果多个doc id 在某filed里的字段是相同的,则会递增计数。这样就实现了group by 的功能了。

Spark-SQL 和 ES 的组合

我之前一直在想这个问题,后面看了下es-hadoop的文档,发现自己有些思路和现在es-hadoop的实现不谋而合。主要有几点:

  1. Spark-SQL 的 where 语句全部(或者部分)下沉到 ES里进行执行,依赖于倒排索引,DocValues,以及分片,并行化执行,ES能够获得比Spark-SQL更优秀的响应时间
  2. 其他部分包括分片数据Merge(Reduce操作,Spark 可以获得更好的性能和分布式能力),更复杂的业务逻辑都交给Spark-SQL (此时数据规模已经小非常多了),并且可以做各种自定义扩展,通过udf等函数
  3. ES 无需实现Merge操作,可以减轻内存负担,提升并行Merge的效率(并且现阶段似乎ES的Reduce是只能在单个实例里完成)
时间: 2024-10-28 13:01:05

ElasticSearch Aggregations 分析的相关文章

ElasticSearch Aggregations GroupBy 实现源码分析

在前文ElasticSearch Aggregations 分析中,我们提及了 [Aggregation Bucket的实现],然而只是用文字简要描述了原理.今天我们会举个实际groupBy的例子进行剖析,让大家对ElasticSearch Aggregations 的工作原理有更深入的理解. 准备工作 为了方便调试,我对索引做了如下配置 { "mappings": { "my_type": { "properties": { "new

ElasticSearch Recovery 分析

这是一篇源码分析类的文章,大家需要先建立一个整体的概念,建议参看这篇文章 另外你可能还需要了解下 Recovery 阶段迁移过程: INIT ->  INDEX -> VERIFY_INDEX -> TRANSLOG -> FINALIZE -> DONE 概览 Recovery 其实有两种: Primary的迁移/Replication的生成和迁移  Primary的恢复 org.elasticsearch.indices.cluster.IndicesClusterSta

如何使用Elasticsearch和cAdvisor监控Docker容器

如果你正在运行 Swarm 模式的集群,或者只运行单台 Docker,你都会有下面的疑问: 我如何才能监控到它们都在干些什么?这个问题的答案是"很不容易". 你需要监控下面的参数: 容器的数量和状态. 一台容器是否已经移到另一个节点了,如果是,那是在什么时候,移动到哪个节点? 给定节点上运行着的容器数量. 一段时间内的通信峰值. 孤儿卷和网络(LCTT 译注:孤儿卷就是当你删除容器时忘记删除它的卷,这个卷就不会再被使用,但会一直占用资源). 可用磁盘空间.可用 inode 数. 容器数

Kibana——数据图形化制作

Kibana把数据图形化,可以帮助我们更好的去分析数据,找到数据里面的结构 注意看一下,上面缩看的界面,是一个聚合的结果,他是由CPU Usage图表.System Load图表.CPU usage over time图表. System Load over time图表一起构造而成的.我把这种聚合称作一组相关性的数据看板,你可以随意的组合你的看板,如果你觉得这样做是合适的.那么,CPU Usage图表.System Load图表.CPU usage over time图表. System Lo

表格存储结合Elasticsearch进行搜索的场景分析和实践

表格存储结合Elasticsearch进行搜索的场景分析和实践 表格存储(TableStore)是什么 TableStore是一个构建在阿里云飞天分布式系统上的Nosql数据库服务,熟悉阿里云的同学肯定听说过飞天5K,飞天是一个可以管理5000台机器的分布式系统,TableStore作为构建在其上的一个Nosql数据库,可以承载海量(单表几百TB)的数据存储,同时数据有三份拷贝,数据安全性有极高的保证. TableStore的数据是以行进行组织的,每行包含多个主键列和多个属性列,主键列的列名和类

安全日志收集与分析——抢先体验阿里云 ElasticSearch

10月13日,云栖大会上阿里云与Elastic公司联合发布阿里云Elasticsearch产品.根据官方介绍,阿里云提供基于开源Elasticsearch及商业版X-Pack插件,致力于数据分析.数据搜索等场景服务.在开源Elasticsearch基础上提供企业级权限管控.安全监控告警.自动报表生成等功能. Elasticsearch(ES)是中小企业安全日志分析的必备工具.团队在获得试用资格后,随即开展了产品试用工作,抢先体验这款"新产品". 申请开通服务 访问网址:https://

ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平台

ELK平台介绍 在搜索ELK资料的时候,发现这篇文章比较好,于是摘抄一小段: 以下内容来自:http://baidu.blog.51cto.com/71938/1676798 日志主要包括系统日志.应用程序日志和安全日志.系统运维和开发人员可以通过日志了解服务器软硬件信息.检查配置过程中的错误及错误发生的原因.经常分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误. 通常,日志被分散的储存不同的设备上.如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志.这样

Logstash+Redis+Elasticsearch+Kibana+Nginx搭建日志分析系统

前言: 随着实时分析技术的发展及成本的降低,用户已经不仅仅满足于离线分析.目前我们服务的用户包括微博.微盘.云存储.弹性计算平台等十多个部门的多个产品的日志搜索分析业务,每天处理约32亿条(2TB)日志.哈哈 以上都是新浪的信息~不是我们小公司的分析业务规模. 当然为了使得运行在客户端的软件有良好的体验,并且得到有用的数据,我们需要对这些系统产生的数据,进行统计和分析,这个过程通常包括数据采集,清洗,建模,分析,报表等.接下来在本篇文章中,将会构建一个基于logstash,redis,elast

使用Akka、Kafka和ElasticSearch等构建分析引擎 -- good

本文翻译自Building Analytics Engine Using Akka, Kafka & ElasticSearch,已获得原作者Satendra Kumar和网站授权. 在这篇文章里,我将和大家分享一下我用Scala.Akka.Play.Kafka和ElasticSearch等构建大型分布式.容错.可扩展的分析引擎的经验. 我的分析引擎主要是用于文本分析的.输入有结构化的.非结构化的和半结构化的数据,我们会用分析引擎对数据进行大量处理.如下图所示为第一代架构,分析引擎可以用REST