北京网站建站公,重庆建设厅网站公示公告栏,上海网站开发技术最好公司电话,软件外包公司好不好前言#xff1a;Apache Kafka是一个分布式流处理平台#xff0c;由LinkedIn开发并捐赠给Apache软件基金会。它主要用于构建实时数据流管道和流应用。Kafka具有高吞吐量、可扩展性和容错性的特点#xff0c;适用于处理大量数据。
以下是Kafka的一些核心概念和特性#xff1…前言Apache Kafka是一个分布式流处理平台由LinkedIn开发并捐赠给Apache软件基金会。它主要用于构建实时数据流管道和流应用。Kafka具有高吞吐量、可扩展性和容错性的特点适用于处理大量数据。
以下是Kafka的一些核心概念和特性 主题TopicKafka中消息的分类生产者Producer将消息发送到特定的主题消费者Consumer从主题中读取消息。 分区Partition为了提高并行性和吞吐量每个主题可以被分割成多个分区。分区可以分布在集群中的不同服务器上。 生产者Producer向Kafka集群发送消息的客户端。 消费者Consumer从Kafka集群读取消息的客户端。消费者通常以消费者组Consumer Group的形式存在每个消费者组中的消费者可以独立地读取消息。 消费者组Consumer Group是一组共享相同主题订阅的消费者。消费者组允许多个消费者实例共同处理主题中的消息。 BrokerKafka集群中的一个节点负责存储数据并提供数据服务。 ZooKeeperKafka使用ZooKeeper来维护集群元数据和协调集群操作。 消息MessageKafka中的基本数据单元由键Key、值Value、时间戳等组成。 偏移量Offset每个分区中消息的索引消费者通过偏移量来追踪其在分区中的位置。 副本Replica为了提高数据的可靠性和可用性Kafka中的每个分区都有多个副本。其中一个副本是领导者Leader其他副本是追随者Follower。 领导者选举Leader Election当分区的领导者不可用时追随者之一会被选为新的领导者。 高可用性High Availability, HA通过副本和领导者选举机制Kafka能够保证服务的高可用性。 持久性DurabilityKafka提供了消息持久化机制确保消息不会因为系统故障而丢失。 可伸缩性ScalabilityKafka可以通过增加更多的Broker来水平扩展以处理更多的数据。 容错性Fault ToleranceKafka通过副本机制来容忍节点故障。
Kafka的应用场景包括
日志聚合收集和集中分布式系统中的日志数据。实时数据分析流式处理和分析实时数据。事件源Event Sourcing使用事件来记录系统中发生的状态变化。消息队列作为消息队列系统用于任务分发和解耦服务。流处理构建复杂的流处理应用如实时监控、报警系统等。
Kafka通常与其他技术如Apache Flink、Apache Storm、Apache Samza等结合使用以构建完整的流处理解决方案。
安装kafka集群 第一步创建文件夹路径 命令mkdir -p /var/kafka-logs /data/zk 命令mkdir -p /usr/local/kafka
第二步上传压缩包到 /usr/local/kafka 解压进入解压后的目录/kafka-3.0.0/kafka_2.12-3.0.0 tar -zxvf kafka_2.12-3.0.0.tgz cd kafka_2.12-3.0.0
第三步修改配置文件 修改/usr/local/kafka/kafka_2.12-3.0.0/config/server.properties vim server.properties修改以下内容
broker.id0 #保证每个broker唯一第一台可以不修改默认为0后面两台需要修改如改为1和2
num.partitions3 #分区数量一般与broker保持一致
listenersPLAINTEXT://10.0.0.8:9092 #修改为本机ip
zookeeper.connect10.0.0.8:2181,10.0.0.9:2181,10.0.0.10:2181 #配置三台服务zookeeper连接地址
host.name10.0.0.8 #新增host.name值分别设为不同的值3台机器根据自己的ip设置
log.dirs/var/kafka-logs/ #修改log.dirs目录修改/kafka-3.0.0/kafka_2.12-3.0.0/config/zookeeper.properties
dataDir/data/zk #修改为自定义的目录
maxClientCnxns0 #注释掉
#设置连接参数添加如下配置
tickTime2000
initLimit10
syncLimit5
#设置broker Id的服务地址
server.010.0.0.8:2888:3888
server.110.0.0.9:2888:3888
server.210.0.0.10:2888:3888而消费者配置consumer.properites和生产者配置producer.properties根据具体业务来进行调配
第四步 三台服务器分别执行如下命令myid的值取决于broker.id
echo 0 /data/zk/myid
echo 1 /data/zk/myid
echo 2 /data/zk/myid第五步配置hosts文件 查看主机名命令hostname
10.0.0.8 ecs-ubiinf6as0wpbx
10.0.0.9 ecs-8k9rxzfxvsuyp2
10.0.010 ecs-bodhwf981mt5r9命令vim /etc/hosts 注释掉本机的加入以上三条信息
第六步启动 kafka启动时先启动zookeeper再启动kafka关闭时相反先关闭kafka再关闭zookeeper 启动zookeeper ./zookeeper-server-start.sh /usr/local/kafka/kafka_2.12-3.0.0/config/zookeeper.properties 启动kafka ./kafka-server-start.sh /usr/local/kafka/kafka_2.12-3.0.0/config/server.properties 查看日志 tail -f /usr/local/kafka/kafka_2.12-3.0.0/logs/server.log Kafka集群配置成功且无集群节点连接异常报错
第七步放行端口 如集群配置完成后开启防火墙报节点无法连接需做放行端口配置 放行端口
firewall-cmd --zonepublic --add-port2888/tcp --permanent 放行2888端口
firewall-cmd --zonepublic --add-port3888/tcp --permanent 放行3888端口
firewall-cmd --zonepublic --add-port9092/tcp --permanent 放行9092端口firewall-cmd --reload #重新载入 返回 success 代表成功firewall-cmd --zonepublic --query-port2888/tcp 查看 返回 yes 代表开启成功
firewall-cmd --zonepublic --query-port3888/tcp 查看 返回 yes 代表开启成功
firewall-cmd --zonepublic --query-port9092/tcp 查看 返回 yes 代表开启成功