Logstash7.0输出值Kafka

释放双眼,带上耳机,听听看~!

Kafka输入插件简介

在logstash使用kafka插件将从kafka中读取数据,此插件需要使用kafka client 2.1.0客户端版本。有关代理兼容性,请参阅kafka官方兼容性参考资料
Kafka插件支持通过以下方式连接到kakfa:

  • SSL (需要kafka插件版本在3.0.0及以上版本)
  • Kerberos SASL (需要Kafka插件版本5.1.0及以上版本)
    不过不用担心,现在logstash对kafka输入插件提供的版本为v9.0.0
    默认情况下,安全认证是禁用的,但可以根据需要打开安全认证

Logstash Kafka使用者使用Kafka处理组管理并使用默认的偏移管理策略。
默认情况下,Logstash Kafka实例组成一个逻辑组Consumer Group来订阅Kafka,每个Logstash Kafka使用者可以运行多个线程来增加读取吞吐量。或者,您可以使用相同的group_id运行多个Logstash实例,将负载分散到物理机器上。kafka中的消息将分发给所有具有相同group_id的Logstash实例。

有关更多信息,请参阅:Kafka更多信息
Kafka消费者配置:Kafka消息者配置

Kafka输入插件配置选项

设置 输入类型 描述
auto_commit_interval_ms string 可选
auto_offset_reset string 可选
bootstrap_servers string 可选
check_crcs string 可选
client_id string 可选
connections_max_idle_ms string 可选
consumer_threads number 可选
decorate_events boolean 可选
enable_auto_commit string 可选
exclude_internal_topics string 可选
fetch_max_bytes string 可选
fetch_max_wait_ms string 可选
fetch_min_bytes string 可选
group_id string 可选
heartbeat_interval_ms string 可选
jaas_path path 可选
kerberos_config path 可选
key_deserializer_class string 可选
max_partition_fetch_bytes string 可选
max_poll_interval_ms string 可选
max_poll_records string 可选
metadata_max_age_ms string 可选
partition_assignment_strategy string 可选
poll_timeout_ms number 可选
receive_buffer_bytes string 可选
reconnect_backoff_ms string 可选
request_timeout_ms string 可选
retry_backoff_ms string 可选
sasl_kerberos_service_name string 可选
sasl_mechanism string 可选
security_protocol string 可选
send_buffer_bytes string 可选
session_timeout_ms string 可选
ssl_endpoint_identification_algorithm string 可选
ssl_key_password password 可选
ssl_keystore_location path 可选
ssl_keystore_password password 可选
ssl_keystore_type string 可选
ssl_truststore_location path 可选
ssl_truststore_password password 可选
ssl_truststore_type string 可选
topics array 可选
topics_pattern string 可选
value_deserializer_class string 可选

auto_commit_interval_ms

  • 值类型是string字符串
  • 默认值是5000
  • 描述:Kafka消费者将偏移量提交给Kafka的频率,单位为毫秒5000ms=5s

auto_offset_reset

  • 值类型是string字符串
  • 没有默认值
  • 描述:Kafka中没有初始偏移量或者偏移量超出范围怎么办?以下四种值
    1)earliest:Kafka自动将偏移量重置为最老的偏移量
    2)latest:Kafka自动将偏移量重置为最新的偏移量
    3)none:Kafka如果没有找到消费者组原来的偏移量,则向消费者抛出异常
    4)anything else:Kafka向消费者抛出异常

bootstrap_servers

  • 值类型为string字符串
  • 默认值是localhost:9092
  • 描述:Logstash连接Kafka实例的列表,如果为Kafka集群,可以写为 host1:port1,host2:port2,逗号分隔

check_crcs

  • 值类型是string字符串
  • 没有默认值
  • 描述:自动检查所使用记录的CRC32。这确保不会发生消息的联机或磁盘损坏。这种检查增加了一些开销,所以在寻求极端性能的情况下可能会禁用它。

client_id

  • 值类型为string字符串
  • 默认值为logstash
  • 描述:发出请求时传递给服务器的ID字符串,这样做的目的是允许通过包含逻辑应用程序名称来跟踪超出ip/port的请求源

connections_max_idle_ms

  • 值类型为string字符串
  • 没有默认值
  • 描述:在此配置指定的毫秒后关闭Logstash与Kafka的空闲连接

consumer_threads

  • 值类型是number数字
  • 默认值为1
  • 描述:Kafka消费者线程数,建议与Kafka Topic中的分区数量相同,过多的线程会处于空闲状态,浪费资源

decorate_events

  • 值类型为boolean布尔
  • 默认值为false
  • 描述:该选项可将Kafka元数据(如Topic,消息大小)添加到事件中,这将向logstash事件添加一个名为kafka的字段,该字段包含以下属性:
    1)Topic:此消息与关联的主题
    2)consumer_group:消费者组用于在此事件中读取
    3)partition:此消息关联的分区
    4)offset:与此消息关联分区的偏移量
    5)key:包含消息键的字节缓冲区

enable_auto_commit

  • 值类型为string字符串
  • 默认值为true
  • 描述:如果为真,则定期会把Logstash消费过Kafka的消息偏移量提交给Kafka,当流程失败时,将使用这个提交的偏移量作为开始消费的位置

exelude_internal_topics

  • 值类型为string字符串
  • 没有默认值
  • 描述:是否将内部主题(如偏移量)的记录暴露给消费者,如果设置为true,则从内部主题接收记录的唯一方法是订阅它

fetch_max_bytes

  • 值类型是string字符串
  • 没有默认值
  • 描述:服务器应该为获取请求返回的最大数据量。这不是一个绝对最大值,如果fetch的第一个非空分区中的第一个消息大于这个值,那么仍然会返回该消息,以确保使用者能够取得进展。

fetch_max_wait_ms

  • 值类型是string字符串
  • 没有默认值
  • 描述:如果没有足够的数据满足fetch_min_bytes,服务器将在响应fetch请求之前阻塞的最长时间。这应该小于或等于poll_timeout_ms中使用的超时

fetch_min_bytes

  • 值类型为string字符串
  • 没有默认值
    服务器应该为获取请求返回的最小数据量。如果没有足够的数据可用,则请求将等待大量数据累积后才响应请求。

group_id

  • 值类型是string字符串
  • 默认为logstash
  • 描述:此Logstash消费者所属的消费者组名称,消费者组是由多个消费者实例的单个订阅服务器,Topic中的消息将分发给所有具有相同group_id的logstash实例

heartbeat_interval_ms

  • 值类型为string字符串
  • 没有默认值
  • 描述:消费者协调器的预期心跳间隔时间。心跳用于确保用户会话保持活跃,并在新用户加入或离开组时促进再平衡。该值必须设置为session.timeout以下。,但通常应设置不高于该值的1/3。它可以调整甚至更低,以控制正常再平衡的预期时间。

jaas_path

  • 值类型为path路径
  • 没有默认值
  • 描述:JAVA身份验证和授权服务(JAAS)API为Kafka提供用户身份验证和授权服务,该设置提供指定JAAS文件的路径,Kafka客户端的示例JAAS文件:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
  };

请注意,在配置文件中指定jaas_path和kerberos_config将把它们添加到全局JVM系统属性中。这意味着,如果您有多个Kafka输入,那么它们都将共享同一个jaas_path和kerberos_config。如果不希望这样,就必须在不同的JVM实例上运行不同的Logstash实例。

kerberos_config

key_deserializer_class

  • 值类型是string字符串
  • 默认值是"org.apache.kafka.common.serialization.StringDeserializer"
  • 描述:用于反序列化记录密钥的JAVA类

max_poll_interval_ms

  • 值类型是string字符串
  • 没有默认值
  • 描述:使用消费者组时,poll()调用之间的最大延迟,这就为消费者在获取更多记录之前空闲的时间设置了上限,如果在此超时过期之前未调用poll(),则认为使用者失败,消费者组将重新平衡,以便将分区重新分配给另一个成员。配置request_timeout_ms的值必须始终大于max_poll_interval_ms

max_poll_records

  • 值类型为string字符串
  • 没有默认值
  • 描述:在一次对poll()的调用中返回的最大记录数

poll_timeout_ms

  • 值类型为number数字
  • 默认值为100
  • 描述:Kafka消费者将等待从主题接收新消息

receive_buffer_bytes

  • 默认值为string数字
  • 没有默认值
  • 描述:读取数据时使用TCP接收缓冲区(SO_RCVBUF)的大小

reconnect_backoff_ms

  • 值类型为string字符串
  • 没有默认值
  • 描述:尝试重新连接到给定主机之前等待的时间。这避免了在紧密循环中重复连接到主机。此回退适用于消费者发送给代理的所有请求。

request_timeout_ms

  • 值类型为string字符串
  • 没有默认值
  • 描述:Kafka消费者等待请求响应的最大时间,如果超过此时间没有接收到响应,消费者将在必要时重新发送请求,直到重新发送请求耗尽时请求失败

sasl_kerberos_service_name

  • 值类型为string字符串
  • 没有默认值
  • 描述:Kafka代理运行的Kerberos主体名称。这可以在Kafka的JAAS配置或Kafka的配置中定义

sasl_mechanism

  • 值类型是string字符串
  • 默认值是"GSSAPI"
  • 描述:用于客户端连接的SASL机制。这可以是安全提供者可用的任何机制。GSSAPI是默认机制。

security_protocol

  • 值类型可以是:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL
  • 默认值为PLAINTEXT
  • 描述:Kafka消费者要使用的安全协议,可以上以上几种协议

send_buffer_bytes

  • 值类型是字符串
  • 没有默认值
  • 描述:发送数据时使用TCP发送缓冲区(SO_SNDBUF)的大小

session_timeout_ms

  • 值类型是字符串
  • 没有默认值
  • 描述:超时之后,如果poll_timeout_ms未调用,则将消费者标记为死,并为消费者组触发重新平衡操作group_id

topics

  • 值类型是数组array
  • 默认值是["logstash"]
  • 描述:要订阅的主题列表,默认为["logstash"]

topic_pattern

  • 值类型是字符串string
  • 没有默认值
  • 描述:要订阅的主题正则表达式模式。使用此配置时将忽略主题配置。

Kafka输入插件示例配置

cat /opt/application/logstash-7.0.0/conf/logstash-kafka.conf 
input {
    kafka {
        auto_offset_reset => "earliest"
        bootstrap_servers => "10.150.55.94:9092,10.150.55.95:9092,10.150.50.223:9092"
        consumer_threads => "3"
        group_id => "logstash_group"
        heartbeat_interval_ms => ""
        request_timeout_ms => "30000"
        topics => "nginx_log"
  }
}

filter {
    json {
        source => "message"
  }
}

output {
   stdout {
        codec => rubydebug
 }
}

检查配置

人已赞赏
0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧