Flume事务
Agent 内部原理
ChannelSelector
ChannelSelector 的作用就是选出Event 将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个 Event 发往所有的 Channel, Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。
SinkProcessor
SinkProcessor共有三种类型,分别是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor。
DefaultSinkProcessor 对应的是单个的 Sink,LoadBalancingSinkProcessor 和FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能, FailoverSinkProcessor 可以 实现故障转移的功能。
拓扑结构
串联
这种模式是将多个flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量 flume 数量 过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。
复制和多路复用图
Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中, sink 可以选择传送到不同的目的地。
负载均衡和故障转移
Flume 支持使用将多个 sink 逻辑上分到一个 sink 组, sink 组配合不同的 SinkProcessor可以实现负载均衡和错误恢复的功能。
聚合
这种模式是我们最常见的,也非常实用,日常web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个flume 采集日志,传送到一个集中收集日志的flume,再由此flume 上传到hdfs、hive、hbase 等,进行日志分析。
自定义Interceptor
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构, Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个Interceptor,为不同类型的event 的Header 中的key 赋予不同的值。
在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)。
实现步骤:
- 创建一个maven 项目,并引入以下依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
- 定义CustomInterceptor 类并实现Interceptor 接口
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
public class CustomInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
byte[] body = event.getBody();
if (body[0] < 'z' && body[0] > 'a') {
event.getHeaders().put("type", "letter");
} else if (body[0] > '0' && body[0] < '9')
event.getHeaders().put("type", "number");
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new CustomInterceptor();
}
@Override
public void configure(Context context) {}
}
}
编辑 flume 配置文件
为
hadoop102 上的 Flume1 配置 1 个 netcat source, 1 个 sink group, 2 个 avro sink并配置相应的 ChannelSelector 和 interceptor。
'Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
'Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sourc es.r1.selector.mapping.number = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
'Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
'Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
'Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
为
hadoop 103 上的 F lume2 配置一个 avro source 和一个 logger sink 。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
为hadoop 10 4 上的 Flume3 配置一个 avro source 和一个 logger sink 。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10 00
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
分别在 hadoop102 hadoop103 hadoop104 上启动 flume 进程 ,注意先后顺序。
在 hadoop102 使用 netcat 向 localhost 444 44 发送字母和数字
观察 hadoop103 和 hadoop104 打印的日志 。
自定义Source
Source是负责接收数据到 Flume Agent 的组件。 Source 组件可以处理各种类型、各种格式的日志数据 包括 avro、 thrift、 exec、 jms、 spooling directory、 netcat、 sequence generator、 syslog、 http、 legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
- getBackOffSleepIncrement
暂不用 - getMaxBackOffS
leepInterval 暂不用 - configure(Context context
初始化 context (读取配置文件内容) - process()
获取数据封装成 event 并写入 channel ,这个方法将被循环调用。
使用场景:读取
MySQL 数据或者其他文件系统。
需求:使用
flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。
- 导入pom 依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
- 编写 MySource 代码
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
//定义配置文件将来要读取的字段
private Long delay;
private String field;
//初始化配置信息
@Override
public void configure(Context context) {
delay = context.getLong("delay");
field = context.getString("field", "Hello!");
}
@Override
public Status process() throws EventDeliveryException {
try {
//创建事件头信息
HashMap<String, String> hearderMap = new HashMap<>();
//创建事件
SimpleEvent event = new SimpleEvent();
//循环封装事件
for (int i = 0; i < 5; i++) {
//给事件设置头信息
event.setHeaders(hearderMap);
//给事件设置内容
event.setBody((field + i).getBytes());
//将事件写入 channel
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (E xception e) {
e.printStackTrace();
return Status.BACKOFF;
}
return Status.READY;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffS leepInterval() {
return 0;
}
}
打包,将写好的代码打包,并放到
flume 的 lib 目录( opt/module/flume )下。配置文件
'Name the components on this agent'
a1.sources = r1
a1.sinks = k1
a1.channels = c1
'Describe/configure the source'
a1.sources.r1.type = com.atguigu.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = atguigu
'Describe the sink'
a1.sinks.k1.type = logger
'Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transac tionCapacity = 100
'Bind the source and sink to the channel'
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
开启任务
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f
job/my-source.conf -n a1 -Dflume.root.logger=INFO,console结果展示
自定义Sink
Sink不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent 。
Sink是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent Sink 就利用 Channel 提交事务。 事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、
自定义。官方提供的Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Sink。
MySink 需要继承AbstractSink 类并实现Configurable 接口。
实现相应方法:
- configure(Context context)//初始化
- context(读取配置文件内容)
- process() //从Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取Channel 数据写入MySQL 或者其他文件系统
需求:使用flume 接收数据,并在Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在flume 任务配置文件中配置。
- 编码
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
//创建 Logger 对象
private static final Logger LOG =
LoggerFactory.getLogger(AbstractSink.class);
private String prefix;
private String suffix;
@Override
public Status process() throws EventDeliveryException {
//声明返回值状态信息
Status status;
//获取当前 Sink 绑定的 Channel
Channel ch = getChannel();
//获取事务
Transaction txn = ch.getTransaction();
//声明事件
Event event;
//开启事务
txn.begin();
//读取 Channel 中的事件,直到读取到事件结束循环
while (true) {
event = ch.take();
if (event != null) {
break;
}
}
try {
//处理事件(打印)
LOG.info(prefix + new String(event.getBody()) + suffix);
//事务提交
txn.commit();
status = Status.READY;
} catch (Exception e){
//遇到异常,事务回滚
txn.rollback();
status = Status.BACKOFF;
} finally{
//关闭事务
txn.close();
}
return status;
}
@Override
public void configure(Context context) {
//读取配置文件内容,有默认值
prefix = context.getString("prefix", "hello:");
//读取配置文件内容,无默认值
suffix = context.getString("suffix");
}
}
打包
将写好的代码打包,并放到
flume 的 lib 目录( (/opt/module/flume )下。配置文件
'Name the components on this agent'
a1.sources = r1
a1.sinks = k1
a1.channels = c1
'Describe/configure the source'
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
'Describe the sink'
a1.sinks.k1.type = com.atguigu.MySink
'a1.sinks.k1.prefix ='
a1.sinks.k1.suffix = :atguigu
'Use a channel which buffers events in memory'
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
'Bind the source and sink to the channel'
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 开启任务
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[atguigu@hadoop102 ~]$ nc localhost 44444
hello
OK
atguigu
OK
- 结果展示
Flume数据流监控
Ganglia的安装与部署
安装 httpd 服务与 php
[atguigu@hadoop102 flume]$ sudo yum y install httpd php
安装其他依赖
sudo yum y install rrdtool perl rrdtool
rrdtool devel[atguigu@hadoop102 flume]$ sudo yum y install apr devel
安装 ganglia
sudo rpm Uvh
http://dl.fedoraproject.org/pub/epel/6/x86_64/epel release 6
8.noarch.rpmsudo yum y install ganglia gmetad
sudo yum y install ganglia web
sudo yum y install ganglia gmond
Ganglia由 gmond、 gmetad 和 gweb 三部分组成。
gmond (Ganglia Monitoring Daemon) 是一种轻量级服务,安装在每台需要收集指标数据的节点主 机上。使用 gmond ,你可以很容易收集很多系统指标数据,如 CPU 、内存、磁盘、网络和活跃进程的数据等。
gmetad (Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。
gweb (Ganglia Web Ganglia) 可视化工具, gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。
修改配置文件 /etc/httpd/conf.d/ganglia.conf
# Ganglia monitoring system php web frontend Alias /ganglia /usr/share/ganglia <Location/ ganglia> Order deny,allow #Deny from all Allow from all ##这一行 # Allow from 127.0.0.1 # Allow from ::1 # Allow from .example.com </Location>
修改配置文件 /etc/ganglia/gmetad.conf
data_source “hadoop102” 192.168. 9 .102
修改配置文件 /etc/ganglia/gmond.conf
cluster { name = "hadoop102" owner = "unspecified" latlong = "unspecified" url = "unspecified" } udp_send_channel { #bind_hostname = yes # Highly recommended, soon to be default. # This option tells gmond to use a source address # that resolves to the machine's hostname. Without # this, the metrics may appear to come from any # interface and the DNS names associated with # those IPs will be used to create the # mcast_join = 239.2.11.71 host = 192.168.9.102 port = 8649 ttl = 1 } udp_recv_channel { # mcast_join = 239.2.11.71 port = 8649 bind = 192.168. 9 .102 retry_bind = true # Size of the UDP buffer. If you are handling lots of metrics you really # should bump it up to e.g. 10MB or even # buffer = 10485760 }
修改配置文件 /etc/selinux/config
# This file controls the state of SELinux on the # SELINUX= can take one of these three # enforcing SELinux security policy is enforced. # permissive SELinux prints warnings instead of enforcing. # disabled No SE Linux policy is loaded. SELINUX=disabled ### 这一行 # SELINUXTYPE= can take one of these two # targeted Targeted processes are protected, # mls Multi Level Security protection. SELINUXTYPE=targeted '提示selinux 本次生效关闭必须重启,如果此时不想重启,可以临时生效之: '[atguigu@hadoop102 flume]$ sudo setenforce 0
启动 ganglia
[atguigu@hadoop102 flume]$ sudo service httpd start [atguigu@hadoop102 flume]$ sudo service gmetad start [atguigu@hadoop102 flume]$ sudo service gmond start
打开网页浏览 ganglia 页面
提示:
如果完成以上操作依然出现权限不足错误,请修改 /var/lib/ganglia 目录的权限:[atguigu@hadoop102 flume]$ sudo chmod R 777 /var/lib/ganglia
操作 Flume测试监控
修改 /opt/module/flume/conf 目录下的 flume env.sh 配置:
JAVA_OPTS=" Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts= 192.168.9.102:8649 -Xms100m -Xmx200m"
启动 Flume 任务
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume netcat logger.conf -Dflume.root.logger==INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitorin g.hosts= 192.168.9.102:8649
发送数据观察 ganglia 监测图
[atguigu@hadoop102 flume]$ nc localhost 44444
字段(图表名称) | 字段含义 |
---|---|
EventPut AttemptCount | source 尝试写入 channel 的事件总数量 |
EventPutSuccessCount | 成功写入 channel 且提交的事件总数量 |
EventTakeAttemptCount | sink 尝试从 channel 拉取事件的总数量 |
E ventTakeSuccessCount sink | sink 成功读取的事件的总数量 |
StartTime | channel 启动的时间(毫秒) |
StopTime | channel 停止的时间(毫秒) |
ChannelSize | 目前 channel 中事件的总数量 |
ChannelFillPercentage | channel 占用的百分比 |
ChannelCapacity | channel 的容量 |