官方文档参数解释:
需要注意:文件格式,fileType=DataStream 默认为SequenceFile,是hadoop的文件格式,改为DataStream就可直接读了(SqeuenceFile怎么用还不知道。。)
配置文件:hdfs.conf
a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = spooldira1.sources.r1.channels = c1a1.sources.r1.spoolDir = /usr/local/hadoop/apache-flume-1.6.0-bin/logsa1.sources.r1.fileHeader = true# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.channel = c1a1.sinks.k1.hdfs.path = hdfs://node4:9000/user/flume/logs/%Y-%m-%d-%Ha1.sinks.k1.hdfs.filePrefix = Syslog#a1.sinks.k1.hdfs.fileSuffix = .log #设定后缀a1.sinks.k1.hdfs.round = truea1.sinks.k1.hdfs.roundValue = 10a1.sinks.k1.hdfs.roundUnit = minute#--触发roll操作的文件大小in bytes (0: never roll based on file size)a1.sinks.k1.hdfs.rollSize = 128000000#--在roll操作之前写入文件的事件数量(0 = never roll based on number of events)a1.sinks.k1.hdfs.rollCount = 0#--文件格式:默认SequenceFile,可选 DataStream \ CompressedStreama1.sinks.k1.hdfs.fileType = DataStream #DataStream可以直接读出来#--Format for sequence file records. “Text” or “Writable” a1.sinks.k1.hdfs.writeFormat = Text#--使用local time来替换转移字符 (而不是使用event header的timestamp)a1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1 |
启动hadoop
启动flume:
./flume-ng agent -c . -f /usr/local/hadoop/apache-flume-1.6.0-bin/conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
在被监听的文件夹下生成日志文件:
for i in { 1000..2000}; do echo "test line $i" >> /usr/local/hadoop/apache-flume-1.6.0-bin/logs/spool_text$i.log; done;
查看hdfs: http://node4:50070