ElasticSearch的基本用法与集群搭建

一、简介

ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。

这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/

二、基本用法

Elasticsearch集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含多个文档(documents),然后每个文档包含多个字段(Fields),这种面向文档型的储存,也算是NoSQL的一种吧。

ES比传统关系型数据库,对一些概念上的理解:

Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices   -> Types  -> Documents -> Fields

从创建一个Client到添加、删除、查询等基本用法:

1、创建Client

public ElasticSearchService(String ipAddress, int port) {
        client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress(ipAddress,
                        port));
    }

这里是一个TransportClient。

ES下两种客户端对比:

TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。

Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。

2、创建/删除Index和Type信息

    // 创建索引
    public void createIndex() {
        client.admin().indices().create(new CreateIndexRequest(IndexName))
                .actionGet();
    }

    // 清除所有索引
    public void deleteIndex() {
        IndicesExistsResponse indicesExistsResponse = client.admin().indices()
                .exists(new IndicesExistsRequest(new String[] { IndexName }))
                .actionGet();
        if (indicesExistsResponse.isExists()) {
            client.admin().indices().delete(new DeleteIndexRequest(IndexName))
                    .actionGet();
        }
    }

    // 删除Index下的某个Type
    public void deleteType(){
        client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();
    }

    // 定义索引的映射类型
    public void defineIndexTypeMapping() {
        try {
            XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
            mapBuilder.startObject()
            .startObject(TypeName)
                .startObject("properties")
                    .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()
                    .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()
                    .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                    .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                    .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                    .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                    .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                    .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()
                .endObject()
            .endObject()
            .endObject();

            PutMappingRequest putMappingRequest = Requests
                    .putMappingRequest(IndexName).type(TypeName)
                    .source(mapBuilder);
            client.admin().indices().putMapping(putMappingRequest).actionGet();
        } catch (IOException e) {
            log.error(e.toString());
        }
    }

这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。

注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。

详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html

3、索引数据

    // 批量索引数据
    public void indexHotSpotDataList(List<Hotspotdata> dataList) {
        if (dataList != null) {
            int size = dataList.size();
            if (size > 0) {
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                for (int i = 0; i < size; ++i) {
                    Hotspotdata data = dataList.get(i);
                    String jsonSource = getIndexDataFromHotspotData(data);
                    if (jsonSource != null) {
                        bulkRequest.add(client
                                .prepareIndex(IndexName, TypeName,
                                        data.getId().toString())
                                .setRefresh(true).setSource(jsonSource));
                    }
                }

                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                if (bulkResponse.hasFailures()) {
                    Iterator<BulkItemResponse> iter = bulkResponse.iterator();
                    while (iter.hasNext()) {
                        BulkItemResponse itemResponse = iter.next();
                        if (itemResponse.isFailed()) {
                            log.error(itemResponse.getFailureMessage());
                        }
                    }
                }
            }
        }
    }

    // 索引数据
    public boolean indexHotspotData(Hotspotdata data) {
        String jsonSource = getIndexDataFromHotspotData(data);
        if (jsonSource != null) {
            IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
                    TypeName).setRefresh(true);
            requestBuilder.setSource(jsonSource)
                    .execute().actionGet();
            return true;
        }

        return false;
    }

    // 得到索引字符串
    public String getIndexDataFromHotspotData(Hotspotdata data) {
        String jsonString = null;
        if (data != null) {
            try {
                XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
                jsonBuilder.startObject().field(IDFieldName, data.getId())
                        .field(SeqNumFieldName, data.getSeqNum())
                        .field(IMSIFieldName, data.getImsi())
                        .field(IMEIFieldName, data.getImei())
                        .field(DeviceIDFieldName, data.getDeviceID())
                        .field(OwnAreaFieldName, data.getOwnArea())
                        .field(TeleOperFieldName, data.getTeleOper())
                        .field(TimeFieldName, data.getCollectTime())
                        .endObject();
                jsonString = jsonBuilder.string();
            } catch (IOException e) {
                log.equals(e);
            }
        }

        return jsonString;
    }

ES支持批量和单个数据索引。

4、查询获取数据

    // 获取少量数据100个
    private List<Integer> getSearchData(QueryBuilder queryBuilder) {
        List<Integer> ids = new ArrayList<>();
        SearchResponse searchResponse = client.prepareSearch(IndexName)
                .setTypes(TypeName).setQuery(queryBuilder).setSize(100)
                .execute().actionGet();
        SearchHits searchHits = searchResponse.getHits();
        for (SearchHit searchHit : searchHits) {
            Integer id = (Integer) searchHit.getSource().get("id");
            ids.add(id);
        }
        return ids;
    }

    // 获取大量数据
    private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
        List<Integer> ids = new ArrayList<>();
        // 一次获取100000数据
        SearchResponse scrollResp = client.prepareSearch(IndexName)
                .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
                .setQuery(queryBuilder).setSize(100000).execute().actionGet();
        while (true) {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                Integer id = (Integer) searchHit.getSource().get(IDFieldName);
                ids.add(id);
            }
            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                    .setScroll(new TimeValue(600000)).execute().actionGet();
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }

        return ids;
    }

这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。

5、聚合(Aggregation Facet)查询 

    // 得到某段时间内设备列表上每个设备的数据分布情况<设备ID,数量>
    public Map<String, String> getDeviceDistributedInfo(String startTime,
            String endTime, List<String> deviceList) {

        Map<String, String> resultsMap = new HashMap<>();

        QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
        QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .must(deviceQueryBuilder).must(rangeBuilder);

        TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE)
                .field(DeviceIDFieldName);
        SearchResponse searchResponse = client.prepareSearch(IndexName)
                .setQuery(queryBuilder).addAggregation(termsBuilder)
                .execute().actionGet();
        Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");
        if (terms != null) {
            for (Terms.Bucket entry : terms.getBuckets()) {
                resultsMap.put(entry.getKey(),
                        String.valueOf(entry.getDocCount()));
            }
        }
        return resultsMap;
    }

Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。

详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html

三、集群配置

配置文件elasticsearch.yml

集群名和节点名:

#cluster.name: elasticsearch

#node.name: "Franz Kafka"

是否参与master选举和是否存储数据

#node.master: true

#node.data: true

分片数和副本数

#index.number_of_shards: 5
#index.number_of_replicas: 1

master选举最少的节点数,这个一定要设置为整个集群节点个数的一半加1,即N/2+1

#discovery.zen.minimum_master_nodes: 1

discovery ping的超时时间,拥塞网络,网络状态不佳的情况下设置高一点

#discovery.zen.ping.timeout: 3s

注意,分布式系统整个集群节点个数N要为奇数个!!

如何避免ElasticSearch发生脑裂(brain split):http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/

即使集群节点个数为奇数,minimum_master_nodes为整个集群节点个数一半加1,也难以避免脑裂的发生,详情看讨论:https://github.com/elastic/elasticsearch/issues/2488

四、Elasticsearch插件

1、elasticsearch-head是一个elasticsearch的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head

2、elasticsearch-sql:使用SQL语法查询elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql

github地址:https://github.com/NLPchina/elasticsearch-sql

3、elasticsearch-bigdesk是elasticsearch的一个集群监控工具,可以通过它来查看ES集群的各种状态。

安装:./bin/plugin -install lukas-vlcek/bigdesk

访问:http://192.103.101.203:9200/_plugin/bigdesk/

4、elasticsearch-servicewrapper插件是ElasticSearch的服务化插件,

在https://github.com/elasticsearch/elasticsearch-servicewrapper下载该插件后,解压缩,将service目录拷贝到elasticsearch目录的bin目录下。

而后,可以通过执行以下语句安装、启动、停止ElasticSearch:

sh elasticsearch install

sh elasticsearch start

sh elasticsearch stop

 

参考:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch

 

作者:阿凡卢

出处:http://www.cnblogs.com/luxiaoxun/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

http://www.cnblogs.com/luxiaoxun/p/4869509.html

时间: 2024-09-14 10:36:18

ElasticSearch的基本用法与集群搭建的相关文章

ElasticSearch的基本用法与集群搭建 good

一.简介 ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持. 这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/ 二.基本用法 Elasticsearch集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含

codis集群搭建

目前公司的业务是php语言开发,使用的是laravel开源框架.laravel框架使用predis连接redis,但是目前predis不支持主从式redis集群,这点很坑. 看了看目前网上的解决方案,国内豌豆荚开源的codis很不错.这篇文章,我们主要介绍codis集群的安装. 一.codis介绍 codis是一个分布式redis集群解决方案,对于上层的应用来说, 连接到codis-proxy和连接原生的redis-server没有明显的区别. 上层应用可以像使用单机的redis一样使用,cod

zookeeper集群搭建

因为公司的业务发展,需要搭建codis集群(一个由国内豌豆荚开发的redis集群解决方案),但是codis集群是依赖与zookeeper集群的.所以这篇文章,我们主要介绍有关zookeeper集群的搭建. 一.zookeeper介绍 zookeeper是一个分布式的开源框架,它能很好的管理集群,而且提供协调分布式应用的基本服务. 它向外部应用暴露一组通用服务--分布式同步(Distributed Synchronization).命名服务(Naming Service).集群维护(Group M

Hadoop 三台主机 集群搭建 详解(测试)

Hadoop 三台主机 集群搭建 详解 学习更多,请访问系列文章: 1. VMware Redhat网络配置 2. Hadoop 三台主机 集群搭建 详解 3. Windows 下配置 Eclipse 连接 Hadoop 开发环境 部署环境: OS:Redhat 5.5 Enterprise JDK:jdk1.6.0_32 Hadoop:Hadoop-0.20.2 VMWare:7.0 节点安排及网络拓扑: 节点类型 节点IP 节点hostname master节点 192.168.40.5 m

hadoop集群搭建完成,其他进程都启动了,但是namenode没有启动,查看日志,报错了

问题描述 hadoop集群搭建完成,其他进程都启动了,但是namenode没有启动,查看日志,报错了 hadoop集群搭建完成,其他进程都启动了,但是namenode没有启动,查看namenode的日志信息,报错了, 192.168.100.70:8485: Call From anlulu-1/192.168.100.10 to anlulu-7:8485 failed on connection exception: java.net.ConnectException: 拒绝连接; For

Spark-1.4.0集群搭建

主要内容 Ubuntu 10.04 系统设置 ZooKeeper集群搭建 Hadoop-2.4.1集群搭建 Spark 1.4.0集群搭建 假设已经安装好Ubuntu操作系统 Ubuntu 10.04设置 1.主机规划 主机名 IP地址 进程号 SparkMaster 192.168.1.103 ResourceManager DataNode.NodeManager.JournalNode.QuorumPeerMain SparkSlave01 192.168.1.101 ResourceMa

Spark修炼之道(进阶篇)——Spark入门到精通:第十五节 Kafka 0.8.2.1 集群搭建

作者:周志湖 微信号:zhouzhihubeyond 本节为下一节Kafka与Spark Streaming做铺垫 主要内容 1.kafka 集群搭建 1. kafka 集群搭建 kafka 安装与配置 到下面的地址下载:Scala 2.10 - kafka_2.10-0.8.2.1.tgz http://kafka.apache.org/downloads.html 下载完成后,使用命令 tar -zxvf kafka_2.10-0.8.2.1.tgz 解压,解压后的目录如下 进入config

高可用Hadoop平台-HBase集群搭建

1.概述 今天补充一篇HBase集群的搭建,这个是高可用系列遗漏的一篇博客,今天抽时间补上,今天给大家介绍的主要内容目录如下所示: 基础软件的准备 HBase介绍 HBase集群搭建 单点问题验证 截图预览 那么,接下来我们开始今天的HBase集群搭建学习. 2.基础软件的准备 由于HBase的数据是存放在HDFS上的,所以我们在使用HBase时,确保Hadoop集群已搭建完成,并运行良好.若是为搭建Hadoop集群,请参考我写的<配置高可用的Hadoop平台>来完成Hadoop平台的搭建.另

hadoop集群搭建详述

1.集群搭建策略 分析: 我手上只有3台电脑,两台华硕笔记本i7.i3的处理器,一台台式机Pentium4处理器.为了更好的测试zookeeper的功能, 我们总共需要6台ubuntu(Ubuntu 14.04.3 LTS)的主机.以下是我的主机分布策略: i7:开启4台ubuntu虚拟机分别是 虚拟机名 内存 硬盘 网络连接 master 1G 20G 桥接 master2 1G 20G 桥接 rm 512M 20G 桥接 slave3 1G 20G 桥接 i3: 安装ubuntu系统 ,作为