Elasticsearch+Hbase实现海量数据秒回查询

---------------------------------------------------------------------------------------------
[版权申明:本文系作者原创,转载请注明出处] 
文章出处:http://blog.csdn.net/sdksdk0/article/details/53966430
作者:朱培      ID:sdksdk0     

--------------------------------------------------------------------------------------------

首先祝大家2017新年快乐,我今天分享的是通过ElasticSearch与hbase进行整合的一个搜索案例,这个案例涉及的技术面比较广,首先你得有JAVAEE的基础,要会SSM,而且还要会大数据中的hdfs、zookeeper、hbase以及ElasticSearch和kibana。环境部署在4台centos7上。主机名为node1-node4。这里假设你已经安装好了zookeeper、hadoop、hbase和ElasticSearch还有kibana,我这里使用的是hadoop2.5.2,ElasticSearch用的你是2.2,kibana是4.4.1。我这里的环境是 hadoop是4台在node1-node4, zookeeper是3台再node1-node3,,ElasticSearch是3台在node1-node3,kibana是一台在node1上。该系统可以对亿万数据查询进行秒回,是一般的关系型数据库很难做到的。在IntelliJ IDEA 中进行代码编写。环境搭建我这里就不啰嗦,相信大家作为一名由经验的开发人员来说都是小事一桩。文末提供源码下载链接。

一、ElasticSearch和Hbase

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。 Elasticsearch的性能是solr的50倍。

HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、
实时读写的分布式数据库
– 利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理
HBase中的海量数据,利用Zookeeper作为其分布式协同服务
– 主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)

二、需求分析&服务器环境设置

主要是做一个文章的搜索。有文章标题、作者、摘要、内容四个主要信息。效果图如下:这里样式我就没怎么设置了。。。。想要好看一点的可以自己加css。

服务器:

在3台centos7中部署,主机名为node1-node3.安装好ElasticSearch并配置好集群,

1.     解压

2.     修改config/elasticsearch.yml    (注意要顶格写,冒号后面要加一个空格)

a)      Cluster.name: tf   (同一集群要一样)

b)      Node.name: node-1  (同一集群要不一样)

c)       Network.Host: 192.168.44.137  这里不能写127.0.0.1

3.     解压安装kibana

4.     再congfig目录下的kibana.yml中修改elasticsearch.url

5.     安装插件

Step 1: Install Marvel into Elasticsearch:

bin/plugin install license
bin/plugin install marvel-agent

Step 2: Install Marvel into Kibana

bin/kibana plugin --install elasticsearch/marvel/latest

Step 3: Start Elasticsearch and Kibana

bin/elasticsearch
bin/kibana

 

启动好elasticsearch集群后,

然后启动zookeeper、hdfs、hbase。zkService.sh start  、start-all.sh、start-hbase.sh。
接下来就是剩下编码步骤了。

三、编码开发

1、首先在IntelliJ IDEA中新建一个maven工程,加入如下依赖。

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>

        <!-- spring 3.2 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>3.2.0.RELEASE</version>
        </dependency>

        <!-- JSTL -->
        <dependency>
            <groupId>jstl</groupId>
            <artifactId>jstl</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>taglibs</groupId>
            <artifactId>standard</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>

        <!-- elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- habse -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.3</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

2、Dao层

	private Integer id;
	private String title;

	private String describe;

	private String content;

	private String author;

实现其getter/setter方法。

3、数据准备

在桌面新建一个doc1.txt文档,用于把我们需要查询的数据写入到里面,这里我只准备了5条数据。中间用tab键隔开。

4、在hbase中建立表。表名师doc,列族是cf。

public static void main(String[] args) throws Exception {
      HbaseUtils hbase = new HbaseUtils();
      //创建一张表
	hbase.createTable("doc","cf");

}

/**
 * 创建一张表
 * @param tableName
 * @param column
 * @throws Exception
 */
public void createTable(String tableName, String column) throws Exception {
   if(admin.tableExists(TableName.valueOf(tableName))){
      System.out.println(tableName+"表已经存在!");
   }else{
      HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
      tableDesc.addFamily(new HColumnDescriptor(column.getBytes()));
      admin.createTable(tableDesc);
      System.out.println(tableName+"表创建成功!");
   }
}

5、导入索引。这一步的时候确保你的hdfs和hbase以及elasticsearch是处于开启状态。

  @Test
    public void createIndex() throws Exception {
        List<Doc> arrayList = new ArrayList<Doc>();
        File file = new File("C:\\Users\\asus\\Desktop\\doc1.txt");
        List<String> list = FileUtils.readLines(file,"UTF8");
        for(String line : list){
            Doc Doc = new Doc();
            String[] split = line.split("\t");
            System.out.print(split[0]);
            int parseInt = Integer.parseInt(split[0].trim());
            Doc.setId(parseInt);
            Doc.setTitle(split[1]);
            Doc.setAuthor(split[2]);
            Doc.setDescribe(split[3]);
            Doc.setContent(split[3]);
            arrayList.add(Doc);
        }
        HbaseUtils hbaseUtils = new HbaseUtils();
        for (Doc Doc : arrayList) {
            try {
                //把数据插入hbase
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_TITLE, Doc.getTitle());
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_AUTHOR, Doc.getAuthor());
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_DESCRIBE, Doc.getDescribe());
                hbaseUtils.put(hbaseUtils.TABLE_NAME, Doc.getId()+"", hbaseUtils.COLUMNFAMILY_1, hbaseUtils.COLUMNFAMILY_1_CONTENT, Doc.getContent());
                //把数据插入es
                Esutil.addIndex("tfjt","doc", Doc);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

数据导入成功之后可以在服务器上通过命令查看一下:

curl -XGET http://node1:9200/tfjt/_search

7、搜索。

在这里新建了一个工具类Esutil.java,主要用于处理搜索的。注意,我们默认的elasticsearch是9200端口的,这里数据传输用的是9300,不要写成9200了,然后就是集群名字为tf,也就是前面配置的集群名。还有就是主机名node1-node3,这里不能写ip地址,如果是本地测试的话,你需要在你的window下面配置hosts文件。

public class Esutil {
	public static Client client = null;

		/**
		 * 获取客户端
		 * @return
		 */
		public static  Client getClient() {
			if(client!=null){
				return client;
			}
			Settings settings = Settings.settingsBuilder().put("cluster.name", "tf").build();
			try {
				client = TransportClient.builder().settings(settings).build()
						.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node1"), 9300))
						.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node2"), 9300))
						.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("node3"), 9300));
			} catch (UnknownHostException e) {
				e.printStackTrace();
			}
			return client;
		}

	public static String addIndex(String index,String type,Doc Doc){
		HashMap<String, Object> hashMap = new HashMap<String, Object>();
		hashMap.put("id", Doc.getId());
		hashMap.put("title", Doc.getTitle());
		hashMap.put("describe", Doc.getDescribe());
		hashMap.put("author", Doc.getAuthor());

		IndexResponse response = getClient().prepareIndex(index, type).setSource(hashMap).execute().actionGet();
		return response.getId();
	}

	public static Map<String, Object> search(String key,String index,String type,int start,int row){
		SearchRequestBuilder builder = getClient().prepareSearch(index);
		builder.setTypes(type);
		builder.setFrom(start);
		builder.setSize(row);
		//设置高亮字段名称
		builder.addHighlightedField("title");
		builder.addHighlightedField("describe");
		//设置高亮前缀
		builder.setHighlighterPreTags("<font color='red' >");
		//设置高亮后缀
		builder.setHighlighterPostTags("</font>");
		builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
		if(StringUtils.isNotBlank(key)){
//			builder.setQuery(QueryBuilders.termQuery("title",key));
			builder.setQuery(QueryBuilders.multiMatchQuery(key, "title","describe"));
		}
		builder.setExplain(true);
		SearchResponse searchResponse = builder.get();

		SearchHits hits = searchResponse.getHits();
		long total = hits.getTotalHits();
		Map<String, Object> map = new HashMap<String,Object>();
		SearchHit[] hits2 = hits.getHits();
		map.put("count", total);
		List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
		for (SearchHit searchHit : hits2) {
			Map<String, HighlightField> highlightFields = searchHit.getHighlightFields();
			HighlightField highlightField = highlightFields.get("title");
			Map<String, Object> source = searchHit.getSource();
			if(highlightField!=null){
				Text[] fragments = highlightField.fragments();
				String name = "";
				for (Text text : fragments) {
					name+=text;
				}
				source.put("title", name);
			}
			HighlightField highlightField2 = highlightFields.get("describe");
			if(highlightField2!=null){
				Text[] fragments = highlightField2.fragments();
				String describe = "";
				for (Text text : fragments) {
					describe+=text;
				}
				source.put("describe", describe);
			}
			list.add(source);
		}
		map.put("dataList", list);
		return map;
	}

//	public static void main(String[] args) {
//		Map<String, Object> search = Esutil.search("hbase", "tfjt", "doc", 0, 10);
//		List<Map<String, Object>> list = (List<Map<String, Object>>) search.get("dataList");
//	}
}

8、使用spring控制层处理

在里面的spring配置这里就不说了,代码文末提供。

	@RequestMapping("/search.do")
	public String serachArticle(Model model,
			@RequestParam(value="keyWords",required = false) String keyWords,
			@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
			@RequestParam(value = "pageSize", defaultValue = "3") Integer pageSize){
		try {
			keyWords = new String(keyWords.getBytes("ISO-8859-1"),"UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		Map<String,Object> map = new HashMap<String, Object>();
		int count = 0;
		try {
			map = Esutil.search(keyWords,"tfjt","doc",(pageNum-1)*pageSize, pageSize);
			count = Integer.parseInt(((Long) map.get("count")).toString());
		} catch (Exception e) {
			logger.error("查询索引错误!{}",e);
			e.printStackTrace();
		}
		PageUtil<Map<String, Object>> page = new PageUtil<Map<String, Object>>(String.valueOf(pageNum),String.valueOf(pageSize),count);
		List<Map<String, Object>> articleList = (List<Map<String, Object>>)map.get("dataList");
		page.setList(articleList);
		model.addAttribute("total",count);
		model.addAttribute("pageNum",pageNum);
		model.addAttribute("page",page);
		model.addAttribute("kw",keyWords);
		return "index.jsp";
	}

9、页面

<center>
<form action="search.do" method="get">
  <input type="text" name="keyWords" />
  <input type="submit" value="百度一下">
  <input type="hidden" value="1" name="pageNum">
</form>
<c:if test="${! empty page.list }">
<h3>百度为您找到相关结果约${total}个</h3>
<c:forEach items="${page.list}" var="bean">
  <a href="/es/detailDocById/${bean.id}.do">${bean.title}</a>
  <br/>
  <br/>
  <span>${bean.describe}</span>
  <br/>
  <br/>
</c:forEach>

<c:if test="${page.hasPrevious }">
  <a href="search.do?pageNum=${page.previousPageNum }&keyWords=${kw}"> 上一页</a>
</c:if>
<c:forEach begin="${page.everyPageStart }" end="${page.everyPageEnd }" var="n">
  <a href="search.do?pageNum=${n }&keyWords=${kw}"> ${n }</a>   
</c:forEach>

<c:if test="${page.hasNext }">
  <a href="search.do?pageNum=${page.nextPageNum }&keyWords=${kw}"> 下一页</a>
</c:if>
</c:if>
</center>

10、项目发布
在IntelliJ IDEA 中配置好常用的项目,这里发布名Application context名字为es,当然你也可以自定义设置。

最终效果如下:搜索COS会得到结果,速度非常快。

总结:这个案例的操作流程还是挺多的,要有细心和耐心,特别是服务器配置,各种版本要匹配好,不然会出各种头疼的问题,当然了,这个还是需要有一定基础,不然搞不定这个事情。。。。。

源码地址:https://github.com/sdksdk0/es

时间: 2024-09-10 11:10:56

Elasticsearch+Hbase实现海量数据秒回查询的相关文章

使用闪回查询备份数据

今天在生产环境中,开发人员提交了一个脚本,是做update操作的,但是update操作的时候过滤条件有些大,本来预计修改的数据只有5000条,结果这个语句运行下来更改了500万条数据.对生产系统来说算是一个数据灾难,赶紧和开发确认了问题发生的时间,结果说是在半夜11点多,刚好在后半夜才开始做数据备份,这样这个变更也同时影响了备份,就算做紧急的数据恢复也是没有任何效果的.目前采用的备份都是全量的按天备份,备份收到影响,恢复还是比较困难的. 这个问题就在紧急的讨论中分为了两个步骤,我来尝试恢复昨天备

Oracle Database 10g:最佳新特性(第一周:闪回查询)

oracle 第一周:闪回查询 得到电影而不是图片:闪回版本查询 不需要设置,立即识别对行的所有更改 在 Oracle9i Database 中,我们看到它推出了以闪回查询形式表示的"时间机器".该特性允许 DBA 看到特定时间的列值,只要在还原段中提供该数据块此前镜像的拷贝即可.但是,闪回查询只提供某时刻数据的固定快照,而不是在两个时间点之间被更改数据的运行状态表示.某些应用程序,如涉及到外币管理的应用程序,可能需要了解一段时期内数值数据的变化,而不仅仅是两个时间点的数值.由于闪回版

Oracle 9i中的一个闪回查询操作实例

在利用闪回功能前需要确认: 1.用户有对dbms_flashback包有执行权限! 2.进行闪回查询必须设置自动回滚段管理,在init.ora设置参数UNDO_MANAGEMENT=AUTO,参数UNDO_RETENTION=n,决定了能往前闪回的最大时间,值越大就需要越多Undo空间. Oracle 9i中闪回查询操作实例 查看Oracle中Delete和Commit操作的流程分析 例:Oracle 9i的Flashback Query操作. (1)创建闪回查询用户 SQL> create u

求海量数据的高性能查询统计的方法?

问题描述 求海量数据的高性能查询统计的方法? 目前有每天过亿的历史数据产生,这些数据要保存三个月,可以用什么技术或处理方案可以 实现对这些数据的快速查询统计等操作? 解决方案 这个属于大数据的处理,你需要借助一些专业大数据处理软件,比如hadoop,Storm等. 解决方案二: 用大数据平台,hadoop加spark吧,spark查询速度快,也能支持各种统计. 解决方案三: Hash法 Hash一般被翻译为哈希,也被称为散列,它是一种映射关系,即给定一个数据元素,其关键字为key,按一个确定的哈

云计算-hbase中filter只能用于查询吗?

问题描述 hbase中filter只能用于查询吗? 1.fiter还能用于put和delete吗? 2.hbase又能判断表是否存在的方法,有能判断表中是否有列族?有能判断类族中有列?的方法吗?(我看api好像没有啊)如果没有怎么实现这样的方法? 3.hbase的时间戳不指定是默认,可是有什么属性之类的能设定让他连系统默认都不使用吗?(个人感觉hbase没有时间戳了就没有版本了,不用时间戳好像没意义.) 4.hbase当操作表的结构时候,传值设定表的属性,那么参数太多,使用哪个传值方式更合适(m

计算机-海量数据存储,查询是如何实现的

问题描述 海量数据存储,查询是如何实现的 数据库 就用 Oracle 吧 11亿条数据在数据中是如何存储的?分物理存储,和逻辑存储两方面阐述 这11亿条数据在数据库是如何实现查询的?分物理查询,和逻辑查询两方面阐述 还是这11亿条数据在计算机是如何存储的?分物理存储,和逻辑存储两方面阐述 这11亿条数据在计算机中是如何查询的?分物理查询,和逻辑查询两方面阐述 项目例会的时候,开发经理提出来的.大家各抒己见! 解决方案 11亿在今时今日已经不能称海量了,一般都要过10t~100T的大小.如果磁盘I

怎么把&quot;信息秒回&quot;做到极致?IMO班聊也是拼了

三秒钟找到人,并立刻发起沟通--仅仅是"班铃"这一个功能,就让越来越多的企业成为IMO班聊的铁杆"粉丝". "做为一款工作聊天软件,班聊在沟通方面做得特别细致." 齐家网的无线事业部推广总监姚苏粤对IMO班聊的评价就是,这是一款真正的职场神器. 同样是聊天,生活.工作大不同 同样是聊天,细分到工作场景之下,会有什么不同? 在IMO班聊刚刚召开的产品发布会上,就出现了这样一个场景--早上9点,上海浦东一家公司的产品总监Kevin刚刚走进办公室,还没

上百个用户每隔10秒去查询一次数据表(一般就几条数据),服务器撑的住么?

问题描述 上百个用户每隔10秒去查询一次数据表(一般就几条数据),服务器撑的住么? 解决方案 完全可以,数据库有查询缓存的,经常查询的数据直接命中缓存解决方案二:先要做到代码没有漏洞,就是说例如用jdbc访问数据库,每次做到打开和关闭都匹配,......再做到服务器性能支持,就没问题了.解决方案三:完全没问题解决方案四:完全无压力,除非程序有问题,或者你的查询一次的性能很差解决方案五:得看是什么样的服务器.数据库是单独一台服务器还是虚拟主机还是什么.100并发访问的话,最好配连接池.注意应用中的

oracle 9i使用闪回查询恢复数据库误删问题_oracle

如果用户误删/更新了数据后,作为用户并没有什么直接的方法来进行恢复,他们必须求助DBA来对数据库进行恢复,到了Oracle9i,这一个难堪局面有所改善.Oracle 9i中提供了一项新的技术手段--闪回查询,用户使用闪回查询可以及时取得误操作前的数据,并可以针对错误进行相应的恢复措施,而这一切都无需DBA干预. 因为一时手贱,生产上的数据被我给delete掉了. 用的是delete语句,然后很迅速的还给commit了 下面这两个语句: ALTER TABLE tablename ENABLE r