Flume入门案例
监控端口数据官方案例
案例需求:使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。
实现步骤:
安装 netcat 工具
sudo yum install -y nc
判断44444 端口是否被占用
sudo netstat -tunlp | grep 44444
创建Flume Agent 配置文件flume-netcat-logger.conf
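参考操作(示意,假设 Flume 安装在 /opt/module/flume,自定义配置统一放在其 job 目录下):
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf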
在flume-netcat-logger.conf 文件中添加如下内容:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
# bind 表示 a1 监听的主机,port 表示 a1 监听的端口号
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# capacity 表示 a1 的 channel 总容量为 1000 个 event;
# transactionCapacity 表示 a1 的 channel 传输时收集到 100 条 event 以后再去提交事务
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 先开启flume 监听端口
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
或
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
--conf/-c:表示配置文件存储在conf/目录
--name/-n:表示给agent 起名为a1
--conf-file/-f:表示 Flume 本次启动读取的配置文件是 job 文件夹下的 flume-netcat-logger.conf 文件。
-Dflume.root.logger=INFO,console:-D 表示 Flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:debug、info、warn、error。
- 使用netcat 工具向本机的44444 端口发送内容
[atguigu@hadoop102 ~]$ nc localhost 44444
hello
atguigu
在Flume 监听页面观察接收数据情况
实时监控单个追加文件
案例需求:实时监控Hive 日志,并上传到HDFS 中
实现步骤:
- Flume 要想将数据输出到 HDFS,必须持有 Hadoop 相关 jar 包。将
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷贝到/opt/module/flume/lib 文件夹下。
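拷贝操作的一个参考写法(示意,假设 Hadoop 安装在 /opt/module/hadoop-2.7.2,上述 jar 包位于其 share/hadoop 子目录中,实际路径以本机为准):
[atguigu@hadoop102 ~]$ cd /opt/module/hadoop-2.7.2/share/hadoop
[atguigu@hadoop102 hadoop]$ cp common/lib/commons-configuration-1.6.jar \
    common/lib/hadoop-auth-2.7.2.jar \
    common/hadoop-common-2.7.2.jar \
    hdfs/hadoop-hdfs-2.7.2.jar \
    common/lib/commons-io-2.4.jar \
    common/lib/htrace-core-3.1.0-incubating.jar \
    /opt/module/flume/lib/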
- 创建flume-file-hdfs.conf 文件
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 1000
# 设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
# 设置每个文件的滚动大小,略小于块的大小
a2.sinks.k2.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
开启Hadoop 和Hive 并操作Hive 产生日志
在HDFS 上查看文件。
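一个参考操作顺序(示意,假设 Hadoop 安装在 /opt/module/hadoop-2.7.2、Hive 安装在 /opt/module/hive,路径以实际环境为准):
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop102 hive]$ bin/hive
在 Hive 中随意执行几条语句使 hive.log 产生新日志,然后查看 HDFS 上生成的目录和文件:
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hadoop fs -ls -R /flume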
实时监控目录下多个新文件
案例需求:使用Flume 监听整个目录的文件,并上传至HDFS
- 创建配置文件flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# 忽略所有以 .tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
# 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
说明:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以 .COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。
向upload 文件夹中添加文件
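参考操作(示意,文件名仅作演示;upload 目录需在启动 agent 前创建好):
[atguigu@hadoop102 flume]$ mkdir upload
[atguigu@hadoop102 flume]$ cd upload
[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp
[atguigu@hadoop102 upload]$ touch atguigu.log
其中 atguigu.tmp 会因匹配 ignorePattern 而被忽略,其余文件上传完成后会被重命名为 *.COMPLETED。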
查看HDFS 上的数据
实时监控目录下的多个追加文件
Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而Taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。
案例需求:使用Flume 监听整个目录的实时追加文件,并上传至HDFS
- 创建配置文件flume-taildir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/flume/tail_dir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
# 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
向 files 文件夹中追加内容:
echo hello >> file1.txt
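一个更完整的参考操作(示意,files 目录与 file2.txt 为演示用的假设名称,需与 filegroups 中配置的 /opt/module/flume/files/file.* 匹配):
[atguigu@hadoop102 flume]$ mkdir files
[atguigu@hadoop102 flume]$ cd files
[atguigu@hadoop102 files]$ echo hello >> file1.txt
[atguigu@hadoop102 files]$ echo atguigu >> file2.txt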
查看HDFS 上的数据
Taildir 说明:
Taildir Source 维护了一个 json 格式的 position File,其会定期地往 position File 中更新每个文件读取到的最新位置,因此能够实现断点续传。Position File 的格式如下:
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
注:Linux 中储存文件元数据的区域就叫做inode,每个inode 都有一个号码,操作系统用inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用inode 号码来识别文件。
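可以用 ls -i 命令查看文件的 inode 号,以 file1.txt 为例(对应上面 position File 中的 2496272):
[atguigu@hadoop102 files]$ ls -i file1.txt
2496272 file1.txt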
Flume 企业开发案例
复制和多路复用
案例需求:使用Flume-1 监控文件变动,Flume-1 将变动内容传递给Flume-2,Flume-2 负责存储到HDFS。同时Flume-1 将变动内容传递给Flume-3,Flume-3 负责输出到Local FileSystem。
在/opt/module/datas/目录下创建flume3 文件夹
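参考操作(示意):
[atguigu@hadoop102 datas]$ mkdir flume3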
创建flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
- 创建 flume-flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
# 是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
# 设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- 创建 flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
# 提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
- 执行配置文件
分别启动对应的 Flume 进程:flume-flume-dir、flume-flume-hdfs、flume-file-flume。
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
- 启动 Hadoop 和 Hive
检查 /opt/module/datas/flume3 目录中数据和 HDFS 上数据
负载均衡和故障转移
案例需求:使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用 FailoverSinkProcessor,实现故障转移的功能。
- 创建flume-netcat-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
- 创建 flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- 创建 flume-flume-console2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
- 分别开启对应配置文件:
flume-flume-console2.conf, flume-flume-console1.conf, flume-netcat-flume.conf。
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
使用 netcat 工具向本机的 44444 端口发送内容
查看 Flume2 及 Flume3 的控制台打印日志
将 Flume2 kill,观察 Flume3 的控制台打印情况。
注:使用 jps -ml 查看 Flume 进程。
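参考操作(示意,<pid> 需替换为 jps 实际显示的进程号):
[atguigu@hadoop102 flume]$ jps -ml | grep flume-flume-console1
[atguigu@hadoop102 flume]$ kill <pid>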
聚合
案例需求:hadoop102 上的 Flume1 监控文件 /opt/module/group.log;hadoop103 上的 Flume2 监控某一个端口的数据流;Flume1 与 Flume2 将数据发送给 hadoop104 上的 Flume3,Flume3 将最终数据打印到控制台。
分发Flume
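参考操作(示意,xsync 为前面章节使用的集群分发脚本,属于假设;没有该脚本时也可以用 scp 逐台拷贝):
[atguigu@hadoop102 module]$ xsync flume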
创建flume1-logger-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 创建 flume2-netcat-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- 创建 flume3-flume-logger.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
- 执行配置文件
[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf
[atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf
在 hadoop102 上向 /opt/module 目录下的 group.log 追加内容
在 hadoop103 上向 44444 端口发送数据
检查 hadoop104 上 Flume3 控制台打印的数据(示例命令见下)
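上面前两步的参考命令(示意):
[atguigu@hadoop102 module]$ echo 'hello' >> group.log
[atguigu@hadoop103 ~]$ nc hadoop103 44444
hello
随后即可在 hadoop104 的 Flume3 控制台看到对应的 Event 输出。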