1.5 Simple Flume Examples

1.5.1 Collecting Data from a Specified Port and Printing It to the Console

Requirement: a port continuously receives incoming data; Flume must process this data and deliver it somewhere else (represented here by the console).

Start from the sample configuration on the official website and adapt it.

The key to using Flume is defining the configuration file. Based on the requirement, first define the following three elements:

The source: a netcat source listening on a network port

The sink: a logger sink printing events to the console

The channel between source and sink: a memory channel

Then wire the three components together:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 
a1.channels = c1

# Describe/configure the source component r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe/configure the sink component k1
a1.sinks.k1.type = logger 

# Describe/configure the channel component; here events are buffered in memory
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1
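
To try this out, save the configuration as, for example, netcat-logger.conf (the file name is an arbitrary choice) and start the agent from the Flume installation directory, then send text to the port from a second terminal. A minimal sketch:

# Start agent a1 with the configuration above
bin/flume-ng agent --conf conf --conf-file netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

# In another terminal, connect to the monitored port and type a few lines;
# each line should show up as a logged event in the agent's console output
telnet localhost 44444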

1.5.2 Collecting a File from Server A to Server B

Requirement: collect a log file on server A to server B in real time.

1. Server A:

Monitor a file and send newly appended data to server B in real time.

2. Server B:

Receive data on a network port and print it to the console.

Design:

Flume configuration on server A:

Source: an exec source

Sink: an avro sink

Channel between source and sink: a memory channel

#exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/ccy/data/a.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444


# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

Flume configuration on server B:

Source: an avro source

Sink: a logger sink

Channel between source and sink: a memory channel

#avro-memory-logger.conf
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

# Describe/configure the source
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = localhost
avro-memory-logger.sources.avro-source.port = 44444

# Describe the sink
avro-memory-logger.sinks.logger-sink.type = logger

# Use a channel which buffers events in memory
avro-memory-logger.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel

Startup:

Note: start server B first and server A second; the avro sink on A needs the avro source on B to already be listening on the port, otherwise it has nothing to connect to.

On server B:

bin/flume-ng agent --conf conf --conf-file avro-memory-logger.conf --name avro-memory-logger -Dflume.root.logger=INFO,console

On server A:

bin/flume-ng agent --conf conf --conf-file exec-memory-avro.conf --name exec-memory-avro -Dflume.root.logger=INFO,console

Create the file and keep appending content to it:

touch a.log
echo "xxx">>a.log

1.5.3 Collecting a File to HDFS

Requirement: a business system keeps appending to a log file; the newly appended data must be collected to HDFS in real time.

Based on the requirement, first define the following three elements:

Source (monitors a file for appended content): an exec source running tail -F file

Sink: an hdfs sink writing to the HDFS file system

Channel between source and sink: either a file channel or a memory channel

Configuration file:

#Name the components on this agent 
exec-memory-hdfs.sources = exec-source
exec-memory-hdfs.sinks = hdfs-sink
exec-memory-hdfs.channels = memory-channel

# Describe/configure the source
exec-memory-hdfs.sources.exec-source.type = exec
exec-memory-hdfs.sources.exec-source.command = tail -F /root/logs/test.log
exec-memory-hdfs.sources.exec-source.shell = /bin/sh -c

# Describe the sink 
exec-memory-hdfs.sinks.hdfs-sink.type = hdfs
exec-memory-hdfs.sinks.hdfs-sink.hdfs.path = /flume/events/%y-%m-%d/%H%M/
exec-memory-hdfs.sinks.hdfs-sink.hdfs.filePrefix = events-
exec-memory-hdfs.sinks.hdfs-sink.hdfs.round = true
exec-memory-hdfs.sinks.hdfs-sink.hdfs.roundValue = 10
exec-memory-hdfs.sinks.hdfs-sink.hdfs.roundUnit = minute
exec-memory-hdfs.sinks.hdfs-sink.hdfs.rollInterval = 3
exec-memory-hdfs.sinks.hdfs-sink.hdfs.rollSize = 20
exec-memory-hdfs.sinks.hdfs-sink.hdfs.rollCount = 5
exec-memory-hdfs.sinks.hdfs-sink.hdfs.batchSize = 1
exec-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
# File type of the generated files; the default is SequenceFile, DataStream produces plain text
exec-memory-hdfs.sinks.hdfs-sink.hdfs.fileType = DataStream

# Use a channel which buffers events in memory 
exec-memory-hdfs.channels.memory-channel.type = memory
exec-memory-hdfs.channels.memory-channel.capacity = 1000
exec-memory-hdfs.channels.memory-channel.transactionCapacity = 100


# Bind the source and sink to the channel
exec-memory-hdfs.sources.exec-source.channels = memory-channel
exec-memory-hdfs.sinks.hdfs-sink.channel = memory-channel
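
The startup command follows the same pattern as before. A sketch, assuming the configuration above is saved as exec-memory-hdfs.conf (the file name is an assumption):

bin/flume-ng agent --conf conf --conf-file exec-memory-hdfs.conf --name exec-memory-hdfs -Dflume.root.logger=INFO,console

# After appending a few lines to /root/logs/test.log, check the rolled files on HDFS
hdfs dfs -ls /flume/events/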

1.5.4 Collecting a Directory to HDFS

Requirement: new files keep appearing in a particular directory on the server; every new file must be collected to HDFS.

Based on the requirement, first define the following three elements:

Source (monitors a directory for new files): a spooldir source

Sink: an hdfs sink writing to the HDFS file system

Channel between source and sink: either a file channel or a memory channel

Configuration file:

#Name the components on this agent 
spool-memory-hdfs.sources = spool-source
spool-memory-hdfs.sinks = hdfs-sink
spool-memory-hdfs.channels = memory-channel

# Describe/configure the source
# Note: never drop two files with the same name into the monitored directory
spool-memory-hdfs.sources.spool-source.type = spooldir
spool-memory-hdfs.sources.spool-source.spoolDir = /root/bigdata/data
spool-memory-hdfs.sources.spool-source.fileHeader = true

# Describe the sink 
spool-memory-hdfs.sinks.hdfs-sink.type = hdfs
spool-memory-hdfs.sinks.hdfs-sink.hdfs.path = /flume/events/%y-%m-%d/%H%M/
spool-memory-hdfs.sinks.hdfs-sink.hdfs.filePrefix = events-
spool-memory-hdfs.sinks.hdfs-sink.hdfs.round = true
spool-memory-hdfs.sinks.hdfs-sink.hdfs.roundValue = 10
spool-memory-hdfs.sinks.hdfs-sink.hdfs.roundUnit = minute
spool-memory-hdfs.sinks.hdfs-sink.hdfs.rollInterval = 3
spool-memory-hdfs.sinks.hdfs-sink.hdfs.rollSize = 20
spool-memory-hdfs.sinks.hdfs-sink.hdfs.rollCount = 5
spool-memory-hdfs.sinks.hdfs-sink.hdfs.batchSize = 1
spool-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
# File type of the generated files; the default is SequenceFile, DataStream produces plain text
spool-memory-hdfs.sinks.hdfs-sink.hdfs.fileType = DataStream

# Use a channel which buffers events in memory 
spool-memory-hdfs.channels.memory-channel.type = memory
spool-memory-hdfs.channels.memory-channel.capacity = 1000
spool-memory-hdfs.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
spool-memory-hdfs.sources.spool-source.channels = memory-channel
spool-memory-hdfs.sinks.hdfs-sink.channel = memory-channel

Startup:

bin/flume-ng agent --conf conf --conf-file spool-memory-hdfs.conf --name spool-memory-hdfs -Dflume.root.logger=INFO,console
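
To test, write a file elsewhere first and then move it into the spooled directory; moving is atomic, so Flume never sees a half-written file. Once a file has been fully ingested, the spooldir source renames it with a .COMPLETED suffix by default. A sketch (the file name is arbitrary):

echo "hello spooldir" > /tmp/test-1.txt
mv /tmp/test-1.txt /root/bigdata/data/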

Channel parameter reference:

  • capacity: the maximum number of events the channel can hold

  • transactionCapacity: the maximum number of events the channel accepts from a source, or delivers to a sink, in one transaction

HDFS sink parameter reference:

  • rollInterval

    Default: 30

    How long (in seconds) the hdfs sink waits before rolling the current temporary file into the final target file;

    0 disables time-based rolling.

    Note: rolling means the hdfs sink renames the temporary file to its final target name and opens a new temporary file for writing.

  • rollSize

    Default: 1024

    Roll the temporary file into the target file once it reaches this size (in bytes);

    0 disables size-based rolling.

  • rollCount

    Default: 10

    Roll the temporary file into the target file once it has received this many events;

    0 disables count-based rolling (a sketch combining these three settings follows this parameter list).

  • round

    Default: false

    Whether to round the event timestamp down before substituting it into the path.

  • roundValue

    Default: 1

    The granularity to round the timestamp down to;

  • roundUnit

    Default: second

    The unit of rounding: second, minute, or hour

    Example:

    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

    a1.sinks.k1.hdfs.round = true

    a1.sinks.k1.hdfs.roundValue = 10

    a1.sinks.k1.hdfs.roundUnit = minute

    At 2015-10-16 17:38:59, hdfs.path resolves to:

    /flume/events/15-10-16/1730/00

    because the timestamp is rounded down to the previous 10-minute boundary, so a new directory is generated every 10 minutes.
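
As noted above, each rolling criterion can be disabled by setting it to 0. A sketch that rolls purely by file size, with time- and count-based rolling switched off (128 MB here is an arbitrary choice, picked to roughly match an HDFS block):

a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0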
