采集日志到Hive

我们现在的需求是需要将线上的日志以小时为单位采集并存储到 hive 数据库中,方便以后使用 mapreduce 或者 impala 做数据分析。为了实现这个目标调研了 flume 如何采集数据到 hive,其他的日志采集框架尚未做调研。

日志压缩

flume中有个 HdfsSink 组件,其可以压缩日志进行保存,故首先想到我们的日志应该以压缩的方式进行保存,遂选择了 lzo 的压缩格式,HdfsSink 的配置如下:

agent-1.sinks.sink_hdfs.channel = ch-1
agent-1.sinks.sink_hdfs.type = hdfs
agent-1.sinks.sink_hdfs.hdfs.path = hdfs://cdh1:8020/user/root/events/%Y-%m-%d
agent-1.sinks.sink_hdfs.hdfs.filePrefix = logs
agent-1.sinks.sink_hdfs.hdfs.inUsePrefix = .
agent-1.sinks.sink_hdfs.hdfs.rollInterval = 30
agent-1.sinks.sink_hdfs.hdfs.rollSize = 0
agent-1.sinks.sink_hdfs.hdfs.rollCount = 0
agent-1.sinks.sink_hdfs.hdfs.batchSize = 1000
agent-1.sinks.sink_hdfs.hdfs.fileType = CompressedStream
agent-1.sinks.sink_hdfs.hdfs.codeC = lzop

hive 目前是支持 lzo 压缩的,但是要想在 mapreduce 中 lzo 文件可以拆分,需要通过 hadoop 的 api 进行手动创建索引:

$ lzop a.txt
$ hadoop fs -put a.txt.lzo /log/dw_srclog/sp_visit_log/ptd_ymd=20140720
​$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo.jar com.hadoop.compression.lzo.LzoIndexer /log/sp_visit_log/ptd_ymd=20140720/a.txt.lzo

impala 目前也是在支持 lzo 压缩格式的文件的,故采用 lzo 压缩方式存储日志文件似乎是个可行方案。

自定义分隔符

Hive默认创建的表字段分隔符为:\001(ctrl-A),也可以通过 ROW FORMAT DELIMITED FIELDS TERMINATED BY 指定其他字符,但是该语法只支持单个字符。

目前,我们的日志中几乎任何单个字符都被使用了,故没法使用单个字符作为 hive 表字段的分隔符,只能使用多个字符,例如:“|||”。 使用多字符来分隔字段,则需要你自定义InputFormat来实现。

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyDemoInputFormat extends TextInputFormat {

	@Override
	public RecordReader<LongWritable, Text> getRecordReader(
			InputSplit genericSplit, JobConf job, Reporter reporter)
			throws IOException {
		reporter.setStatus(genericSplit.toString());
		MyDemoRecordReader reader = new MyDemoRecordReader(
				new LineRecordReader(job, (FileSplit) genericSplit));
		return reader;
	}

	public static class MyDemoRecordReader implements
			RecordReader<LongWritable, Text> {

		LineRecordReader reader;
		Text text;

		public MyDemoRecordReader(LineRecordReader reader) {
			this.reader = reader;
			text = reader.createValue();
		}

		@Override
		public void close() throws IOException {
			reader.close();
		}

		@Override
		public LongWritable createKey() {
			return reader.createKey();
		}

		@Override
		public Text createValue() {
			return new Text();
		}

		@Override
		public long getPos() throws IOException {
			return reader.getPos();
		}

		@Override
		public float getProgress() throws IOException {
			return reader.getProgress();
		}

		@Override
		public boolean next(LongWritable key, Text value) throws IOException {
			Text txtReplace;
			while (reader.next(key, text)) {
				txtReplace = new Text();
				txtReplace.set(text.toString().toLowerCase().replaceAll("\\|\\|\\|", "\001"));
				value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
				return true;

			}
			return false;
		}
	}
}

这时候的建表语句是:

create external table IF NOT EXISTS  test(
id string,
name string
)partitioned by (day string)
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.MyDemoInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/log/dw_srclog/test';

但是,这样建表的话,是不能识别 lzo 压缩文件的,需要去扩展 lzo 的 DeprecatedLzoTextInputFormat 类,但是如何扩展,没有找到合适方法。

所以,在自定义分隔符的情况下,想支持 lzo 压缩文件,需要另外想办法。例如,使用 SERDE 的方式:

create external table IF NOT EXISTS  test(
id string,
name string
)partitioned by (day string)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES
( 'input.regex' = '([^ ]*)\\|\\|\\|([^ ]*)',
'output.format.string' = '%1$s %2$s')
STORED AS INPUTFORMAT
  'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/log/dw_srclog/test';

要想使用SERDE,必须添加 hive-contrib-XXXX.jar 到 classpath,在 hive-env.sh 中添加下面代码;

$ export HIVE_AUX_JARS_PATH=/usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.7.0.jar

注意:

  • 使用 SERDE 时,字段类型只能为 string。
  • 这种方式建表,flume 可以将日志存储为 lzo 并且 hive 能够识别出数据,但是 impala 中却不支持 SERDE 的语法,故只能放弃该方法。

最后,只能放弃 lzo 压缩文件的想法,改为不做压缩。flume 中 HdfsSink 配置参数 hdfs.fileType 目前只有三种可选值:CompressedStream 、DataStream、SequenceFile,为了保持向后兼容便于扩展,这里使用了 DataStream 的方式,不做数据压缩。

Update

注意:

最后又经过测试,发现 impala 不支持 hive 的自定义文件格式,详细说明请参考:SQL Differences Between Impala and Hive

日志采集

使用 flume 来采集日志,只需要在应用程序服务器上安装一个 agent 就可以监听文件或者目录的改变来搜集日志,但是实际情况你不一定有权限访问应用服务器,更多的方式是应用服务器将日志推送到一个中央的日志集中存储服务器。你只有权限去从该服务器收集数据,并且该服务器对外提供 ftp 的接口供你访问。

日志采集有 pull 和 push 的两种方式,关于两种方式的一些说明,可以参考这篇文章:大规模日志收集处理项目的技术总结

对于当前情况而言,只能从 ftp 服务器轮询文件然后下载文件到本地,最后再将其导入到 hive 中去。以前,使用 kettle 做过这种事情,现在为了简单只是写了个 python 脚本来做这件事情,一个示例代码,请参考 https://gist.github.com/javachen/6f7d14aae138c7a284e6#file-fetch-py

该脚本会再 crontab 中每隔5分钟执行一次。

执行该脚本会往 mongodb 中记录一些状态信息,并往 logs 目录以天为单位记录日志。

暂时没有使用 flume 的原因:

  1. 对 flume 的测试于调研程度还不够
  2. flume 中无法对数据去重
  3. 只能停止 flume 进程,才可以升级 flume,这样会丢失数据

等日志采集实时性要求变高,以及对 flume 的熟悉程度变深之后,会考虑使用 flume。

时间: 2024-08-25 01:35:59

采集日志到Hive的相关文章

【大数据新手上路】“零基础”系列课程--日志服务(Log Service)采集 ECS 日志数据到 MaxCompute

随着公司业务的增多,云服务器 ECS 上的日志数据越来越多,存储开销越来越大,受限于日志的大小和格式,分析的速度非常缓慢,导致海量数据在沉睡,不知道发挥作用,如何能将这些数据进行归集.提炼和智能化的处理始终是一个困扰.通过日志服务投递日志数据到MaxCompute便可以让用户按照不同的场景和需求.以不同的方式复用数据,充分发挥日志数据的价值. 使用日志服务投递日志数据到MaxCompute具有如下优势: 使用非常简单.用户只需要完成2步配置即可以把日志服务Logstore的日志数据迁移到MaxC

通过shell和redis来实现集群业务中日志的实时收集分析

在统计项目中,最难实施的就是日志数据的收集.日志分布在全国各个机房,而且数据量比较大,像rsync+inotify这种方式显然不能满足快速日志同步的要求. 当然大家也可以用fluentd和flume采集日志数据,除了这个我们也可以自己写一套简单的. 我写的这个日志分析系统 流程是: 在客户端收集数据,然后通过redis pub方式把数据发给服务端 2   服务器端是redis的sub    他会把数据统一存放在一个文件,或者当前就过滤出来 客户端收集日志的更新数据 #!/bin/bash DAT

一文看懂HIVE和HBASE的区别

两者分别是什么? Apache Hive是一个构建在hadoop基础设施之上的数据仓库.通过Hive可以使用HQL语言查询存放在HDFS上的数据.HQL是一种类SQL语言,这种语言最终被转化为Map/Reduce. 虽然Hive提供了SQL查询功能,但是Hive不能够进行交互查询–因为它只能够在Haoop上批量的执行Hadoop. Apache HBase是一种Key/Value系统,它运行在HDFS之上.和Hive不一样,Hbase的能够在它的数据库上实时运行,而不是运行MapReduce任务

通过日志服务调试分布式系统

为了解决分布式系统开发过程中调试和分析性能的问题,在过去阿里云产品的开发调试过程中,我们开发了Tracer工具.Tracer主要用来解决分布式系统执行过程关联问题,它的基本原理如下: 用户在关键程序的入口埋点以日志方式输出时间.Trace ID.上下文等信息.当用户请求经过埋点函数时,输出日志.通过以上步骤,我们就能通过关联同样的Trace ID来记录一个请求生命周期内访问多个进程的情况. 当携带Trace ID的请求每一次执行到这个宏,会留下这样一条日志: [2013-07-13 10:28:

模拟使用Flume监听日志变化,并且把增量的日志文件写入到hdfs中

1.采集日志文件时一个很常见的现象 采集需求:比如业务系统使用log4j生成日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs中. 1.1.根据需求,首先定义一下3大要素: 采集源,即source-监控日志文件内容更新:exec 'tail -F file' 下沉目标,即sink-HDFS文件系统:hdfs sink Source和sink之间的传递通道--channel,可用file channel也可以用 内存channel. 1.2.进入/home/tuzq/softw

中小企业运维需要重视日志分析

前言 如果把运维看做是医生给病人看病,日志则是病人对自己的陈述,很多时候医生需要通过对病人的描述得出病人状况,是否严重,需要什么计量的药,该用什么类型的药. 所以古人有句话叫做对症下药,这个"症"就是病人的描述加医生的判断,对于重一点的病再加上很多的化验.在医生看病时,病人描述的病情和化验单上的数据对医生的判断是非常重要的. 同理,日志在运维中的作用也是非常类似的,但很不幸,日志在很多中小企业运维中被严重低估,直到磁盘空间不足的时候才想到,磁盘里有个大的日志文件要把他删了,这样可以节省

日志归档与数据挖掘(日志中心)

日志归档与数据挖掘 http://netkiller.github.io/journal/log.html Mr. Neo Chen (陈景峰), netkiller, BG7NYT 中国广东省深圳市龙华新区民治街道溪山美地518131+86 13113668890+86 755 29812080<netkiller@msn.com> 版权 2013, 2014 Netkiller. All rights reserved. 版权声明 转载请与作者联系,转载时请务必标明文章原始出处和作者信息及

日志服务(原SLS)新功能发布(10)--Logtail配置支持日志转换、过滤

日志收集流程 对于日志收集的客户端,其work pipeline通常包括三个过程:Input,Process,Output. Input: 适配各类日志接入源,目前Logtail支持文本文件.Syslog(TCP流式)两种形式数据写入. Process:自定义日志处理逻辑,常见的有:日志切分.日志编码转换.日志结构化解析.日志过滤等等. Output:定义日志输出,例如Logtail以HTTP协议写数据到日志服务. 今天要介绍Logtail在日志处理阶段的两个新功能:转码.过滤. 日志转码 日志

Nginx+Logstash+Elasticsearch+Kibana搭建网站日志分析

前言   流程,nignx格式化日志成json,通过logstash直接采集到elasticsearch,然后通过kibana gui界面展示分析 要点nignx日志成json格式,避免nignx默认日志是空格,需要正则匹配,导致logstash占过多cpuelasticsearch机配置防火墙,只让指定的logstash机访问kibana只监听本地127.0.0.1使用nignx方向代理,nginx中配置Http Basic Auth账号密码登陆 比较粗略的笔记,备忘安装java um ins