新网站收录多少关键词,郑州工装定制公司,南昌网站建设模板合作,正规网站开发公司目录 获取消息
1. 消费者获取消息的流程逻辑分析
阶段一#xff1a;消费者初始化
阶段二#xff1a;分区分配与重平衡#xff08;Rebalance#xff09;
阶段三#xff1a;消息拉取与处理
阶段四#xff1a;偏移量提交
核心设计思想
2. 流程
关键点总结
常见参数…目录 获取消息
1. 消费者获取消息的流程逻辑分析
阶段一消费者初始化
阶段二分区分配与重平衡Rebalance
阶段三消息拉取与处理
阶段四偏移量提交
核心设计思想
2. 流程
关键点总结
常见参数
一、核心必填参数
二、消费者组与重平衡参数
三、消息拉取与处理参数
四、偏移量Offset提交参数
五、错误处理与容错参数
六、高级配置 获取消息
1. 消费者获取消息的流程逻辑分析
Kafka 消费者通过 消费者组Consumer Group 协作消费消息核心流程分为 初始化、分区分配、消息拉取、偏移量提交 四个阶段 阶段一消费者初始化
订阅 Topic 消费者通过 consumer.subscribe() 订阅一个或多个 Topic。若消费者属于同一消费者组组内消费者会均分 Topic 的分区。
加入消费者组 消费者启动时向 Broker 发送 JoinGroup 请求加入消费者组。若消费者是组内第一个成员会被选举为 Leader 消费者负责分区分配。 阶段二分区分配与重平衡Rebalance
分区分配策略 Leader 消费者根据策略如 RangeAssignor 或 RoundRobinAssignor分配分区。分配结果通过 SyncGroup 请求同步给所有消费者。
重平衡触发条件 消费者加入或离开组。Topic 的分区数量变化。消费者心跳超时默认 session.timeout.ms45s。 阶段三消息拉取与处理
拉取消息 消费者向分区的 Leader Broker 发送 FetchRequest从当前偏移量Offset拉取消息。关键配置 max.poll.records单次拉取最大消息数默认 500。fetch.min.bytes最小拉取数据量默认 1B优先吞吐量时可调大。
处理消息 用户通过 ConsumerRecords 处理消息需在 max.poll.interval.ms默认 5分钟内完成否则触发重平衡。 阶段四偏移量提交
提交 Offset 自动提交由消费者线程周期性提交enable.auto.committrue默认 5秒。手动提交用户调用 commitSync() 或 commitAsync() 精确控制。Offset 存储在 Kafka 内部 Topic __consumer_offsets 中。 核心设计思想
负载均衡通过消费者组实现分区并行消费。容错性心跳机制检测消费者存活重平衡保障分区重新分配。至少一次语义Offset 提交后移确保消息至少被消费一次。 2. 流程 关键点总结
重平衡机制保障消费者组动态扩展和容错。Offset 管理通过提交 Offset 实现消费进度持久化。消息拉取优化通过 fetch.min.bytes 和 max.poll.records 平衡吞吐与延迟。超时控制session.timeout.ms 和 max.poll.interval.ms 防止消费者僵死 常见参数
一、核心必填参数 参数名 默认值 说明 bootstrap.servers 无 Kafka 集群地址列表逗号分隔如 host1:9092,host2:9092 。 group.id 无 消费者组 ID同一组内的消费者共享分区负载。 key.deserializer 无 Key 的反序列化类如 org.apache.kafka.common.serialization.StringDeserializer 。 value.deserializer 无 Value 的反序列化类同上。 二、消费者组与重平衡参数 参数名 默认值 说明 session.timeout.ms 45000 (45秒) 消费者与 Broker 的心跳超时时间超时触发重平衡。 heartbeat.interval.ms 3000 (3秒) 消费者发送心跳的间隔时间需小于 session.timeout.ms 的 1/3。 max.poll.interval.ms 300000 (5分钟) 两次 poll() 调用的最大间隔时间超时触发重平衡。 partition.assignment.strategy RangeAssignor 分区分配策略如 RoundRobinAssignor 、CooperativeStickyAssignor 。 三、消息拉取与处理参数 参数名 默认值 说明 fetch.min.bytes 1 (1字节) 单次拉取的最小数据量Broker 等待足够数据后返回提升吞吐量。 fetch.max.bytes 52428800 (50MB) 单次拉取的最大数据量需小于 Broker 的 message.max.bytes 。 max.poll.records 500 单次 poll() 返回的最大消息数避免内存溢出。 max.partition.fetch.bytes 1048576 (1MB) 单分区单次拉取的最大数据量。 四、偏移量Offset提交参数 参数名 默认值 说明 enable.auto.commit true 是否自动提交 Offset建议设为 false 手动提交确保精确控制。 auto.commit.interval.ms 5000 (5秒) 自动提交 Offset 的时间间隔enable.auto.committrue 时生效。 auto.offset.reset latest 无初始 Offset 时的策略br- earliest 从最早消息开始。br- latest 从最新消息开始。 五、错误处理与容错参数 参数名 默认值 说明 isolation.level read_uncommitted 事务消息隔离级别br- read_committed 仅读取已提交的事务消息。 六、高级配置 参数名 默认值 说明 client.id 无 客户端标识用于监控和日志。 connections.max.idle.ms 540000 (9分钟) 空闲连接超时时间Broker 主动关闭超时连接。 request.timeout.ms 30000 (30秒) 消费者等待 Broker 响应的超时时间。