丹东网站制作,wordpress插件去除google,洞泾网站建设,文章导入wordpress1. producer的结构 producer#xff1a;生产者
它由三个部分组成 interceptor#xff1a;拦截器#xff0c;能拦截到数据#xff0c;处理完毕以后发送给下游#xff0c;它和过滤器不同并不是丢弃数据#xff0c;而是将数据处理完毕再次发送出去#xff0c;这个默认是不…1. producer的结构 producer生产者
它由三个部分组成 interceptor拦截器能拦截到数据处理完毕以后发送给下游它和过滤器不同并不是丢弃数据而是将数据处理完毕再次发送出去这个默认是不存在的 serialiazer序列化器kafka中存储的数据是二进制的所以数据必须经过序列化器进行处理这个是必须要有的将用户的数据转换为byte[]的工具类其中k和v要分别指定 partitioner: 分区器主要是控制发送的数据到topic的哪个分区中这个默认也是存在的 record accumulator
本地缓冲累加器 默认32M producer的数据不能直接发送到kafka集群中因为producer和kafka集群并不在一起远程发送的数据不是一次发送一条这样太影响发送的速度和性能所以我们发送都是攒一批数据发一次record accumulator就是一个本地缓冲区producer将发送的数据放入到缓冲区中另外一个线程会去拉取其中的数据远程发送给kafka集群这个异步线程会根据linger.ms和batch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中这个本地缓冲区不仅仅可以适配两端的效率还可以批次形式执行任务增加效率 batch-size 默认16KB linger.ms 默认为0 生产者部分的整体流程 首先producer将发送的数据准备好 经过interceptor的拦截器进行处理如果有的话 然后经过序列化器进行转换为相应的byte[] 经过partitioner分区器分类在本地的record accumulator中缓冲 sender线程会自动根据linger.ms和batch-size双指标进行管控复制数据到kafka 2. producer的简单代码
2.1 准备
引入maven依赖
dependenciesdependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.3.2/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.30/version/dependencydependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion1.2.17/version/dependency
/dependencies
在resources文件中创建log4j.properties
log4j.rootLoggerinfo,console
log4j.appender.consoleorg.apache.log4j.ConsoleAppender
log4j.appender.console.targetSystem.out
log4j.appender.console.layoutorg.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n
2.2 生产者中的设定参数
参数含义bootstrap.serverskafka集群的地址key.serializerkey的序列化器这个序列化器必须和key的类型匹配value.serializervalue的序列化器这个序列化器必须和value的类型匹配batch.size批次拉取大小默认是16KBlinger.ms拉取的间隔时间默认为0没有延迟partitioner分区器存在默认值interceptor拦截器选的
2.3 全部代码
public class producer_test {public static void main(String[] args) {Properties pro new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop106:9092);//设定集群地址pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设定两个序列化器其中StringSerializer是系统自带的序列化器要和数据的类型完全一致pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);//batch-size默认是16KB参数的单位是bytepro.put(ProducerConfig.LINGER_MS_CONFIG, 0);//默认等待批次时长是0KafkaProducerString, String producer new KafkaProducerString, String(pro);ProducerRecordString, String record new ProducerRecord(topic_a, this is hainiu);//发送数据的时候有kv两个部分但是一般k我们什么都不放只放value的值producer.send(record);producer.close();}
}在x-shell中观察消费的数据