A Super-Detailed Kafka Tutorial

1、 Kafka concepts and introduction

Kafka is a messaging system, originally designed and developed at LinkedIn and open-sourced in 2011.

Kafka is a distributed, publish/subscribe-based messaging system. The main design objectives are as follows:

Consumers subscribe to topics and fetch messages from brokers in pull mode.

Basic concepts of Kafka

Standalone deployment structure

Cluster deployment structure

Topic and partition

A topic can contain one or more partitions. Because the storage capacity and performance of a single machine are limited, a topic is split into multiple partitions to support horizontal scaling and parallel processing.
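How keyed records spread across partitions can be sketched as follows. This is a simplified illustration rather than Kafka's implementation: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses `String.hashCode()` as a stand-in, and the class and method names are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical illustration of hash-based partition selection (not Kafka's own code). */
final class PartitionSketch {

    /** Maps a record key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // Math.floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // The same key always lands on the same partition; different keys may spread out.
        for (String key : Arrays.asList("order-1", "order-2", "order-3", "order-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, records with the same key stay in order within one partition, while different keys can be processed in parallel on different partitions.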

Partition and replica

Splitting a topic into multiple partitions spreads the message load across multiple machines, but if the data of one partition is lost, the topic's data is no longer complete. Therefore, the concept of a replica is introduced.

Each partition can be given multiple replicas through the replication factor. That way, even if one machine fails, backup copies of the data exist on other machines.
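One way to picture how replicas end up on different machines is a simple round-robin layout. The sketch below assumes brokers numbered from 0 and is only an illustration: Kafka's actual assignment logic differs in detail, and `ReplicaPlacementSketch` and `replicasFor` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round-robin replica layout (a simplification, not Kafka's algorithm). */
final class ReplicaPlacementSketch {

    /** Returns the brokers that hold the replicas of one partition. */
    static List<Integer> replicasFor(int partition, int replicationFactor, int brokerCount) {
        if (replicationFactor > brokerCount) {
            // Two replicas of the same partition on one broker would add no safety.
            throw new IllegalArgumentException("replication factor cannot exceed the number of brokers");
        }
        List<Integer> brokers = new ArrayList<>();
        for (int r = 0; r < replicationFactor; r++) {
            // Each further replica goes to the next broker, wrapping around.
            brokers.add((partition + r) % brokerCount);
        }
        return brokers;
    }

    public static void main(String[] args) {
        // 3 partitions with 3 replicas each on a 3-broker cluster.
        for (int p = 0; p < 3; p++) {
            System.out.println("partition " + p + " -> brokers " + replicasFor(p, 3, 3));
        }
    }
}
```

In this layout every broker holds a copy of every partition, so losing any single broker loses no data.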

3 partitions and 3 replicas in cluster environment:

Source code diagram:

2、 Installation and deployment

1. I use version 2.7.0. Download and unzip it. Note: select the binary download rather than the source download.

2. Open the config/server.properties file and enable the following configuration

3. Install ZooKeeper yourself; I use ZooKeeper version 3.7.

4. Start Kafka

Kafka command line

View topic

Create topic

View topic information

Consume messages

Produce messages

Simple performance test:

Java client

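Producer:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("bootstrap.servers", "192.168.157.200:9092");

        // try-with-resources closes the producer, which waits for buffered records to be sent
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test1", "This is a message");
            producer.send(record);
        }
    }
}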

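Consumer:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("bootstrap.servers", "192.168.157.200:9092");
        // Consumer group
        properties.setProperty("group.id", "group1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("test1"));

        while (true) {
            // Pull records, waiting up to 100 ms if none are available
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}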

3、 Advanced features

Producer features

Producer acknowledgment mode (acks)

Can setting acks to -1 ensure that messages are not lost?

A: No. If the partition has only one replica, that is, a leader but no followers, messages are still lost when that broker goes down. Therefore, each partition should have at least two replicas.

In addition, to improve data reliability, set min.insync.replicas (the minimum number of in-sync replicas that must have the write, default 1) together with acks = -1.
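The interaction between acks = -1 (all) and min.insync.replicas can be captured in a small toy model. It is not Kafka code; it only encodes the rule that, with acks=all, a write is rejected when fewer replicas are in sync than min.insync.replicas requires. The class and method names are hypothetical.

```java
/** Toy model of the acks=all / min.insync.replicas rule (not Kafka code). */
final class AcksSketch {

    /** With acks=all, a write is accepted only while enough replicas are in sync. */
    static boolean writeAccepted(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    /**
     * Number of replicas that can drop out while writes are still accepted.
     * Returns 0 when min.insync.replicas is not below the replication factor.
     */
    static int toleratedFailures(int replicationFactor, int minInsyncReplicas) {
        return Math.max(0, replicationFactor - minInsyncReplicas);
    }

    public static void main(String[] args) {
        int replicationFactor = 3;
        int minInsync = 2;
        System.out.println("tolerated failures: " + toleratedFailures(replicationFactor, minInsync));
        // Walk from a fully healthy partition down to a single surviving replica.
        for (int inSync = replicationFactor; inSync >= 1; inSync--) {
            System.out.println(inSync + " in sync -> accepted: " + writeAccepted(inSync, minInsync));
        }
    }
}
```

With a replication factor of 3 and min.insync.replicas of 2, one broker can fail without writes being rejected, and every acknowledged message still exists on at least two brokers.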

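Producer - synchronous sending

// Imports needed: java.util.Properties, java.util.concurrent.*, org.apache.kafka.clients.producer.*
public void syncSend() throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("bootstrap.servers", "192.168.157.200:9092");

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    ProducerRecord<String, String> record = new ProducerRecord<>("test1", "This is a message");

    // Synchronous send, method 1: block on the returned Future until the broker responds
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();

    // Synchronous send, method 2: flush() blocks until all buffered records have completed
    producer.send(record);
    producer.flush();

    producer.close();
}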

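Producer - asynchronous sending

// Imports needed: java.util.Properties, org.apache.kafka.clients.producer.*
public void asyncSend() {
    Properties properties = new Properties();
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("bootstrap.servers", "192.168.157.200:9092");

    // How long the producer waits for more records to join a batch before sending it
    properties.setProperty("linger.ms", "1");
    // Maximum size of a batch, in bytes
    properties.setProperty("batch.size", "20240");

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> record = new ProducerRecord<>("test1", "This is a message");
    // Asynchronous send, method 1: fire and forget
    producer.send(record);
    // Asynchronous send, method 2: the callback runs once the send has succeeded or failed
    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            System.out.println("record=" + record.value());
        } else {
            exception.printStackTrace();
        }
    });

    // Without close(), records still in the buffer may never be sent
    producer.close();
}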

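Producer ordering guarantee

Synchronous sending, plus allowing the producer only one in-flight request per connection:

// Imports needed: java.util.Properties, org.apache.kafka.clients.producer.*
public void sequenceGuarantee() {
    Properties properties = new Properties();
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("bootstrap.servers", "192.168.157.200:9092");
    // How many requests the producer may send before receiving a response; 1 means one at a time
    properties.setProperty("max.in.flight.requests.per.connection", "1");
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> record = new ProducerRecord<>("test1", "This is a message");
    // Synchronous send
    producer.send(record);
    producer.flush();

    producer.close();
}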
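Why limiting in-flight requests matters for ordering can be shown with a toy simulation. It is not Kafka code and ignores idempotence; it only models batches that fail once and are retried while other batches are already on the wire. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (not Kafka code) of how retries can reorder batches. */
final class InFlightOrderingSketch {

    /**
     * Sends batches with at most maxInFlight outstanding at once. Batches named in
     * failFirstAttempt fail once and are retried. Returns the resulting log order.
     */
    static List<String> deliver(List<String> batches, Set<String> failFirstAttempt, int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        Deque<String> pending = new ArrayDeque<>(batches);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Put up to maxInFlight batches on the wire.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            List<String> retries = new ArrayList<>();
            for (String batch : inFlight) {
                if (failFirstAttempt.contains(batch) && alreadyFailed.add(batch)) {
                    retries.add(batch); // first attempt failed, retry later
                } else {
                    log.add(batch);     // written to the log
                }
            }
            // Failed batches go back to the front of the queue, keeping their relative order.
            for (int i = retries.size() - 1; i >= 0; i--) {
                pending.addFirst(retries.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = Arrays.asList("A", "B", "C");
        Set<String> flaky = new HashSet<>(Arrays.asList("A"));
        System.out.println("maxInFlight=2: " + deliver(batches, flaky, 2));
        System.out.println("maxInFlight=1: " + deliver(batches, flaky, 1));
    }
}
```

In this model, with two requests in flight the log ends up as B, A, C because B is written while A is being retried; with one request in flight the order stays A, B, C.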

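Producer - reliable message delivery

Transactions + idempotence

The transaction here sends 100 messages. If an error occurs, the transaction is aborted and none of the messages can be read by consumers that use isolation.level=read_committed.

// Imports needed: java.util.Properties, org.apache.kafka.clients.producer.*,
// org.apache.kafka.common.KafkaException, org.apache.kafka.common.errors.*
public static void transaction() {
    Properties properties = new Properties();
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Number of retries
    properties.setProperty("retries", "3");
    properties.setProperty("bootstrap.servers", "192.168.157.200:9092");
    // Make the producer idempotent; if acks is not set explicitly, it then defaults to all
    properties.setProperty("enable.idempotence", "true");
    // Transactional id
    properties.setProperty("transactional.id", "tx0001");
    ProducerRecord<String, String> record = new ProducerRecord<>("test1", "This is a message");
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    producer.initTransactions();
    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++) {
            producer.send(record);
        }
        // Flushes pending records and throws if any send in the transaction failed,
        // so no per-record callback is needed to detect errors
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal errors: the transaction cannot be aborted, the producer can only be closed
        e.printStackTrace();
    } catch (KafkaException e) {
        // Any other error: abort so that none of the 100 messages is committed
        producer.abortTransaction();
        e.printStackTrace();
    }
    producer.close();
}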

Consumer features

Consumer - consumer group

Each consumer group records its own offset for every partition it consumes. Within a consumer group, a partition is consumed by only one consumer, although different groups can consume the same partition independently.

As shown in the figure, a topic has four partitions on two brokers.

For consumer group A, which has two consumers, each consumer consumes two partitions. For consumer group B, which has four consumers, each consumer consumes one partition.
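The distribution described above can be reproduced with a small sketch. It hands out partitions in contiguous ranges, loosely modelled on the idea behind range-style assignment. It is not Kafka's assignor, and `GroupAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of dividing a topic's partitions among the consumers of one group. */
final class GroupAssignmentSketch {

    /** Gives each consumer a contiguous range of partitions; earlier consumers absorb any remainder. */
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = partitionCount / consumers.size();
        int extra = partitionCount % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            // A consumer may end up with no partitions when the group is larger than the topic.
            assignment.put(consumers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println("group A: " + assign(4, Arrays.asList("a1", "a2")));
        System.out.println("group B: " + assign(4, Arrays.asList("b1", "b2", "b3", "b4")));
    }
}
```

Each partition appears exactly once per group, which mirrors the rule that only one consumer in a group reads a given partition. Adding more consumers than partitions leaves the surplus consumers idle.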

Consumer - synchronous offset commit

// Imports needed: java.time.Duration, java.util.*, org.apache.kafka.clients.consumer.*,
// org.apache.kafka.common.serialization.StringDeserializer
void commitSyncReceive() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "49.234.77.60:9092");
    props.put("group.id", "group_id");
    // Disable auto commit (auto.commit.interval.ms then has no effect)
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records", 1000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // TOPIC is a String constant holding the topic name, defined elsewhere
    consumer.subscribe(Arrays.asList(TOPIC));

    while (true) {
        ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : msgList) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        // Synchronous commit: the current thread blocks until the offset commit succeeds
        consumer.commitSync();
    }
}

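Consumer - asynchronous offset commit

// Imports needed: java.time.Duration, java.util.*, org.apache.kafka.clients.consumer.*,
// org.apache.kafka.common.TopicPartition, org.apache.kafka.common.serialization.StringDeserializer
void commitAsyncReceive() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "49.234.77.60:9092");
    props.put("group.id", "group_id");
    // Disable auto commit
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records", 1000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // TOPIC is a String constant holding the topic name, defined elsewhere
    consumer.subscribe(Arrays.asList(TOPIC));

    while (true) {
        ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : msgList) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        // Asynchronous commit: returns immediately, the callback reports the outcome
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                if (e != null) {
                    System.err.println("commit failed for " + map);
                }
            }
        });
    }
}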

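Consumer - custom offset storage

// Imports needed: java.time.Duration, java.util.*, org.apache.kafka.clients.consumer.*,
// org.apache.kafka.common.TopicPartition, org.apache.kafka.common.serialization.StringDeserializer
void commitCustomSaveOffset() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "49.234.77.60:9092");
    props.put("group.id", "group_id");
    // Disable auto commit
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records", 1000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // TOPIC is a String constant holding the topic name, defined elsewhere
    consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
        // Called after the consumer stops fetching data and before the rebalance starts;
        // offsets can be committed manually here
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {

        }

        // Called after the rebalance and before the consumer starts fetching data;
        // offsets can be adjusted in this method
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {

        }
    });

    while (true) {
        ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : msgList) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                if (e != null) {
                    System.err.println("commit failed for " + map);
                }
            }
        });
    }
}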
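What a custom offset store might look like can be sketched with an in-memory map. In practice the store would more likely be a database or similar durable storage; `OffsetStoreSketch` and its methods are hypothetical. The sketch keeps the offset of the next record to read (last processed offset + 1), which matches the convention Kafka uses for committed offsets.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for an external offset store. */
final class OffsetStoreSketch {

    // Key: "<topic>-<partition>", value: offset of the next record to read.
    private final Map<String, Long> nextOffsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Records that the record at processedOffset has been fully handled. */
    void markProcessed(String topic, int partition, long processedOffset) {
        // Keep the highest position seen so a late, older update cannot move it backwards.
        nextOffsets.merge(key(topic, partition), processedOffset + 1, Long::max);
    }

    /** Returns the offset to resume from, or fallback when nothing has been stored yet. */
    long resumeOffset(String topic, int partition, long fallback) {
        return nextOffsets.getOrDefault(key(topic, partition), fallback);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.markProcessed("test1", 0, 41L);
        System.out.println("resume test1-0 at " + store.resumeOffset("test1", 0, 0L));
        System.out.println("resume test1-1 at " + store.resumeOffset("test1", 1, 0L));
    }
}
```

Under these assumptions, `onPartitionsRevoked` would be the place to make sure the store is up to date, and `onPartitionsAssigned` could call `consumer.seek(partition, offset)` with the value returned by `resumeOffset` for each assigned partition.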

4、 Integrating Kafka with Spring Boot

<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
</dependency>

# kafka
spring.kafka.bootstrap-servers=192.168.157.200:9092
# Number of times a message is resent after an error
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
# Size of the producer's memory buffer
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1

# Consumer
# Interval between automatic offset commits
spring.kafka.consumer.auto-commit-interval=1S
# What the consumer does when it reads a partition that has no offset or whose offset is invalid:
# latest (default): start reading from the newest records (those produced after the consumer started)
# earliest: start reading the partition from the beginning
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Number of threads to run in the listener containers
spring.kafka.listener.concurrency=5
# The listener is responsible for acknowledging; each call commits immediately
spring.kafka.listener.ack-mode=manual_immediate
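spring.kafka.listener.missing-topics-fatal=false

Producer:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, Object object) {
        // spring-kafka 2.x returns a ListenableFuture here; 3.x returns a CompletableFuture instead
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Failed to send message: " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("Message sent: " + result);
            }
        });
    }
}

Consumer:

import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaConsumer {

    @KafkaListener(topics = "test1", groupId = "group_test")
    public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            System.out.println("group_test consumed: Topic:" + topic + ",Message:" + msg);
            // Manual acknowledgment; with ack-mode=manual_immediate the offset is committed right away
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "test1", groupId = "group_test2")
    public void consumer2(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            System.out.println("group_test2 consumed: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }

}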