ElasticSearch实战-编码实践

1.概述

  前面在《ElasticSearch实战-入门》中给大家分享如何搭建这样一个集群,在完成集群的搭建后,今天给大家分享如何实现对应的业务功能模块,下面是今天的分享内容,目录如下所示:

  • 编码实践
  • 效果预览
  • 总结

2.编码实践

  由于 ES 集群支持 Restful 接口,我们可以直接通过 Java 来调用 Restful 接口来查询我们需要的数据结果,并将查询到的结果在在我们的业务界面可视化出来。我们知道在 ES 集群的 Web 管理界面有这样一个入口,如下图所示:

  我们可以在此界面的入口中拼接 JSON 字符串来查询我们想要的结果,下面,我们通过 Java 的 API 去调用 Restful 接口来查询我们想要的结果。

2.1 字符串拼接实现

  接着,我们去实现要查询的核心代码,具体内容实现如下所示:

public String buildQueryString(Map<String, Object> param) throws ParseException {
        SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        StringBuilder builder = new StringBuilder("{\"query\":{\"bool\":{\"must\":[");
        if (param.get("msgType") != null) {
            Integer msgType = (int) param.get("msgType") == 0 ? 2 : 1;
            builder.append("{\"term\":{\"msg_type\":").append(msgType).append("}}");
        }if (param.get("start") != null && param.get("end") != null) {
            String start = String.valueOf(dfs.parse(param.get("start").toString()).getTime()).substring(0, 10);
            String end = String.valueOf(dfs.parse(param.get("end").toString()).getTime()).substring(0, 10);
            builder.append(",{\"range\":{\"itime\":{\"from\":" + start + ",\"to\":" + end + "}}}");
        }
        if (param.get("receiverValue") != null) {
            builder.append(",{\"wildcard\":{\"receiver_value\":\"*").append(param.get("receiverValue")).append("*\"}}");
        }
        builder.append("],\"must_not\":[],\"should\":[]}}");
        builder.append(",\"sort\":[{\"itime\":\"desc\"}],\"facets\":{}");
        builder.append(",\"from\": ").append(param.get("startIndex")).append(",\"size\": ").append(param.get("pageSize")).append("}");
        LOG.info("API Query -> " + builder.toString());
        return builder.toString();
    }

2.2 查询实现核心代码

  接着是实现查询的核心代码,具体内容实现如下所示:

public SerachResponse<ApiSent> querySent(Map<String, Object> param) {
        SerachResponse<ApiSent> search_result = null;
        try {
            long time = System.currentTimeMillis();
            ResponseWrapper wrapper = httpUtils.sendJson(configService.loadConfig(Configure.API_SENT), buildQueryString(param));
            if (wrapper.responseCode == HttpStatus.SC_OK) {
                search_result = _gson.fromJson(wrapper.responseContent, new TypeToken<SerachResponse<ApiSent>>() {
                }.getType());
                LOG.info(String.format("API query ES spent time=%sms", (System.currentTimeMillis() - time)));
                return search_result;
            } else {
                LOG.info(String.format("API query ES spent time=%sms", (System.currentTimeMillis() - time)));
                LOG.error(String.format("api sent request es server response not 200,response=%s,exception=%s", wrapper.error, wrapper.exceptionString));
            }
        } catch (Exception ex) {
            LOG.error(String.format("parsed es sent data exception.", ex));
        }
        return search_result;

    }

Configure类



public class Configure {
    public static final String API_SENT = "API_SENT";
}

2.3 DAO层获取 ES 集群的连接信息

public class ConfigService {

    private static Log logger = LogFactory.getLog(ConfigService.class);

    @Autowired
    private ConfigDao configDao;

    @Cacheable("sysConfigCache")
    public String loadConfig(String type) {
        String value = configDao.getConfig(type);
        logger.info(String.format("Load Config,type=%s,value=%s", type, value));
        return value;
    }
}

ConfigDao接口



public interface ConfigDao {

    String getConfig(String type);

}

  其对应的实现内容如下所示:



<select id="getConfig" parameterType="String" resultType="String">
   select value from t_system_config where type=#{type}
</select>

  DB库存储的 ES 连接信息,如下图所示:

2.4 HTTP 接口的实现代码

  关于 HttpUtils 的代码实现较为简单,这里直接附上代码的实现内容,如下所示:

  • IHttpUtils 接口


public interface IHttpUtils {
    public ResponseWrapper sendJson(String url, String content);
}
  • HttpUtils 类实现接口
public class HttpUtils implements IHttpUtils {

    private static Logger LOG = Logger.getLogger(HttpUtils.class.getName());
    protected static Gson _gson = new Gson();

    protected void initSSL() {
        try {
            TrustManager[] tmCerts = new javax.net.ssl.TrustManager[1];
            tmCerts[0] = new SimpleTrustManager();
            javax.net.ssl.SSLContext sc = javax.net.ssl.SSLContext.getInstance("SSL");
            sc.init(null, tmCerts, null);
            javax.net.ssl.HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
            HostnameVerifier hv = new SimpleHostnameVerifier();
            HttpsURLConnection.setDefaultHostnameVerifier(hv);
        } catch (Exception e) {
            LOG.error("init SSL exception.", e);
        }
    }

    @Override
    public ResponseWrapper sendJson(String url, String content) {
        return sendJson(url, content, null);
    }

    @Override
    public ResponseWrapper sendJson(String url, String content, String authCode) {
        return sendRequest(url, content, METHOD_POST, authCode, CONTENT_TYPE_JSON);
    }

public ResponseWrapper sendRequest(String url, String content, String method, String authCode,
            String contentType) {
        LOG.info("Send request to - " + url + ", with content - " + content);
        HttpURLConnection conn = null;
        OutputStream out = null;
        StringBuffer sb = new StringBuffer();
        cn.jpush.utils.ResponseWrapper wrapper = new ResponseWrapper();

        try {
            if (StringUtils.isSSL(url)) {
                initSSL();
            }

            if (METHOD_GET.equals(method)) {
                if (!Strings.isNullOrEmpty(content)) url += "?" + content;
            }
            URL aUrl = new URL(url);
            wrapper.address = aUrl.getHost();

            conn = (HttpURLConnection) aUrl.openConnection();
            conn.setConnectTimeout(DEFAULT_CONNECTION_TIMEOUT);
            conn.setReadTimeout(DEFAULT_SOCKET_TIMEOUT);
            conn.setUseCaches(false);
            conn.setRequestMethod(method);
            conn.setRequestProperty("Connection", "Keep-Alive");
            conn.setRequestProperty("Accept-Charset", CHARSET);
            conn.setRequestProperty("Charset", CHARSET);
            conn.setRequestProperty("Authorization", authCode);
            conn.setRequestProperty("Send-Source", "portal");
            conn.setRequestProperty("Content-Type", contentType);

            if (METHOD_POST.equals(method)) {
                conn.setDoOutput(true);
                byte[] data = content.getBytes(CHARSET);
                conn.setRequestProperty("Content-Length", String.valueOf(data.length));
                out = conn.getOutputStream();
                out.write(data);
                out.flush();
            } else {
                conn.setDoOutput(false);
            }
            int status = conn.getResponseCode();
            InputStream in = null;
            if (status == 200) {
                in = conn.getInputStream();
            } else {
                in = conn.getErrorStream();
            }
            InputStreamReader reader = new InputStreamReader(in, CHARSET);
            char[] buff = new char[1024];
            int len;
            while ((len = reader.read(buff)) > 0) {
                sb.append(buff, 0, len);
            }

            String responseContent = sb.toString();
            wrapper.responseCode = status;
            wrapper.responseContent = responseContent;

            String quota = conn.getHeaderField(RATE_LIMIT_QUOTA);
            String remaining = conn.getHeaderField(RATE_LIMIT_Remaining);
            String reset = conn.getHeaderField(RATE_LIMIT_Reset);
            wrapper.setRateLimit(quota, remaining, reset);

            if (status == 200) {
                LOG.debug("Succeed to get response - 200 OK");
                LOG.debug("Response Content - " + responseContent);

            } else if (status > 200 && status < 400) {
                LOG.warn("Normal response but unexpected - responseCode:" + status
                        + ", responseContent:" + responseContent);

            } else {
                LOG.warn("Got error response - responseCode:" + status + ", responseContent:"
                        + responseContent);

                switch (status) {
                    case 400:
                        LOG.error("Your request params is invalid. Please check them according to error message.");
                        wrapper.setErrorObject();
                        break;
                    case 401:
                        LOG.error("Authentication failed! Please check authentication params according to docs.");
                        wrapper.setErrorObject();
                        break;
                    case 403:
                        LOG.error("Request is forbidden! Maybe your is listed in blacklist?");
                        wrapper.setErrorObject();
                        break;
                    case 410:
                        LOG.error("Request resource is no longer in service. Please according to notice on official website.");
                        wrapper.setErrorObject();
                    case 429:
                        LOG.error("Too many requests! Please review your request quota.");
                        wrapper.setErrorObject();
                        break;
                    case 500:
                    case 502:
                    case 503:
                    case 504:
                        LOG.error("Seems encountered server error. Maybe is in maintenance? Please retry later.");
                        break;
                    default:
                        LOG.error("Unexpected response.");
                }

            }

        } catch (SocketTimeoutException e) {
            if (e.getMessage().contains(KEYWORDS_READ_TIMED_OUT)) {
                LOG.error(KEYWORDS_READ_TIMED_OUT, e);
            }
            wrapper.exceptionString = e.getMessage();

        } catch (IOException e) {
            LOG.error(KEYWORDS_CONNECT_TIMED_OUT, e);
            wrapper.exceptionString = e.getMessage();

        } finally {
            if (null != out) {
                try {
                    out.close();
                } catch (IOException e) {
                    LOG.error("Failed to close stream.", e);
                }
            }
            if (null != conn) {
                conn.disconnect();
            }
        }
        LOG.info(String.format("Send Response to - %s, Response Wrapper - %s", url, wrapper));
        return wrapper;
    }
}

3.截图预览

  下面给大家附上一张业务界面可视化的数据结果预览图,如下图所示:

  上图为我发送的测试数据,通过收集模块将我发送的数据收集并存储到 ES 集群,通过接口代码将这部分数据可视化到业务界面进行展示。

4.总结

  总体来说,ES 集群从搭建部署到编码实现都较为简单,在使用 JSON 字符串拼接查询时需要细心点,后续有时间可以为大家分享下 ES 的查询的效率,及其他方面的性能指标。

5.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-09-30 12:12:17

ElasticSearch实战-编码实践的相关文章

Kafka项目实战-用户日志上报实时统计之编码实践

1.概述 本课程的视频教程地址:<Kafka实战项目之编码实践>  该课程我以用户实时上报日志案例为基础,带着大家去完成各个KPI的编码工作,实现生产模块.消费模块, 数据持久化,以及应用调度等工作, 通过对这一系列流程的演示,让大家能够去掌握Kafka项目的相关编码以及调度流程.下面,我们首先来预览本课程所包含的课时,他们分别有: 接下来,我们开始第一课时的学习:<数据生产实现> 2.内容 2.1 数据生产实现 本课时主要给大家演示Kafka数据生产的代码实现,在前面搭建好的集群

Hadoop项目实战-用户行为分析之编码实践

1.概述 本课程以用户行为分析案例为基础,带着大家去完成对各个KPI的编码工作,以及应用调度工作,让大家通过本课程掌握Hadoop项目的编码.调度流程.下面我们来看看本课程有哪些课时,如下图所示: 2.内容 2.1 Hadoop项目基础代码 本课时介绍编写Hadoop基础代码及脚本,在前面搭建好的Hadoop项目工程上, 完成项目的基本代码的编写,以及一些注意事项,为编写核心代码做准备,让大家掌握Hadoop项目的基础代码开发. 本课时主要包含以下知识点,如下图所示: 下面,我为大家介绍Hado

ElasticSearch实战-入门

1.概述 今天接着<ElasticSearch实战-日志监控平台>一文来给大家分享后续的学习,在<ElasticSearch实战-日志监控平台>中给大家介绍一个日志监控平台的架构方案,接下来给大家分享如何去搭建部署这样一个平台,给大家做一个入门介绍.下面是今天的分享目录: 搭建部署 Elastic 套件 运行集群 截图预览 下面开始今天的内容分享. 2.搭建部署 Elastic 套件 搭建 Elastic 套件较为简单,下面我们开始去搭建部署相关套件,首先我们准备必要的环境. 2.

ElasticSearch实战-日志监控平台

1.概述 在项目业务倍增的情况下,查询效率受到影响,这里我们经过讨论,引进了分布式搜索套件--ElasticSearch,通过分布式搜索来解决当下业务上存在的问题.下面给大家列出今天分析的目录: ElasticSearch 套件介绍 ElasticSearch 应用场景和案例 平台架构 下面开始今天的内容分享. 2.ElasticSearch 套件 2.1LogStash LogStash是一个开源的.免费的日志收集工具,属于Elastic家族的一员,负责将收集的日志信息输送到ElasticSe

程序设计者需要谨记的九大安全编码实践

历史已经证明,软件设计的缺陷一直是导致其被漏洞利用的最主要的罪魁祸首,安全专家发现,多数漏洞源自常见软件的相对有限的一些漏洞.软件开发者和设计者应当严格检查程序中的各种错误,尽量在 软件部署之前就减少或清除其中的漏洞.下面列示的这些方法有助于开发人员提高编码的安全性:一.注意编译器警告程序员应当使用编译器的最高警告等级.在编译过程中,应当修改程序中的错误,直到警告解除.应当使用静态和动态的分析工具来检测和清除安全缺陷.二.根据安全策略设置软件架构设计者应创建一个软件架构,并在设计软件的过程中实施

C、C++和Java安全编码实践提示与技巧

对于所有类型环境中的开发人员来说,安全性正成为一个越来越重要的主题,即便过去一直认为安全性不成问题的嵌入式系统也是如此.本文将介绍几种类型的编码漏洞,指出漏洞是什么.如何降低代码被攻击的风险.如何更好地找出代码中的此类缺陷. 注入攻击 通过将信息注入正在运行的流程,攻击者可以危害进程的运行状态,以反射到开发人员无法保护的某种最终目标.例如,攻击者可能会通过堆栈溢出(stack corruption)将代码注入进程,从而执行攻击者选定的代码.此外,攻击者也可能尝试将数据注入数据库,供将来使用:或将

开发人员应用软件安全编码的最佳实践

信息安全并不仅仅是保护计算机.网络.信息免受攻击和危害.应用软件缺乏安全性日益成为许多企业的一种巨大漏洞!无论是软件开发者,还是安全专家,在与攻击者斗争的过程中,永远不可能一劳永逸地解决安全问题.因为攻击者 往往富有创造性并且坚忍不拔,还有金钱的巨大诱惑.除了利用操作系统的漏洞之外,攻击者还喜欢利用应用软件的漏洞,而软件工程师们似乎 忽视了这一点.软件安全的目标是什么?是构建更好的.无缺陷的软件.一般情况下,应用软件都会存在许多缺陷,其中的相当一部分都成为安全问题的源头.在开发软件时,将安全性牢

Kafka实战-实时日志统计流程

1.概述 在<Kafka实战-简单示例> 一文中给大家介绍来Kafka的简单示例,演示了如何编写Kafka的代码去生产数据和消费数据,今天给大家介绍如何去整合一个完整的项目,本篇博客我打 算为大家介绍Flume+Kafka+Storm的实时日志统计,由于涉及的内容较多,这里先给大家梳理一个项目的运用这些技术的流程.下面是今天的内容 目录: 项目流程 Flume Kafka Storm 下面开始今天的内容分享. 2.项目流程 在整合这套方案的时候,项目组也是经过一番讨论,在讨论中,观点很多,有人

Kafka项目实战-用户日志上报实时统计之应用概述

1.概述 本课程的视频教程地址:<Kafka实战项目之应用概述> 本课程是通过一个用户实时上报日志来展开的,通过介绍 Kafka 的业务和应用场景,并带着大家搭建本 Kafka 项目的实战开发环境.下面我们来看看本课程有哪些课时,如下图所示: 接下来,我们开始第一课时的学习:<Kafka 回顾>. 2.内容 2.1 Kafka 回顾 本课时简述 Kafka 平台部署的注意事项,以及 Kafka 在企业中的业务场景和应用场景.让大家了解 Kafka 在企业中的使用. 本课时主要包含以