flume学习(六):使用hive来分析flume收集的日志数据

前面已经讲过如何将log4j的日志输出到指定的hdfs目录,我们前面的指定目录为/flume/events。

如果想用hive来分析采集来的日志,我们可以将/flume/events下面的日志数据都load到hive中的表当中去。

如果了解hive的load data原理的话,还有一种更简便的方式,可以省去load data这一步,就是直接将sink1.hdfs.path指定为hive表的目录。

下面我将详细描述具体的操作步骤。

我们还是从需求驱动来讲解,前面我们采集的数据,都是接口的访问日志数据,数据格式是JSON格式如下:

{“requestTime”:1405651379758,”requestParams”:{“timestamp”:1405651377211,”phone”:”02038824941″,”cardName”:”测试商家名称”,”provinceCode”:”440000″,”cityCode”:”440106″},”requestUrl”:”/reporter-api/reporter/reporter12/init.do”}

现在有一个需求,我们要统计接口的总调用量。

我第一想法就是,hive中建一张表:test             然后将hdfs.path指定为tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test

然后select  count(*) from test;   完事。

这个方案简单,粗暴,先这么干着。于是会遇到一个问题,我的日志数据时JSON格式的,需要hive来序列化和反序列化JSON格式的数据到test表的具体字段当中去。

这有点糟糕,因为hive本身没有提供JSON的SERDE,但是有提供函数来解析JSON字符串,

第一个是(UDF):

get_json_object(string json_string,string path) 从给定路径上的JSON字符串中抽取出JSON对象,并返回这个对象的JSON字符串形式,如果输入的JSON字符串是非法的,则返回NULL。

第二个是表生成函数(UDTF):json_tuple(string jsonstr,p1,p2,…,pn) 本函数可以接受多个标签名称,对输入的JSON字符串进行处理,这个和get_json_object这个UDF类似,不过更高效,其通过一次调用就可以获得多个键值,例:select b.* from test_json a lateral view json_tuple(a.id,’id’,’name’) b as f1,f2;通过lateral view行转列。

最理想的方式就是能有一种JSON SERDE,只要我们LOAD完数据,就直接可以select * from test,而不是select get_json_object这种方式来获取,N个字段就要解析N次,效率太低了。

好在cloudrea wiki里提供了一个json serde类(这个类没有在发行的hive的jar包中),于是我把它搬来了,如下:

[java] view plaincopy

  1. package com.besttone.hive.serde;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.Arrays;  
  5. import java.util.HashMap;  
  6. import java.util.List;  
  7. import java.util.Map;  
  8. import java.util.Properties;  
  9.   
  10. import org.apache.hadoop.conf.Configuration;  
  11. import org.apache.hadoop.hive.serde.serdeConstants;  
  12. import org.apache.hadoop.hive.serde2.SerDe;  
  13. import org.apache.hadoop.hive.serde2.SerDeException;  
  14. import org.apache.hadoop.hive.serde2.SerDeStats;  
  15. import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;  
  16. import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;  
  17. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;  
  18. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;  
  19. import org.apache.hadoop.hive.serde2.objectinspector.StructField;  
  20. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;  
  21. import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;  
  22. import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;  
  23. import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;  
  24. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;  
  25. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;  
  26. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;  
  27. import org.apache.hadoop.io.Text;  
  28. import org.apache.hadoop.io.Writable;  
  29. import org.codehaus.jackson.map.ObjectMapper;  
  30.   
  31. /** 
  32.  * This SerDe can be used for processing JSON data in Hive. It supports 
  33.  * arbitrary JSON data, and can handle all Hive types except for UNION. However, 
  34.  * the JSON data is expected to be a series of discrete records, rather than a 
  35.  * JSON array of objects. 
  36.  *  
  37.  * The Hive table is expected to contain columns with names corresponding to 
  38.  * fields in the JSON data, but it is not necessary for every JSON field to have 
  39.  * a corresponding Hive column. Those JSON fields will be ignored during 
  40.  * queries. 
  41.  *  
  42.  * Example: 
  43.  *  
  44.  * { “a”: 1, “b”: [ "str1", "str2" ], “c”: { “field1″: “val1″ } } 
  45.  *  
  46.  * Could correspond to a table: 
  47.  *  
  48.  * CREATE TABLE foo (a INT, b ARRAY<STRING>, c STRUCT<field1:STRING>); 
  49.  *  
  50.  * JSON objects can also interpreted as a Hive MAP type, so long as the keys and 
  51.  * values in the JSON object are all of the appropriate types. For example, in 
  52.  * the JSON above, another valid table declaraction would be: 
  53.  *  
  54.  * CREATE TABLE foo (a INT, b ARRAY<STRING>, c MAP<STRING,STRING>); 
  55.  *  
  56.  * Only STRING keys are supported for Hive MAPs. 
  57.  */  
  58. public class JSONSerDe implements SerDe {  
  59.   
  60.     private StructTypeInfo rowTypeInfo;  
  61.     private ObjectInspector rowOI;  
  62.     private List<String> colNames;  
  63.     private List<Object> row = new ArrayList<Object>();  
  64.   
  65.     //遇到非JSON格式输入的时候的处理。  
  66.     private boolean ignoreInvalidInput;  
  67.   
  68.     /** 
  69.      * An initialization function used to gather information about the table. 
  70.      * Typically, a SerDe implementation will be interested in the list of 
  71.      * column names and their types. That information will be used to help 
  72.      * perform actual serialization and deserialization of data. 
  73.      */  
  74.     @Override  
  75.     public void initialize(Configuration conf, Properties tbl)  
  76.             throws SerDeException {  
  77.         // 遇到无法转换成JSON对象的字符串时,是否忽略,默认不忽略,抛出异常,设置为true将跳过异常。  
  78.         ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(  
  79.                 “input.invalid.ignore”, “false”));  
  80.   
  81.         // Get a list of the table’s column names.  
  82.   
  83.         String colNamesStr = tbl.getProperty(serdeConstants.LIST_COLUMNS);  
  84.         colNames = Arrays.asList(colNamesStr.split(“,”));  
  85.   
  86.         // Get a list of TypeInfos for the columns. This list lines up with  
  87.         // the list of column names.  
  88.         String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);  
  89.         List<TypeInfo> colTypes = TypeInfoUtils  
  90.                 .getTypeInfosFromTypeString(colTypesStr);  
  91.   
  92.         rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(  
  93.                 colNames, colTypes);  
  94.         rowOI = TypeInfoUtils  
  95.                 .getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);  
  96.     }  
  97.   
  98.     /** 
  99.      * This method does the work of deserializing a record into Java objects 
  100.      * that Hive can work with via the ObjectInspector interface. For this 
  101.      * SerDe, the blob that is passed in is a JSON string, and the Jackson JSON 
  102.      * parser is being used to translate the string into Java objects. 
  103.      *  
  104.      * The JSON deserialization works by taking the column names in the Hive 
  105.      * table, and looking up those fields in the parsed JSON object. If the 
  106.      * value of the field is not a primitive, the object is parsed further. 
  107.      */  
  108.     @Override  
  109.     public Object deserialize(Writable blob) throws SerDeException {  
  110.         Map<?, ?> root = null;  
  111.         row.clear();  
  112.         try {  
  113.             ObjectMapper mapper = new ObjectMapper();  
  114.             // This is really a Map<String, Object>. For more information about  
  115.             // how  
  116.             // Jackson parses JSON in this example, see  
  117.             // http://wiki.fasterxml.com/JacksonDataBinding  
  118.             root = mapper.readValue(blob.toString(), Map.class);  
  119.         } catch (Exception e) {  
  120.             // 如果为true,不抛出异常,忽略该行数据  
  121.             if (!ignoreInvalidInput)  
  122.                 throw new SerDeException(e);  
  123.             else {  
  124.                 return null;  
  125.             }  
  126.               
  127.         }  
  128.   
  129.         // Lowercase the keys as expected by hive  
  130.         Map<String, Object> lowerRoot = new HashMap();  
  131.         for (Map.Entry entry : root.entrySet()) {  
  132.             lowerRoot.put(((String) entry.getKey()).toLowerCase(),  
  133.                     entry.getValue());  
  134.         }  
  135.         root = lowerRoot;  
  136.   
  137.         Object value = null;  
  138.         for (String fieldName : rowTypeInfo.getAllStructFieldNames()) {  
  139.             try {  
  140.                 TypeInfo fieldTypeInfo = rowTypeInfo  
  141.                         .getStructFieldTypeInfo(fieldName);  
  142.                 value = parseField(root.get(fieldName), fieldTypeInfo);  
  143.             } catch (Exception e) {  
  144.                 value = null;  
  145.             }  
  146.             row.add(value);  
  147.         }  
  148.         return row;  
  149.     }  
  150.   
  151.     /** 
  152.      * Parses a JSON object according to the Hive column’s type. 
  153.      *  
  154.      * @param field 
  155.      *            – The JSON object to parse 
  156.      * @param fieldTypeInfo 
  157.      *            – Metadata about the Hive column 
  158.      * @return – The parsed value of the field 
  159.      */  
  160.     private Object parseField(Object field, TypeInfo fieldTypeInfo) {  
  161.         switch (fieldTypeInfo.getCategory()) {  
  162.         case PRIMITIVE:  
  163.             // Jackson will return the right thing in this case, so just return  
  164.             // the object  
  165.             if (field instanceof String) {  
  166.                 field = field.toString().replaceAll(“\n”, “\\\\n”);  
  167.             }  
  168.             return field;  
  169.         case LIST:  
  170.             return parseList(field, (ListTypeInfo) fieldTypeInfo);  
  171.         case MAP:  
  172.             return parseMap(field, (MapTypeInfo) fieldTypeInfo);  
  173.         case STRUCT:  
  174.             return parseStruct(field, (StructTypeInfo) fieldTypeInfo);  
  175.         case UNION:  
  176.             // Unsupported by JSON  
  177.         default:  
  178.             return null;  
  179.         }  
  180.     }  
  181.   
  182.     /** 
  183.      * Parses a JSON object and its fields. The Hive metadata is used to 
  184.      * determine how to parse the object fields. 
  185.      *  
  186.      * @param field 
  187.      *            – The JSON object to parse 
  188.      * @param fieldTypeInfo 
  189.      *            – Metadata about the Hive column 
  190.      * @return – A map representing the object and its fields 
  191.      */  
  192.     private Object parseStruct(Object field, StructTypeInfo fieldTypeInfo) {  
  193.         Map<Object, Object> map = (Map<Object, Object>) field;  
  194.         ArrayList<TypeInfo> structTypes = fieldTypeInfo  
  195.                 .getAllStructFieldTypeInfos();  
  196.         ArrayList<String> structNames = fieldTypeInfo.getAllStructFieldNames();  
  197.   
  198.         List<Object> structRow = new ArrayList<Object>(structTypes.size());  
  199.         for (int i = 0; i < structNames.size(); i++) {  
  200.             structRow.add(parseField(map.get(structNames.get(i)),  
  201.                     structTypes.get(i)));  
  202.         }  
  203.         return structRow;  
  204.     }  
  205.   
  206.     /** 
  207.      * Parse a JSON list and its elements. This uses the Hive metadata for the 
  208.      * list elements to determine how to parse the elements. 
  209.      *  
  210.      * @param field 
  211.      *            – The JSON list to parse 
  212.      * @param fieldTypeInfo 
  213.      *            – Metadata about the Hive column 
  214.      * @return – A list of the parsed elements 
  215.      */  
  216.     private Object parseList(Object field, ListTypeInfo fieldTypeInfo) {  
  217.         ArrayList<Object> list = (ArrayList<Object>) field;  
  218.         TypeInfo elemTypeInfo = fieldTypeInfo.getListElementTypeInfo();  
  219.   
  220.         for (int i = 0; i < list.size(); i++) {  
  221.             list.set(i, parseField(list.get(i), elemTypeInfo));  
  222.         }  
  223.   
  224.         return list.toArray();  
  225.     }  
  226.   
  227.     /** 
  228.      * Parse a JSON object as a map. This uses the Hive metadata for the map 
  229.      * values to determine how to parse the values. The map is assumed to have a 
  230.      * string for a key. 
  231.      *  
  232.      * @param field 
  233.      *            – The JSON list to parse 
  234.      * @param fieldTypeInfo 
  235.      *            – Metadata about the Hive column 
  236.      * @return 
  237.      */  
  238.     private Object parseMap(Object field, MapTypeInfo fieldTypeInfo) {  
  239.         Map<Object, Object> map = (Map<Object, Object>) field;  
  240.         TypeInfo valueTypeInfo = fieldTypeInfo.getMapValueTypeInfo();  
  241.   
  242.         for (Map.Entry<Object, Object> entry : map.entrySet()) {  
  243.             map.put(entry.getKey(), parseField(entry.getValue(), valueTypeInfo));  
  244.         }  
  245.         return map;  
  246.     }  
  247.   
  248.     /** 
  249.      * Return an ObjectInspector for the row of data 
  250.      */  
  251.     @Override  
  252.     public ObjectInspector getObjectInspector() throws SerDeException {  
  253.         return rowOI;  
  254.     }  
  255.   
  256.     /** 
  257.      * Unimplemented 
  258.      */  
  259.     @Override  
  260.     public SerDeStats getSerDeStats() {  
  261.         return null;  
  262.     }  
  263.   
  264.     /** 
  265.      * JSON is just a textual representation, so our serialized class is just 
  266.      * Text. 
  267.      */  
  268.     @Override  
  269.     public Class<? extends Writable> getSerializedClass() {  
  270.         return Text.class;  
  271.     }  
  272.   
  273.     /** 
  274.      * This method takes an object representing a row of data from Hive, and 
  275.      * uses the ObjectInspector to get the data for each column and serialize 
  276.      * it. This implementation deparses the row into an object that Jackson can 
  277.      * easily serialize into a JSON blob. 
  278.      */  
  279.     @Override  
  280.     public Writable serialize(Object obj, ObjectInspector oi)  
  281.             throws SerDeException {  
  282.         Object deparsedObj = deparseRow(obj, oi);  
  283.         ObjectMapper mapper = new ObjectMapper();  
  284.         try {  
  285.             // Let Jackson do the work of serializing the object  
  286.             return new Text(mapper.writeValueAsString(deparsedObj));  
  287.         } catch (Exception e) {  
  288.             throw new SerDeException(e);  
  289.         }  
  290.     }  
  291.   
  292.     /** 
  293.      * Deparse a Hive object into a Jackson-serializable object. This uses the 
  294.      * ObjectInspector to extract the column data. 
  295.      *  
  296.      * @param obj 
  297.      *            – Hive object to deparse 
  298.      * @param oi 
  299.      *            – ObjectInspector for the object 
  300.      * @return – A deparsed object 
  301.      */  
  302.     private Object deparseObject(Object obj, ObjectInspector oi) {  
  303.         switch (oi.getCategory()) {  
  304.         case LIST:  
  305.             return deparseList(obj, (ListObjectInspector) oi);  
  306.         case MAP:  
  307.             return deparseMap(obj, (MapObjectInspector) oi);  
  308.         case PRIMITIVE:  
  309.             return deparsePrimitive(obj, (PrimitiveObjectInspector) oi);  
  310.         case STRUCT:  
  311.             return deparseStruct(obj, (StructObjectInspector) oi, false);  
  312.         case UNION:  
  313.             // Unsupported by JSON  
  314.         default:  
  315.             return null;  
  316.         }  
  317.     }  
  318.   
  319.     /** 
  320.      * Deparses a row of data. We have to treat this one differently from other 
  321.      * structs, because the field names for the root object do not match the 
  322.      * column names for the Hive table. 
  323.      *  
  324.      * @param obj 
  325.      *            – Object representing the top-level row 
  326.      * @param structOI 
  327.      *            – ObjectInspector for the row 
  328.      * @return – A deparsed row of data 
  329.      */  
  330.     private Object deparseRow(Object obj, ObjectInspector structOI) {  
  331.         return deparseStruct(obj, (StructObjectInspector) structOI, true);  
  332.     }  
  333.   
  334.     /** 
  335.      * Deparses struct data into a serializable JSON object. 
  336.      *  
  337.      * @param obj 
  338.      *            – Hive struct data 
  339.      * @param structOI 
  340.      *            – ObjectInspector for the struct 
  341.      * @param isRow 
  342.      *            – Whether or not this struct represents a top-level row 
  343.      * @return – A deparsed struct 
  344.      */  
  345.     private Object deparseStruct(Object obj, StructObjectInspector structOI,  
  346.             boolean isRow) {  
  347.         Map<Object, Object> struct = new HashMap<Object, Object>();  
  348.         List<? extends StructField> fields = structOI.getAllStructFieldRefs();  
  349.         for (int i = 0; i < fields.size(); i++) {  
  350.             StructField field = fields.get(i);  
  351.             // The top-level row object is treated slightly differently from  
  352.             // other  
  353.             // structs, because the field names for the row do not correctly  
  354.             // reflect  
  355.             // the Hive column names. For lower-level structs, we can get the  
  356.             // field  
  357.             // name from the associated StructField object.  
  358.             String fieldName = isRow ? colNames.get(i) : field.getFieldName();  
  359.             ObjectInspector fieldOI = field.getFieldObjectInspector();  
  360.             Object fieldObj = structOI.getStructFieldData(obj, field);  
  361.             struct.put(fieldName, deparseObject(fieldObj, fieldOI));  
  362.         }  
  363.         return struct;  
  364.     }  
  365.   
  366.     /** 
  367.      * Deparses a primitive type. 
  368.      *  
  369.      * @param obj 
  370.      *            – Hive object to deparse 
  371.      * @param oi 
  372.      *            – ObjectInspector for the object 
  373.      * @return – A deparsed object 
  374.      */  
  375.     private Object deparsePrimitive(Object obj, PrimitiveObjectInspector primOI) {  
  376.         return primOI.getPrimitiveJavaObject(obj);  
  377.     }  
  378.   
  379.     private Object deparseMap(Object obj, MapObjectInspector mapOI) {  
  380.         Map<Object, Object> map = new HashMap<Object, Object>();  
  381.         ObjectInspector mapValOI = mapOI.getMapValueObjectInspector();  
  382.         Map<?, ?> fields = mapOI.getMap(obj);  
  383.         for (Map.Entry<?, ?> field : fields.entrySet()) {  
  384.             Object fieldName = field.getKey();  
  385.             Object fieldObj = field.getValue();  
  386.             map.put(fieldName, deparseObject(fieldObj, mapValOI));  
  387.         }  
  388.         return map;  
  389.     }  
  390.   
  391.     /** 
  392.      * Deparses a list and its elements. 
  393.      *  
  394.      * @param obj 
  395.      *            – Hive object to deparse 
  396.      * @param oi 
  397.      *            – ObjectInspector for the object 
  398.      * @return – A deparsed object 
  399.      */  
  400.     private Object deparseList(Object obj, ListObjectInspector listOI) {  
  401.         List<Object> list = new ArrayList<Object>();  
  402.         List<?> field = listOI.getList(obj);  
  403.         ObjectInspector elemOI = listOI.getListElementObjectInspector();  
  404.         for (Object elem : field) {  
  405.             list.add(deparseObject(elem, elemOI));  
  406.         }  
  407.         return list;  
  408.     }  
  409. }  

我稍微修改了一点东西,多加了一个参数input.invalid.ignore,对应的变量为:

//遇到非JSON格式输入的时候的处理。
private boolean ignoreInvalidInput;

在deserialize方法中原来是如果传入的是非JSON格式字符串的话,直接抛出了SerDeException,我加了一个参数来控制它是否抛出异常,在initialize方法中初始化这个变量(默认为false):

// 遇到无法转换成JSON对象的字符串时,是否忽略,默认不忽略,抛出异常,设置为true将跳过异常。
ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
“input.invalid.ignore”, “false”));

好的,现在将这个类打成JAR包: JSONSerDe.jar,放在hive_home的auxlib目录下(我的是/etc/hive/auxlib),然后修改hive-env.sh,添加HIVE_AUX_JARS_PATH=/etc/hive/auxlib/JSONSerDe.jar,这样每次运行hive客户端的时候都会将这个jar包添加到classpath,否则在设置SERDE的时候会报找不到类。

现在我们在HIVE中创建一张表用来存放日志数据:

[plain] view plaincopy

  1. create table test(  
  2. requestTime BIGINT,  
  3. requestParams STRUCT<timestamp:BIGINT,phone:STRING,cardName:STRING,provinceCode:STRING,cityCode:STRING>,    
  4. requestUrl STRING)  
  5.  row format serde “com.besttone.hive.serde.JSONSerDe”   
  6.  WITH SERDEPROPERTIES(  
  7.  “input.invalid.ignore”=”true”,  
  8.  “requestTime”=”$.requestTime”,  
  9.  “requestParams.timestamp”=”$.requestParams.timestamp”,  
  10.  “requestParams.phone”=”$.requestParams.phone”,  
  11.  “requestParams.cardName”=”$.requestParams.cardName”,  
  12.  “requestParams.provinceCode”=”$.requestParams.provinceCode”,  
  13.  “requestParams.cityCode”=”$.requestParams.cityCode”,  
  14.  “requestUrl”=”$.requestUrl”);  

这个表结构就是按照日志格式设计的,还记得前面说过的日志数据如下:

{“requestTime”:1405651379758,”requestParams”:{“timestamp”:1405651377211,”phone”:”02038824941″,”cardName”:”测试商家名称”,”provinceCode”:”440000″,”cityCode”:”440106″},”requestUrl”:”/reporter-api/reporter/reporter12/init.do”}

我使用了一个STRUCT类型来保存requestParams的值,row format我们用的是自定义的json serde:com.besttone.hive.serde.JSONSerDe,SERDEPROPERTIES中,除了设置JSON对象的映射关系外,我还设置了一个自定义的参数:”input.invalid.ignore”=”true”,忽略掉所有非JSON格式的输入行。这里不是真正意义的忽略,只是非法行的每个输出字段都为NULL了,要在结果集上忽略,必须这样写:select * from test where requestUrl is not null;

OK表建好了,现在就差数据了,我们启动flumedemo的WriteLog,往hive表test目录下面输出一些日志数据,然后在进入hive客户端,select * from test;所以字段都正确的解析,大功告成。

flume.conf如下:

[plain] view plaincopy

  1. tier1.sources=source1  
  2. tier1.channels=channel1  
  3. tier1.sinks=sink1  
  4.   
  5. tier1.sources.source1.type=avro  
  6. tier1.sources.source1.bind=0.0.0.0  
  7. tier1.sources.source1.port=44444  
  8. tier1.sources.source1.channels=channel1  
  9.   
  10. tier1.sources.source1.interceptors=i1 i2  
  11. tier1.sources.source1.interceptors.i1.type=regex_filter  
  12. tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  
  13. tier1.sources.source1.interceptors.i2.type=timestamp  
  14.   
  15. tier1.channels.channel1.type=memory  
  16. tier1.channels.channel1.capacity=10000  
  17. tier1.channels.channel1.transactionCapacity=1000  
  18. tier1.channels.channel1.keep-alive=30  
  19.   
  20. tier1.sinks.sink1.type=hdfs  
  21. tier1.sinks.sink1.channel=channel1  
  22. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test  
  23. tier1.sinks.sink1.hdfs.fileType=DataStream  
  24. tier1.sinks.sink1.hdfs.writeFormat=Text  
  25. tier1.sinks.sink1.hdfs.rollInterval=0  
  26. tier1.sinks.sink1.hdfs.rollSize=10240  
  27. tier1.sinks.sink1.hdfs.rollCount=0  
  28. tier1.sinks.sink1.hdfs.idleTimeout=60  

besttone.db是我在hive中创建的数据库,了解hive的应该理解没多大问题。

OK,到这篇文章为止,整个从LOG4J生产日志,到flume收集日志,再到用hive离线分析日志,一整套流水线都讲解完了。

时间: 2024-10-03 21:19:53

flume学习(六):使用hive来分析flume收集的日志数据的相关文章

【大数据新手上路】“零基础”系列课程--Flume收集网站日志数据到MaxCompute

免费开通大数据服务:https://www.aliyun.com/product/odps 概述:大数据时代,谁掌握了足够的数据,谁就有可能掌握未来,而其中的数据采集就是将来的流动资产积累. 任何规模的企业,每时每刻都在产生大量的数据,但这些数据如何归集.提炼始终是一个困扰.而大数据技术的意义确实不在于掌握规模庞大的数据信息,而在于对这些数据进行智能处理,从中分析和挖掘出有价值的信息,但前提是如何获取大量有价值的数据. 相信很多做过网站管理的人对网站访问日志(Access Log)应该不会陌生,

《日志管理与分析权威指南》一1.2.2 日志数据是如何传输和收集的

1.2.2 日志数据是如何传输和收集的 日志数据的传输和收集在概念上非常简单.计算机或者其他设备都实现了日志记录子系统,能够在确定有必要的时候生成日志消息,具体的确定方式取决于设备.例如,你可以选择对设备进行配置,设备也可能本身进行了硬编码,生成一系列预设消息.另一方面,你必须有一个用来接收和收集日志消息的地方.这个地方一般被称为日志主机(loghost).日志主机是一个计算机系统,一般来说可能是Unix系统或者Windows服务器系统,它是集中收集日志消息的地方.使用集中日志收集器的优点如下:

flume学习(一):log4j直接输出日志到flume

log4j.properties配置: log4j.rootLogger=INFOlog4j.category.com.besttone=INFO,flumelog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppenderlog4j.appender.flume.Hostname = localhostlog4j.appender.flume.Port = 44444 log4j.appender.flume.

flume学习(五):Flume Channel Selectors使用

前几篇文章只有一个项目的日志,现在我们考虑多个项目的日志的收集,我拷贝了一份flumedemo项目,重命名为flumedemo2,添加了一个WriteLog2.java类,稍微改动了一下JSON字符串的输出,将以前requestUrl中的"reporter-api"改为了"image-api",以便和WriteLog类的输出稍微区分开来,如下: [java] view plaincopy package com.besttone.flume;      import

flume学习(四):Flume Interceptors的使用

对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的.也即在日志进入到source之前,对日志进行一些包装.清新过滤等等动作. 官方上提供的已有的拦截器有: Timestamp Interceptor Host Interceptor Static Interceptor Regex Filtering Interceptor Regex Extractor Interceptor 像很多java的开源项目如springmvc中的拦截器一

5W1H(六何分析法)全景洞察大数据

引言 5W1H(WWWWWH)分析法也叫六何分析法,是一种思考方法,也可以说是一种创造技法.我们也对大数据问些问题,相信这也是很多中小企业面临的现实问题.大数据这个词也是从12年开始慢慢热起来的,经过4年的发展,如今,很多企业已经开始有自己的大数据平台,但是对于更多的企业是没有的. 笔者也在成都的云栖大会分享了笔者的一些思考与总结,由于后续没有录像放出来,应一些朋友.同学.用户的要求,笔者直接文字写出来.文字都是笔者经过推敲写出来的,肯定要比现场讲的思路更加缜密. 最后会涉及到怎么做,如果对前面

基于Apache Flume Datahub插件将日志数据同步上云

本文用到的 阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps 简介 Apache Flume是一个分布式的.可靠的.可用的系统,可用于从不同的数据源中高效地收集.聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件.本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub. 环境要求 JDK (1.7及以上,推荐1.7) Flume-NG 1.

在HDInsight中开始使用Hadoop与Hive来分析移动手机使用

在HDInsight中开始使用Hadoop与Hive来分析移动手机使用 为了能让你迅速上手使用HDInsight,本教程将向您介绍如何运行一个查询Hive提取的Hadoop集群,从非结构化数据的有意义的信息.然后,你将分析结果在Microsoft Excel中. 注意:如果你是新的Hadoop和大数据,你可以阅读更多有关条款的Apache Hadoop,MapReduce,HDFS和Hive.要了解HDInsight如何使Hadoop的在Azure中,看HDInsight Hadoop的介绍.

测试并发应用(六)用 FindBugs 分析并发代码

声明:本文是< Java 7 Concurrency Cookbook>的第八章, 作者: Javier Fernández González 译者:郑玉婷 用 FindBugs 分析并发代码 静态代码分析工具是一套通过分析应用源代码来查找潜在异常的工具.这些工具,例如 Checkstyle, PMD, 或者 FindBugs,他们有定义极好的实践(good practices) 规则,然后解析源代码来查找有没有违反这些规则.目的是在产品运行之前,更早的找到异常或者修改较差性能的代码.各种编程