flume初探
什么是flume
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
这个定义来自百度百科。
当然,由于百度的公信力与可靠性越来越差,我周边大部分的技术人员都不相信百度了,那我们就看看flume官网是怎么定义它的。
官方:
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
由于我英文与不是特别好,但是感觉和百度百科的定义应该大致相致。
通过google翻译为中文如下:
Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。 它具有基于流数据流的简单灵活的架构。 它具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错性。 它使用简单的可扩展数据模型,允许在线分析应用程序。
以下是flume官方给出的flume架构图:
根据这个图,我猜的大概意思应该是,入口从web服务来。一个agen由source,sink,channel组成。
source对应的是入口地址定义。
将从source获取的数据丢到channel管道中,然后输出到sink,sink定义与配置了具体是什么存储介质来接受数据。
使用flume的基本系统要求:
- Java Runtime Environment - Java 1.8 or later
- Memory - Sufficient memory for configurations used by sources, channels or sinks
- Disk Space - Sufficient disk space for configurations used by channels or sinks
- Directory Permissions - Read/Write permissions for directories used by agent
内存和磁盘空间都可以通过配置文件进行配置,对agent的使用的目录必须有读写权限。
ps: 最近这家叫Cloudera的公司好像传出资金链断裂的消息.
不过,好在它早就是Apache基金会下的一个开源项目。
有什么优缺点
优点:
- 高吞吐,高可用,高可靠的数据收集工具
Apache Flume具有高度可扩展性,可靠性,可用性,可水平扩展以及可针对不同源数据和Sink进行定制,有助于收集,汇总和移动大量的数据集。 例如Facebook,Twitter和电子商务网站产生的数据等
使用配置简单
开源: Apache Flume是开源的,即易于获得。
文档丰富
提供丰富的文档,有很多很好的例子和模式,可以在其文档中找到。
低延迟
Apache Flume提供较高的吞吐量和较低的延迟。
数据流
在Hadoop环境中,Flume可以连续生成流式数据源。 比如日志文件等
Routing
Flume会查看流数据或Event等有效负载,构建合适的Routing (Routing Context)。
节省成本
Flume为开源软件,安装,操作和维护成本低。
可靠的消息传递
提供可靠的消息传递。 在Flume中,transaction (事物)是基于channel的,其中为每条消息维护两个transaction(一个source之间的,一个和sink之间的)。
基于稳定的数据流
提供了一种可靠和分布式的解决方案,并从HDFS中的各种来源如网络流量,社交媒体,电子邮件消息,日志文件等接收实时流数据。而且,在读取和写入操作之间,Flume能够提供稳定的数据流
缺点:
Weak Ordering Guarantee
Apache Flume 对于 消息排序的支持不强;
Duplicacy
Flume不保证信息到达是唯一的。 重复的消息时间可能会被接受到,所以有时候会接收到一些重复的消息。
低可扩展性
影响Flume硬件的高低的因素很多,在大多数情况下,是一个反复试验的过程。 因此,从这点上考虑,其可扩展性很低。
可靠性问题
如果考虑到所有因素,如果没有明智地选择存储的选择,则其可扩展性和可靠性受到质疑。
flume适合做什么
下面列出所有可能的Apache Flume用例:
• 从各种来源获取数据事件和日志并存储到Hadoop系统中,可以使用Apache Flume;
• 如果需要以高速率和高容量的数据在Hadoop系统中处理,可以使用Apache Flume;
• 可以将数据可靠地传送到目的地。
• 当数据的速度和数量增加时,通过Flume可扩展的解决方案,只需增加更多的机器就可以扩展;
• 不会产生任何停机时间,并且Flume可动态配置Flume Architecture的各个组件。
• 实时数据流,可以使用Apache Flume;
• 从多台服务器高效收集日志数据并将其提取到分布式存储系统中(HDFS,HBase)可以使用Flume;
• 可以实时的,以批处理模式从多个服务器收集数据;
• 可导入社交媒体网站(如Facebook和Twitter)和各种电子商务网站实时生成的大量事件数据并分析;
• 可以从许多数据来源收集数据,然后使用Flume将数据事件移动到多个目的地。
• Flume支持Multi-hop 流, fan-in 和fan-out 数据流, 以及 contextual routing。
• 如果有多个Web应用程序服务正在运行,生成大量的日志信息,必须以非常快的速度将日志移动到HDFS,可以使用Apache Flume;
• 使用抓取工具下载twitter中的各种数据,Flume可以将这些数据移至HDFS;
• 通过使用interceptors,可以在Flume中处理正在传输的数据。
• Flume可以 对数据进行屏蔽或过滤;
• 可以水平扩展容量;
有什么竞品
- logstash
工作原理与结构
logstash简单来看,内部节构是这样的:
这样看的话,其实和flume的结构很像:
都是入口—>中间件—->出口,看来设计思想是一样的。
- 配置复杂度
flume的配置相对于logstash来说,作为程序员来说,略显复杂,而logstash相对简单。
- 使用复杂度
Logstash可以和ELK其他组件配合使用,开发、应用都会简单很多,技术成熟,使用场景广泛。
相反Flume组件就需要和其他很多工具配合使用,场景的针对性会比较强,技术栈将会越铺越多。
- 设计初衷
flume本身最初设计的目的是为了把数据传入HDFS中,并不是为了采集日志而设计,这和Logstash有根本的区别
Flume Event
在Flume内部传输的数据的基本单位就是Event。 它包含字节数组的PayLoad。可以通过Header将信息从输出源传输到目的地。 请参考下图的Flume Event结构。
所以如果你的source是http类型,则client发送数据时,输入:
curl -X POST http://ip:8000 -d '[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'
就是说,如果你的source是http类型,发送的数据格式必须是包含:headers,和body的这种结构,否则默认是解析不了的。
Flume Agent
在Apache Flume中,Agent是一个独立的JVM守护进程。 它接收来自Client或其他Agent的Event。 之后,将它转发到下一个目标地点,如Sink或Agent。 注意,Flume可以有多个Agent。 参考下面的图片来了解Flume Agent。
Agent包含三个主要组件: Source,Channel和 Sink。
Flume Source
Flume Source从数据生成器接收数据。 然后将数据作为Flume Event传输到一个或多个Channel。
Apache Flume支持各种类型的Source。每个source接收来自指定数据生成器的Event。
如Avro source,Thrift source,twitter source等
Flume Channel
用来从Source接收Event的临时存储,并且缓存Event直到它们被Sink 取走,它充当了Flume中 source和 sink之间的桥梁。
如JDBC通道,文件系统通道,内存通道等。
Flume Sink
Sink用来将数据存储到Hbase或HDFS等目的存储中或下一个Agent,它消耗来自Channel的Event,然后将其传送到目的地, Sink的目的地可以是另一个Agent或存储系统,如Hbase和HDFS。
如HDFS Sink,另外,需要注意的是,Agent可以有许多数量的 Source,Channel和 Sink。
Flume Client生成Event然后将其发送给一个或多个Agent。
Flume Agent的附加组件
上面介绍的组件都是Apache Flume Agent的基本组件。还有一些组件在Flume event从数据生成器传输到目的存储中起着至关重要的作用。
Apache Flume Interceptors
通过Interceptors,可以改变/检查Source和Channel之间传输的Flume Event.
Channel Selectors
为了确定在多通道 (Channel)的情况下应该选择传送数据的通道,我们使用Channel Selectors。Channel Selectors通常有两种类型:
默认通道选择器 – 复制每个通道中所有Event的Channel Selectors;
Multiplexing channel selectors – 根据该Event Header中的地址决定发送Event的通道;
Sink Processors
通常 Sink Processor 用来调用特定 Sink。此外,还可用来在sink中创建故障转移路径或负载平衡event等。
厂内应用点
我厂前段时间有个需求,需要将与我厂合作的爬虫团队爬取的数据定时传送到我厂,我厂通过flume接收并存入mongodb中。
我厂大概的流程如下:
本来两个flume都放在一台宿主机上,通知不同的端口号来区分,理论上好像是可以的,但是实际部署中遇到一个问题;
由于我是用的文件管道去传输本地的flume数据到mongo的,防止因为使用内存管道而导致数据丢失问题。
flume1配置:
agent1.channels = channelCS
agent1.sources.sourceCS.type = http
agent1.sources.sourceCS.bind = 0.0.0.0
agent1.sources.sourceCS.port = 44445
agent1.sources.sourceCS.idleTimeout = 120
agent1.sources.sourceCS.channels = channelCS
agent1.sinks.sinkCS.type = org.riderzen.flume.sink.MongoSink
agent1.sinks.sinkCS.host = mongo
agent1.sinks.sinkCS.port = 27017
agent1.sinks.sinkCS.model = SINGLE
agent1.sinks.sinkCS.db = bzyhotel
agent1.sinks.sinkCS.collection = comments
agent1.sinks.sinkCS.batch = 100
agent1.sinks.sinkCS.channel = channelCS
agent1.channels.channelCS.checkpointDir = /data/point1
#监测点文件是否备份
#agent1.channels.channelCS.useDualCheckpoints = true
#监测点文件备份路径
#agent1.channels.channelCS.backupCheckpointDir = /data/back3
#数据存储目录
#agent1.channels.channelCS.dataDirs = /data/store1
flume2配置:
agent2.channels = channelH
agent2.sources.sourceH.type = http
agent2.sources.sourceH.bind = 0.0.0.0
agent2.sources.sourceH.port = 44446
agent2.sources.sourceH.idleTimeout = 120
agent2.sources.sourceH.channels = channelH
agent2.sinks.sinkH.type = org.riderzen.flume.sink.MongoSink
agent2.sinks.sinkH.host = mongo
agent2.sinks.sinkH.port = 27017
agent2.sinks.sinkH.model = SINGLE
agent2.sinks.sinkH.db = bzyhotel
agent2.sinks.sinkH.collection = hotel
agent2.sinks.sinkH.batch = 100
agent2.sinks.sinkH.channel = channelH
agent2.channels.channelH.checkpointDir = /data/point1
#监测点文件是否备份
#agent2.channels.channelH.useDualCheckpoints = true
#监测点文件备份路径
#agent2.channels.channelH.backupCheckpointDir = /data/back2
#数据存储目录
#agent2.channels.channelCS.dataDirs = /data/store2
用两个端口区分不现的flume服务,但是当我只开一台flume的时候,是可以正常解析并存入到mongodb的,但是当我启动另外一台flume的时候,就会出现无法存入mongo的情况。
经过反复测试与验证,发现是文件管道的问题。并且将不同的flume设置在不同的文件目录还不行,必须为不同的文件系统,也就是让flume觉得他们不在一个系统上。
在网络上有另外一个兄弟也提到过这个问题:
https://www.doudianyun.com/2018/11/apache-flume%e4%b8%ad%e7%9a%84channel%e7%b1%bb%e5%9e%8b/
好吧,那我没有两台服务器啊,怎么办?第一时间想到docker,是不是可以欺骗一下flume,说我是两个不同的文件系统。好吧,我开了两个flume试一下,由于现在的docker镜像市场没有我需要的自定义好的flume,所以我自制了支持mongodb的flume
flume-mongo:
registry.cn-shanghai.aliyuncs.com/hgq/flume-mongo
自制了一个启动脚本,run.sh:
docker run --name $1 -d -p $2:44444 -e AGENT_NAME1="$3" --link mongo -v ~/docker_map/flume-mongo/data$4:/data -v ~/docker_map/flume-mongo/conf$4:/usr/loca l/flume/conf:ro registry.cn-shanghai.aliyuncs.com/hgq/flume-mongo:1.0
执行run.sh:
sh ./run.sh flume1 8081 agentCS 1
sh ./run.sh flume2 8082 agentH 1
然后再试,就都可以存了,配置文件分配放到他们的挂载目录。