Flume中的HDFS Sink配置参数说明。
官方配置网址:http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
type:hdfs
path:hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/flumedata/
filePrefix:默认值:FlumeData,写入hdfs的文件名前缀
fileSuffix:写入 hdfs 的文件名后缀,比如:.lzo .log等。
inUsePrefix:临时文件的文件名前缀,hdfs sink 会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件;
inUseSuffix:默认值:.tmp,临时文件的文件名后缀。
rollInterval:默认值:30:hdfs sink 间隔多长将临时文件滚动成最终目标文件,单位:秒;
如果设置成0,则表示不根据时间来滚动文件;
注:滚动(roll)指的是,hdfs sink 将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
rollSize:默认值:1024:当临时文件达到多少(单位:bytes)时,滚动成目标文件;如果设置成0,则表示不根据临时文件大小来滚动文件;
rollCount:默认值:10:当 events 数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件;
idleTimeout:默认值:0:当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;
batchSize:默认值:100:每个批次刷新到 HDFS 上的 events 数量;
codeC:文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
fileType:默认值:SequenceFile,文件格式,包括:SequenceFile, DataStream,CompressedStream
当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;
当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;
maxOpenFiles:默认值:5000:最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭;
minBlockReplicas:默认值:HDFS副本数,写入 HDFS 文件块的最小副本数。
该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件。
writeFormat:写 sequence 文件的格式。包含:Text, Writable(默认)
callTimeout:默认值:10000,执行HDFS操作的超时时间(单位:毫秒);
threadsPoolSize:默认值:10,hdfs sink 启动的操作HDFS的线程数。
rollTimerPoolSize:默认值:1,hdfs sink 启动的根据时间滚动文件的线程数。
kerberosPrincipal:HDFS安全认证kerberos配置;
kerberosKeytab:HDFS安全认证kerberos配置;
proxyUser:代理用户
round:默认值:false,是否启用时间上的”舍弃”;
roundValue:默认值:1,时间上进行“舍弃”的值;
roundUnit:默认值:seconds,时间上进行”舍弃”的单位,包含:second,minute,hour
示例:
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
当时间为2015-10-16 17:38:59时候,hdfs.path依然会被解析为:/flume/events/20151016/17:30/00
因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。
timeZone:默认值:Local Time,时区。
useLocalTimeStamp:默认值:flase,是否使用当地时间。
closeTries:默认值:0,hdfs sink 关闭文件的尝试次数;
如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。
设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。
retryInterval:默认值:180(秒),hdfs sink 尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1.
serializer:默认值:TEXT,序列化类型。
比如:
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/data/%Y%m%d
agent1.sinks.sink1.hdfs.filePrefix = log_%Y%m%d_%H
agent1.sinks.sink1.hdfs.fileSuffix = .lzo
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.fileType = CompressedStream
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.rollSize = 0
agent1.sinks.sink1.hdfs.rollInterval = 600
agent1.sinks.sink1.hdfs.codeC = lzop
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.threadsPoolSize = 10
agent1.sinks.sink1.hdfs.idleTimeout = 0
agent1.sinks.sink1.hdfs.minBlockReplicas = 1
上面的配置中,在 HDFS 的 /tmp/data/ 目录下,每天生成一个格式为20151016的目录,目标文件每10分钟生成一个,目标文件格式为:log_20151016_13.1444973768543.lzo目标文件采用lzo压缩。
附上测试用的telnet.py代码:
import time
import telnetlib
if __name__ = '__main__':
host = 'localhost'
port = 44444
tn = telnetlib.Telnet(host=host,port=port)
for i in range(10000):
print i
tn.write(str(i) + '\n')
time.sleep(1)