一、Flume Channels
channel是在代理上暂存事件的存储库。Source 添加事件,Sink 将其删除。
1、Memory Channel
事件存储在具有可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流,但在agent发生故障时会丢失暂存数据
Property Name | Default | Description |
type | – | The component type name, needs to be memory |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
2、JDBC Channel
事件存储在由数据库支持的持久性存储中。JDBC 通道目前支持嵌入式 Derby。这是一个持久的通道,非常适合可恢复性很重要的流。
Property Name | Default | Description |
type | – | The component type name, needs to be jdbc |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
3、Kafka Channel
这些事件存储在 Kafka 集群中(必须单独安装)。Kafka 提供高可用性和复制,因此,如果agent或 Kafka 崩溃,这些事件会立即提供给其他sink
Kafka 通道可用于多种方案:
1. 使用 Flume source和sink - 它为事件提供了一个可靠且高度可用的通道
2. 使用 Flume source和interceptor,但没有sink - 它允许将 Flume 事件写入 Kafka topic,供其他应用程序使用
3. 使用 Flume sink,但没有source - 这是一种低延迟、高容错的方式,可以将事件从 Kafka 发送到 Flume sink,例如 HDFS、HBase 或 Solr
目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。测试完成到 2.0.1,这是发布时最高的可用版本。
配置参数的组织方式如下
1. 与通道相关的配置值一般应用于通道配置级别,例如:a1.channel.k1.type =
2. 与 Kafka 或通道运行方式相关的配置值以“kafka”为前缀(这与 CommonClient 配置无关),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。这与 hdfs 接收器的运行方式没有什么不同
3. 特定于生产者/消费者的属性以 kafka.producer 或 kafka.consumer 为前缀
4. 在可能的情况下,使用 Kafka 参数名称,例如:bootstrap.servers 和 acks
Property Name | Default | Description |
type | – | The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port |
Example for agent named a1:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
4、File Channel
默认情况下,文件通道使用用户主目录内的检查点和数据目录的路径。因此,如果代理中有多个文件通道实例处于活动状态,则只有一个实例能够锁定目录并导致另一个通道初始化失败。因此,有必要提供所有已配置通道的显式路径,最好是在不同的磁盘上。此外,由于文件通道将在每次提交后同步到磁盘,因此可能需要将其与将事件批处理在一起的sink/source耦合,以便在多个磁盘不可用于检查点和数据目录的情况下提供良好的性能。
Property Name Default | Description |
|
type | – | The component type name, needs to be file. |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
二、Flume Channel Selectors¶
通道选择器,如果未指定类型,则默认为“复制”。
1、Replicating Channel Selector (default)
Property Name | Default | Description |
selector.type | replicating | The component type name, needs to be replicating |
selector.optional | – | Set of channels to be marked as optional |
Example for agent named a1 and it’s source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
在上面的配置中,c3 是可选通道。写入 c3 失败不会对事务产生影响。而 c1 和 c2 未标记为可选(optional),写入c1 和 c2失败将导致事务失败。
2、Load Balancing Channel Selector
负载平衡通道选择器提供了在多个通道上对流量进行负载均衡的能力。这 有效地允许在多个线程上处理传入数据。它维护一个索引列表,该列表必须在其上分配负载。实现支持使用round_robin或随机选择机制分配负载。选择机制的选择默认为round_robin类型,但可以通过配置进行覆盖。
Property Name | Default | Description |
selector.type | replicating | The component type name, needs to be load_balancing |
selector.policy | round_robin | Selection mechanism. Must be either round_robin or random. |
Example for agent named a1 and it’s source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = load_balancing
a1.sources.r1.selector.policy = round_robin
3、Multiplexing Channel Selector
多路复用通道选择器
Property Name | Default | Description |
selector.type | replicating | The component type name, needs to be multiplexing |
selector.header | flume.selector.header | |
selector.default | – | |
selector.mapping.* | – |
Example for agent named a1 and it’s source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
#header的值对应自定义Interceptor中header的key
a1.sources.r1.selector.header = state
# CZ、US对应自定义Interceptor中header的value
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
三、Flume Interceptors
Flume 能够在数据传输中修改/删除事件。这是在拦截器Interceptors的帮助下完成的。拦截器是实现 org.apache.flume.interceptor.Interceptor 接口的类。拦截器可以根据拦截器开发人员选择的任何条件修改甚至删除事件。Flume 支持拦截器的链接。这是通过在配置中指定拦截器生成器类名列表来实现的。拦截器在源配置中被指定为空格分隔列表。配置中的拦截器的顺序即是调用拦截器的顺序。一个拦截器返回的事件列表将传递给链中的下一个拦截器。拦截器可以修改或删除事件。如果拦截器需要删除事件,则它不会在返回的列表中返回该事件。如果要删除所有事件,则仅返回一个空列表。拦截器是需要自编的组件,下面是一个示例
1、自定义interceptors
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 先调用i1再调用i2
a1.sources.r1.interceptors = i1 i2
#这里自编的拦截器名为HostInterceptor
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
请注意,拦截器生成器将传递给 type config 参数。拦截器本身就是 可配置,并且可以传递配置值,就像传递给任何其他可配置组件一样。 在上面的示例中,事件首先传递给 HostInterceptor,然后由 HostInterceptor 返回事件 然后传递给 TimestampInterceptor。可以指定完全限定的类名 (FQCN) 或别名时间戳。如果您有多个收集器写入同一 HDFS 路径,则还可以使用 HostInterceptor。
2、Timestamp Interceptor
此拦截器将处理事件的时间(以毫秒)插入到事件标头中。这个拦截器 插入一个具有键 timestamp (或由 header 属性指定) 的标头,其值为相关时间戳。 如果配置中已存在现有时间戳,则此侦听器可以保留该时间戳。
Property Name | Default | Description |
type | – | The component type name, has to be timestamp or the FQCN |
headerName | timestamp | The name of the header in which to place the generated timestamp. |
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
3、Host Interceptor
此侦听器插入运行此代理的主机的主机名或 IP 地址。它插入一个标题 使用密钥主机或已配置的密钥,其值为主机的主机名或 IP 地址(基于配置)
Property Name | Default | Description |
type | – | The component type name, has to be host |
preserveExisting | false | If the host header already exists, should it be preserved - true or false |
useIP | true | Use the IP Address if true, else use hostname. |
hostHeader | host | The header key to be used. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
还没有评论,来说两句吧...