1.6 Flume实战案例

日志的采集和汇总

案例场景

A、B 两台日志服务机器实时生产日志主要类型为 access.log、nginx.log、web.log

现在要求:

把 A、B 机器中的 access.log、nginx.log、web.log 采集汇总到 C 机器上

然后统一收集到 hdfs 中。

但是在 hdfs 中要求的目录为:

数据放置到hdfs环境中的对应时间的目录上

/source/logs/access/20160101/**
/source/logs/nginx/20160101/**
/source/logs/web/20160101/**

关键步骤:利用拦截器在数据中添加标识,进行数据的区分

拦截器:interpreter,可以在传输的数据中添加header(key,value)形式,在数据接收方,可以通过获取key对应的value来处理数据。

场景分析

1540349678122

数据流程处理分析

1540349692086

功能实现

① 在服务器 A 和服务器 B 上

创建配置文件 exec-memory-avro.conf

# Name the components on this agent 
exec-memory-avro.sources = access-source nginx-source web-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# 定义source
exec-memory-avro.sources.access-source.type = exec
exec-memory-avro.sources.access-source.command = tail -F /root/bigdata/data/access.log
#定义插入器,用于定义插入器
exec-memory-avro.sources.access-source.interceptors = i1
exec-memory-avro.sources.access-source.interceptors.i1.type = static
# static 拦截器的功能就是往采集到的数据的 header 中插入自己定义的 key-value 对
# 定义插入器的细节:type(类型),key(插入器的key),value(key对应的值)
exec-memory-avro.sources.access-source.interceptors.i1.key = type
exec-memory-avro.sources.access-source.interceptors.i1.value = access

#定义source
exec-memory-avro.sources.nginx-source.type = exec
exec-memory-avro.sources.nginx-source.command = tail -F /root/bigdata/data/nginx.log
#定义插入器,用于定义插入器
exec-memory-avro.sources.nginx-source.interceptors = i2
#定义插入器的细节:type(类型),key(插入器的key),value(key对应的值)
exec-memory-avro.sources.nginx-source.interceptors.i2.type = static
exec-memory-avro.sources.nginx-source.interceptors.i2.key = type
exec-memory-avro.sources.nginx-source.interceptors.i2.value = nginx

#定义source,思路同上,但是此时插入器的key对应的value发生的变化
exec-memory-avro.sources.web-source.type = exec
exec-memory-avro.sources.web-source.command = tail -F /root/bigdata/data/web.log
exec-memory-avro.sources.web-source.interceptors = i3
exec-memory-avro.sources.web-source.interceptors.i3.type = static
exec-memory-avro.sources.web-source.interceptors.i3.key = type
exec-memory-avro.sources.web-source.interceptors.i3.value = web


#定义sink,type为avro,绑定到node-teach1的44444端口上 
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = node-teach1
exec-memory-avro.sinks.avro-sink.port = 44444

#定义channel 
exec-memory-avro.channels.memory-channel.type = memory
#定义容量,为了缓存的最大数据量
exec-memory-avro.channels.memory-channel.capacity = 20000
#定义交互式容量,为了正在处理的数据量
exec-memory-avro.channels.memory-channel.transactionCapacity = 10000

#定义source/sink和channel的关联
exec-memory-avro.sources.access-source.channels = memory-channel
exec-memory-avro.sources.nginx-source.channels = memory-channel
exec-memory-avro.sources.web-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

② 在服务器 C 上创建配置文件 avro-memory-hdfs.conf 文件内容为

#定义 agent 名, source、channel、sink 的名称
avro-memory-hdfs.sources = avro-source
avro-memory-hdfs.sinks = hdfs-sink
avro-memory-hdfs.channels = memory-channel

#定义 source
avro-memory-hdfs.sources.avro-source.type = avro
avro-memory-hdfs.sources.avro-source.bind = node-teach1
avro-memory-hdfs.sources.avro-source.port =44444
#添加时间拦截器:可以将A或B机器传过来的数据进行获取,得到数据
avro-memory-hdfs.sources.avro-source.interceptors = i1
avro-memory-hdfs.sources.avro-source.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#定义 sink
avro-memory-hdfs.sinks.hdfs-sink.type = hdfs
#%{type} 可以将前面的数据中的key对应的value取出
avro-memory-hdfs.sinks.hdfs-sink.hdfs.path=hdfs://node-teach1:8020/source/logs/%{type}/%Y%m%d
avro-memory-hdfs.sinks.hdfs-sink.hdfs.filePrefix =events
avro-memory-hdfs.sinks.hdfs-sink.hdfs.fileType = DataStream
avro-memory-hdfs.sinks.hdfs-sink.hdfs.writeFormat = Text
#时间类型
avro-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
#生成的文件不按条数生成
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollCount = 0
#生成的文件按时间生成
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollInterval = 30
#生成的文件按大小生成
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollSize    = 10485760
#批量写入 hdfs 的个数
avro-memory-hdfs.sinks.hdfs-sink.hdfs.batchSize = 10000
#flume 操作 hdfs 的线程数(包括新建,写入等)
avro-memory-hdfs.sinks.hdfs-sink.hdfs.threadsPoolSize=10
#操作 hdfs 超时时间
avro-memory-hdfs.sinks.hdfs-sink.hdfs.callTimeout=30000

#定义 channels
avro-memory-hdfs.channels.memory-channel.type = memory
avro-memory-hdfs.channels.memory-channel.capacity = 20000
avro-memory-hdfs.channels.memory-channel.transactionCapacity = 10000

#组装 source、channel、sink
avro-memory-hdfs.sources.avro-source.channels = memory-channel
avro-memory-hdfs.sinks.hdfs-sink.channel = memory-channel

③ 配置完成之后,在服务器 A 和 B 上的/root/data 有数据文件 access.log、nginx.log、web.log。

模拟生成日志文件

将三种内容拼接到三个文件中

下面脚本解释:

while true:死循环

do echo "test">>/xxx/xxx.log:输出数据"test"到某个日志文件中

sleep 0.5:睡眠0.5秒

done:结束符

while true;do echo "access ..." >> /root/bigdata/data/access.log;sleep 0.5;done

while true;do echo "nginx ..." >> /root/bigdata/data/nginx.log;sleep 0.5;done

while true;do echo "web ..." >> /root/bigdata/data/web.log;sleep 0.5;done

先启动服务器 C 上的 flume,启动命令在 flume 安装目录下执行 :

flume-ng agent -f avro-memory-hdfs.conf -n avro-memory-hdfs -Dflume.root.logger=INFO,console

然后在启动服务器上的 A 和 B,启动命令在 flume 安装目录下执行 :

flume-ng agent -f exec-memory-avro.conf -n exec-memory-avro -Dflume.root.logger=INFO,console

results matching ""

    No results matching ""