kafka 学习笔记
参考链接
尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
kafka 安装
kafka基于zookeeper,所以得先安装zookeeper,官网下载解压即用步骤略
kafka也是下载解压即用,步骤略
kafka官方连接工具
https://www.kafkatool.com/download.html
kafka 组件
每个kafka节点,都是一个Broker,节点下可以创建多个主题Topic,主题下面可以创建多个分区partition,最终生产者生产的消息放在分区中,消费者消费分区中的消息
消费者组概念,消费者group由多个消费者组成,每个消费者可以分别消费broker中分区的数据,注意,一个分区的消息只能由一个消费者消费,一个消费者可以消费多个分区的消息 通过 --consumer-property group.id=my-console-consumer-group来指定消费者所属分组
kafka 简单命令使用
# 启动kafka
bin/kafka-server-start.sh config/server.properties
# 通过kafka-topics.sh 创建topic
./kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create
# kafka-console-producer.sh启动生产者客户端
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# kafka-console-consumer.sh启动消费者客户端
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
SpringBoot整合kafka
导入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
kafka: # 集群地址 bootstrap-servers: 192.168.137.139:9092 listener: #设置是否批量消费,默认 single(单条),batch(批量) type: single # 在侦听器容器中运行的线程数。 concurrency: 5 # listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false # 生产者配置 producer: # 重试次数 retries: 3 # 应答级别 # acks=0 把消息发送到kafka就认为发送成功 # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 acks: all # 批量处理的最大大小 单位 byte batch-size: 4096 # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka buffer-memory: 33554432 # 客户端ID client-id: hello-kafka # Key 序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer # Value 序列化类 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息压缩:none、lz4、gzip、snappy,默认为 none。 compression-type: gzip properties: linger: # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka ms: 1000 max: block: # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms ms: 6000 request: size: 1048576 # 消费者配置 consumer: # 默认消费者组 group-id: test-group # 自动提交 offset 默认 true enable-auto-commit: false # 自动提交的频率 单位 ms auto-commit-interval: 1000 # 批量消费最大数量 max-poll-records: 100 # Key 反序列化类 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Value 反序列化类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset # latest:重置为分区中最新的offset(消费分区中新产生的数据) # none:只要有一个分区不存在已提交的offset,就抛出异常 auto-offset-reset: latest properties: session: timeout: # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作 ms: 120000 request: timeout: # 请求超时 ms: 120000
配置生产者
@Configuration public class KafkaProducerConfiguration { @Resource private KafkaProperties kafkaProperties; /** * 不包含事务 producerFactory * 注意,从kafkaProperties取值时,需要注意值类型,看是否需要强转 * @return */ public ProducerFactory<String, String> producerFactory() { Map<String, Object> props = new HashMap<>(); //kafka 集群地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); //重试次数 props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries()); //应答级别 //acks=0 把消息发送到kafka就认为发送成功 //acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 //acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 props.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getProducer().getAcks()); //KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, kafkaProperties.getProducer().getProperties().get(ProducerConfig.MAX_BLOCK_MS_CONFIG)); //批量处理的最大大小 单位 byte props.put(ProducerConfig.BATCH_SIZE_CONFIG, (int)kafkaProperties.getProducer().getBatchSize().toBytes()); //发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getProperties().get(ProducerConfig.LINGER_MS_CONFIG)); //生产者可用缓冲区的最大值 单位 byte props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProperties.getProducer().getBufferMemory().toBytes()); //每条消息最大的大小 props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getProducer().getProperties().get(ProducerConfig.MAX_REQUEST_SIZE_CONFIG)); //客户端ID props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getProducer().getClientId()); //Key 序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getKeySerializer()); //Value 序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer().getValueSerializer()); //消息压缩:none、lz4、gzip、snappy,默认为 none。 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaProperties.getProducer().getCompressionType()); //自定义分区器 //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName()); return new DefaultKafkaProducerFactory<>(props); } /** * 包含事务 producerFactory * @return */ public ProducerFactory<String, String> producerFactoryWithTransaction() { DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = (DefaultKafkaProducerFactory<String, String>) producerFactory(); //设置事务Id前缀 defaultKafkaProducerFactory.setTransactionIdPrefix("tx"); return defaultKafkaProducerFactory; } /** * 不包含事务 kafkaTemplate * @return */ @Bean("kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** * 包含事务 kafkaTemplate * @return */ @Bean("kafkaTemplateWithTransaction") public KafkaTemplate<String, String> kafkaTemplateWithTransaction() { return new KafkaTemplate<>(producerFactoryWithTransaction()); } /** * 以该方式配置事务管理器:就不能以普通方式发送消息,只能通过 kafkaTemplate.executeInTransaction 或 * 在方法上加 @Transactional 注解来发送消息,否则报错 * @param producerFactory * @return */ // @Bean // public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<Integer, String> producerFactory) { // return new KafkaTransactionManager<>(producerFactory); // } }
配置消费者
@Slf4j @Configuration public class KafkaConsumerConfiguration { @Resource private KafkaProperties kafkaProperties; /** * consumerFactory * 注意,从kafkaProperties取值时,需要注意值类型,看是否需要强转 * @return */ public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> props = new HashMap<>(); //kafka集群地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); //消费者分组ID props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId()); //Key 反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getKeyDeserializer()); //Value 反序列化类 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getValueDeserializer()); //设置Consumer拦截器 //props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyKafkaConsumerInterceptor.class.getName()); //自动提交 offset 默认 true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getConsumer().getEnableAutoCommit()); //自动提交的频率 单位 ms props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, (int) kafkaProperties.getConsumer().getAutoCommitInterval().getSeconds()); //批量消费最大数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaProperties.getConsumer().getMaxPollRecords()); //当kafka中没有初始offset或offset超出范围时将自动重置offset //earliest:重置为分区中最小的offset //latest:重置为分区中最新的offset(消费分区中新产生的数据) //none:只要有一个分区不存在已提交的offset,就抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset()); //session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaProperties.getConsumer().getProperties().get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)); //请求超时 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProperties.getConsumer().getProperties().get(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); return new DefaultKafkaConsumerFactory<>(props); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); //设置 consumerFactory factory.setConsumerFactory(consumerFactory()); //设置是否开启批量监听 factory.setBatchListener(false); //设置消费者组中的线程数量 factory.setConcurrency(1); return factory; } /** * 消费异常处理器 * @return */ @Bean public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { //打印消费异常的消息和异常信息 log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId()); return null; } }; } }
主题配置
@Configuration public class KafkaTopicConfiguration { /** * 创建 KafkaAmin,可以自动检测集群中是否存在topic,不存在则创建 * @return */ @Bean public KafkaAdmin kafkaAdmin() { return new KafkaAdmin(new HashMap<>()); } @Bean public NewTopic newTopic() { // 创建 topic,指定 名称、分区数、副本数 return new NewTopic("austin", 4, (short) 2); } }
测试消息生产与消费
/** * kafka 生产服务 **/ @Slf4j @Service public class KafkaProducerService { @Qualifier("kafkaTemplate") @Resource private KafkaTemplate<String, String> kafkaTemplate; @Qualifier("kafkaTemplateWithTransaction") @Resource private KafkaTemplate<String, String> kafkaTemplateWithTransaction; /** * 发送消息(同步) * @param topic 主题 * @param key 键 * @param message 值 */ public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException { //可以指定最长等待时间,也可以不指定 kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message); //指定key,kafka根据key进行hash,决定存入哪个partition // kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS); //存入指定partition // kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS); } /** * 发送消息并获取结果 * @param topic * @param message * @throws ExecutionException * @throws InterruptedException */ public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException { SendResult<String, String> result = kafkaTemplate.send(topic, message).get(); log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message); log.info("The partition the message was sent to: " + result.getRecordMetadata().partition()); } /** * 发送消息(异步) * @param topic 主题 * @param message 消息内容 */ public void sendMessageAsync(String topic, String message) { ListenableFuture<SendResult<String, String>> future = (ListenableFuture<SendResult<String, String>>) kafkaTemplate.send(topic, message); //添加回调 future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable throwable) { log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message); } @Override public void onSuccess(SendResult<String, String> stringStringSendResult) { log.info("sendMessageAsync success! topic: {}, message: {}", topic, message); } }); } /** * 可以将消息组装成 Message 对象和 ProducerRecord 对象发送 * @param topic * @param key * @param message * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException */ public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException { // 组装消息 Message msg = MessageBuilder.withPayload(message) .setHeader(KafkaHeaders.MESSAGE_KEY, key) .setHeader(KafkaHeaders.TOPIC, topic) .setHeader(KafkaHeaders.PREFIX,"kafka_") .build(); //同步发送 kafkaTemplate.send(msg).get(); // 组装消息 // ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message); // kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS); } /** * 以事务方式发送消息 * @param topic * @param key * @param message */ public void sendMessageInTransaction(String topic, String key, String message) { kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() { @Override public Object doInOperations(KafkaOperations<String, String> kafkaOperations) { kafkaOperations.send(topic, key, message); //出现异常将会中断事务,消息不会发送出去 throw new RuntimeException("exception"); } }); } }
@Component @Slf4j public class MessageConsumer { @KafkaListener(topics = {"austin"}, groupId = "test-group") public void consumer(ConsumerRecord<?, ?> consumerRecord) { Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value()); log.info(">>>>>>>>>> record =" + kafkaMessage); //判断对象是否存在 if (kafkaMessage.isPresent()) { //得到Optional实例中的值 Object message = kafkaMessage.get(); log.info("message: {}", message); } } }
//Controller测试方法 @GetMapping("/testKafkaProducer") public Result<?> testKafkaProducer(@RequestParam(value = "message", required = true) String message, @RequestParam(value = "topic", required = true) String topic){ kafkaProducerService.sendMessageAsync(topic, message); return Result.OK("操作成功"); }
kafka各类机制介绍
参考链接 Kafka最全知识点整理
1. 在集群中节点的分区和副本如何存放?
Kafka在创建Topic时,会根据Broker的数量和配置的副本因子来决定分区的放置。第一个分区的副本会随机放置,后续分区的副本会按照一定的规则在Broker间轮询。topic不能区分数据所属节点,真正区分数据所属节点是分区partition,创建的副本也是依据分区来创建,分区副本所属节点一定不是在分区所属节点,不然副本的就没有意义了。
2. kafka设计理念
Kafka是一个分布式流处理平台,它通过Topic组织消息,并支持Producers发布消息和Consumers订阅消息。Kafka以集群模式运行,由多个Broker组成,每个Broker是一个独立的服务节点。Producers通过网络将消息发布到集群,而Consumers从Broker拉取消息进行处理。
3. 集群中broker节点如何选举成zk中的controller节点?
controller节点就是第一个与zk连接的kafka节点,其他节点则监听controller节点的状态,如果controller宕机,其他节点就会尝试与zk连接,同样,谁先连接谁就是下一个controller。详细步骤参考:https://www.bilibili.com/video/BV1Gp421m7UN?p=19
4. kafka分区的存储位置
Kafka的分区存储位置由log.dirs
参数指定,可以配置多个目录以提高读写性能。Kafka会优先在分区目录数量最少的目录下创建新的分区。
5. kafkaACK应答机制
Kafka提供了三种ACK级别:
acks=0
:生产者不等待任何Broker的确认,延迟最低,但可靠性最差。acks=1
:等待Leader副本确认,但如果Leader挂掉,可能会丢失数据。acks=all
:等待可同步副本列表中所有副本确认,确保消息不会丢失。
6. 幂等性问题
为什么需要幂等性 在使用Kafka时,需要确保Exactly-Once语义。分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack前,有可能出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。注,在未达到最大重试次数前,会自动重试(非应用程序代码写的重试)。
Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:
生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:
上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(比如网络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。
因此需要保证幂等性保证即使多次发送也要让最终的结果一样。
幂等性原理
为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。
PID:ProducerID,每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID
Sequence Number:SequenceNumber :对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
使用幂等
通过如下配置使用幂:
enable.idempotence=true。enable.idempotence配置项表示是否使用幂等性。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
acks=all。
7. kafka事务问题
为什么需要事务
Kafka 的 Exactly Once 幂等性只能保证单次会话内的精准一次性,不能解决跨会话和跨分区的问题。
假如有如下问题:
producer发的多条消息组成一个事务,这些消息需要对consumer同时可见或者同时不可见 。
producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。
kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费偏移量也不能提交。
producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。
在一个原子操作中,根据包含的操作类型,可以分为三种情况:
只有Producer生产消息
消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的consume-transform-produce 模式
只有consumer消费消息
前两种情况需要引入事务,第3种情况不需要引入,这种操作其实没有什么意义,跟使用手动提交效果一样,而且也不是事务属性引入的目的。
开启事务
对于Producer,需要设置transactional.id属性,这个属性的作用下文会提到。设置了transactional.id属性后,enable.idempotence属性会自动设置为true。
对于Consumer,需要设置isolation.level = read_committed,这样Consumer只会读取已经提交了事务的消息。另外一个值是read_uncommitted,默认级别,允许读取尚未提交的消息。
查看数据日志
kafka-dump-log.sh --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
8. kafka生产者分区策略
给定了分区号,直接将数据发送到指定的分区里面去。
没有给定分区号,通过key的hashcode,和 topic的分区数,进行取模。例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那 么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
既没有给定分区号,也没有给定key值,直接轮循进行分区。Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。 例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。(注意这里batch.size=16k的设置,意思是,在保证消息完整性的前提下,将消息按16k一个批次的进行分区)
自定义分区。根据业务需要制定以分区策略。
最佳实践:生产者根据消息的key来决定将数据发送到哪个分区,这样可以保证具有相同key的消息总是被发送到同一个分区。
9. kafka消费者分区策略
Range范围分配策略
Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
RoundRobin轮询策略
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。使用RoundRobin策略有两个前提条件必须满足:
同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;
每个消费者订阅的主题必须相同。
StickyAssignor粘性分区策略
无论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。
Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动(上一次的结果是有粘性的),其目标有两点:
分区的分配尽量的均衡
每一次重分配的结果尽量与上一次分配结果保持一致
StickyAssignor的模式比其他两种提供更加均衡的分配结果,在发生Consumer或者Partition变更的情况下,也能减少不必要的分区调整。