flume的特点
- 可靠性
- 可恢复性
每个组件必须要配置以下内容
- 名称
- 类型
- 属性集
一个agent包含以下组件
- source
- channel
- sink
测试
采集/opt/flume/data目录下的文件
第一步:首先在flume安装目录下创建文件spooldir-mem-logger.properties
根据业务需求配置 agent
;
-
第一步选择
source
,我的需求是采集目录下的所有文件,因此在这里选择Spooling Directory Source
; -
第二步选择
channel
:这里我选择Memory Channel
; -
第三步选择
sink
:我的需求是打印到控制台,因此这里选择Logger Sink
。

1
代表该 agent
的名字; r1
, c1
, k1
分别代表该 agent
的 source
, channel
和 sink
的名称;各组件的名称,类型以及 source
与 sink
和 channel
的绑定是必须配置的,属性集有些是必须要配置的,有些是可选择配置的。
配置完后就可以启动 flume
开始采集文件了:
在 Flume
的安装目录输入如下命令:
bin/flume-ng agent -n a1 -c conf -f conf/spooldir-mem-logger.properties -Dflume.root.logger=INFO,console
-n
后指定配置的 agent
的名称, -f
后指定配置文件。
这时候 /opt/flume/data
目录下所有文件都采集到控制台输出了。
如何选择 source
, channel
, sink
常见 Source
Avro Source
Avro
端口监听并接收来自外部的 avro
客户流的事件。 常用于多个 agent
相连时;
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 avro |
channel | – | channel的名称 |
bind | – | IP地址或者主机名 |
port | – | 绑定的端口 |
Exec Source
通过设定一个 Unix(linux)
命令监控文件。例如 cat [named pipe]
或 tail -F [file]
。 存在的问题是,当 agent
进程挂掉重启后,会有重复采集的问题。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 exec |
channel | – | channel的名称 |
command | – | 执行的命令,常使用 tail -F file |
Spooling Directory Source
监控某个目录下新增的文件,并读取文件中的数据。采集完的文件默认会被打上标记(在文件末尾加 .COMPLETED)。 适合用于采集新文件,但不适用于对实时追加日志的文件进行监听。如果需要实时监听追加内容的文件,可对 SpoolDirectorySource
进行改进。
注意的点:第一个是采集完的文件追加的新内容不会被采集,第二个是监听目录下的子目录下的文件不会被采集。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 spooldir |
channel | – | channel的名称 |
spoolDir | – | 监控目录 |
Taildir Source
可实时监控多批文件,并记录每个文件最新消费位置,将其保存于一个 json
文件中, agent
进程重启后不会有重复采集的问题。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 logger |
channel | – | channel的名称 |
filegroups | – | 文件组名称,不同文件组用空格分隔 |
filegroups. | – | 文件组的绝对路径 |
positionFile | ~/.flume/taildir_position.json | 以 json 格式记录 inode 、绝对路径和每个文件的最后消费位置 |
常见Sink
Logger Sink
INFO
级别记录事件。通常用于测试/调试目的。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 logger |
channel | – | channel的名称 |
HDFS Sink
将事件写入 HDFS
,目前支持创建文本文件和序列文件,并支持压缩。可以根据时间长短或数据大小或事件数量定期滚动文件(关闭当前文件并创建新文件)。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 hdfs |
** hdfs.path ** | – | 保存文件的目录 |
channel | – | channel的名称 |
hdfs.rollInterval | 30s | 回滚间隔,为零则不根据时间回滚 |
hdfs.rollSize | 1024byte | 回滚大小,为零则不根据大小回滚 |
hdfs.rollCount | 10 | 回滚条数,为零则不根据条数回滚 |
hdfs.fileType | SequenceFile | 保存到 hdfs 的文件格式,支持 SequenceFile , DataStream 或者 CompressedStream ,当使用 CompressedStream 时需指定压缩形式 |
Hive Sink
该 sink
可以将包含分割文本或者 Json
数据的 event
直接传送到 Hive
表中,当 event
提交到 Hive
时,它们马上就可以被 Hive
查询到。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 hdfs |
** hive.metastore ** | – | hive 的元数据存储 URI (eg thrift ://a.b.com:9083 ) |
channel | – | channel的名称 |
hive.database | – | 要导入的 hive 的数据库名称 |
hive.table | – | 要导入的 hive 表名 |
serializer | 解析 event 中的字段与 hive 中的字段相对应,支持 DELIMITED 和 JSON | |
serializer.fieldnames | – | 字段名称 |
serializer.delimiter | , | 依据什么切割字段 |
常见 Channel
Memory Channel
将 agent
缓存于内存中,适用于高吞吐量并且当 agent
挂掉以后允许数据丢失的业务上。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 memery |
capacity | 100 | 存储 channel 的最大 event 数 |
File Channel
将 agent
存储于磁盘上,当 agent
挂掉以后数据不会丢失。
属性名 | 默认值 | 说明 |
---|---|---|
type | – | 必须为 file |
dataDirs | ~/.flume/file-channel/data | 数据存储目录,可配置多个(可提高性能),用逗号分隔 |
checkpointDir | ~/.flume/file-channel/checkpoint | 存储 checkpoint 文件的目录 |
测试:
Agent
命名为a1
;- 选择正确的
source
采集所有文件; Channel
选用memery
;- 文件保存到
hdfs
有如下要求 : hdfs
路径名称:hdfs://localhost:9000
;- 保存到
flume
目录下; - 文件前缀命名为
flume
; - 每
4s
回滚一次文件; - 文件格式使用
DataStream
。
配置如下
# 配置source,channel,sink名称 a1.sources = taildir-source1 a1.channels = ch1 a1.sinks = hdfs-sink1 # 配置source a1.sources.taildir-source1.type = spooldir a1.sources.taildir-source1.spoolDir=/opt/flume/data # 配置 channel a1.channels.ch1.type = memory a1.channels.ch1.capacity = 100 # 配置 sink a1.sinks.hdfs-sink1.type = hdfs a1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost:9000/flume a1.sinks.hdfs-sink1.hdfs.filePrefix = flume a1.sinks.hdfs-sink1.hdfs.rollInterval = 4 a1.sinks.hdfs-sink1.hdfs.fileType=DataStream #配置source和sink绑定到channel a1.sinks.hdfs-sink1.channel = ch1 a1.sources.taildir-source1.channels = ch1
不错
可以