Kafka detailed tutorial plus interview questions

1、 Deploy Kafka cluster

Start the ZooKeeper service (typically bin/zookeeper-server-start.sh -daemon config/zookeeper.properties).

Modify the configuration file config/server.properties

# Globally unique ID of the broker; must not be duplicated
broker.id=0
# Enable the topic deletion feature
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Path where Kafka data (log) files are stored
log.dirs=/opt/module/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads used to recover and clean up data under each data directory
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; it is deleted once this expires
log.retention.hours=168
# ZooKeeper cluster connection address
zookeeper.connect=localhost:2181

Configure environment variables

vi /etc/profile

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

Start the Kafka service: cd /usr/local/kafka/ and run the broker startup script (typically bin/kafka-server-start.sh -daemon config/server.properties)

Create topic

View topic

View topic details
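
The shell commands for the three topic steps above (normally bin/kafka-topics.sh with --create, --list and --describe) are not reproduced in this copy. As a substitute, here is a minimal Java AdminClient sketch (requires a 0.11+ client; the broker address localhost:9092 and the topic name "first" are placeholders) performing the same operations:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicAdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker address; point this at your own cluster
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 1 partition and replication factor 1
            admin.createTopics(Collections.singletonList(new NewTopic("first", 1, (short) 1))).all().get();
            // List all topic names
            admin.listTopics().names().get().forEach(System.out::println);
            // Describe the topic (partitions, leader, replicas, ISR)
            admin.describeTopics(Collections.singletonList("first")).all().get()
                 .forEach((name, desc) -> System.out.println(name + " -> " + desc.partitions()));
        }
    }
}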

Produce messages (e.g. with bin/kafka-console-producer.sh):

Consume messages (e.g. with bin/kafka-console-consumer.sh):

2、 Kafka architecture

Simply put: within a single consumer group Kafka behaves like a point-to-point queue (each message is handled by only one consumer in the group), while across multiple consumer groups it behaves like publish/subscribe (every group receives every message).

A topic can have multiple partitions, and each partition is an ordered, immutable sequence of messages to which new messages are continuously appended. The partition assigns each record a sequential ID called the offset. Records are not deleted immediately after being consumed; instead the consumer's offset simply moves forward, and Kafka deletes data according to a configurable retention policy (7 days by default).

Kafka only guarantees the order of messages within one partition; it cannot guarantee ordering across different partitions of a topic. If you need all messages to be strictly ordered, you can give the topic a single partition, although this means each consumer group can have only one consuming process at a time.

Partition policy

Which partition does a message go to when the producer sends it? With the default partitioner: if the record specifies a partition, that partition is used directly; otherwise, if the record has a key, the partition is chosen by hashing the key modulo the number of partitions; otherwise the producer spreads records across partitions (round-robin, or sticky batching in newer clients).
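
As an illustration (not from the original article), a small sketch of the three cases above under the default partitioner; the topic name "first" and the keys/values are placeholders:

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionChoiceDemo {
    public static void main(String[] args) {
        // 1. Partition given explicitly -> that partition is used as-is
        ProducerRecord<String, String> r1 = new ProducerRecord<>("first", 0, "orderId-1", "paid");
        // 2. No partition but a key -> partition = hash(key) % numPartitions (same key, same partition)
        ProducerRecord<String, String> r2 = new ProducerRecord<>("first", "orderId-1", "shipped");
        // 3. Neither partition nor key -> records are spread across partitions (round-robin / sticky batching)
        ProducerRecord<String, String> r3 = new ProducerRecord<>("first", "plain value");
        System.out.println(r1.partition() + " / " + r2.key() + " / " + r3.value());
    }
}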

2.2 storage architecture

Each partition is divided into segments, and each segment consists of a .index file and a .log file named after the segment's base offset, e.g. 00000000000000170410.index and 00000000000000170410.log. A segment's .log file holds the messages from its base offset up to the base offset of the next segment, and the .index file maps a message's offset to its physical position in the .log file. (In the original figure, which is not reproduced here, the index entry for the third message of the segment points to position 348 in the .log file.)

3、 Ensure data reliability

3.1 replica synchronization strategy

There are generally two schemes: (1) the leader sends the ack once more than half of the followers have synchronized the data (lower latency, but tolerating n replica failures requires 2n+1 replicas), and (2) the leader sends the ack only after all followers have synchronized (tolerating n failures requires only n+1 replicas, at the cost of higher latency). Kafka chose the second.

3.2 ISR

ISR (in-sync replica set) is the set of followers that are in sync with the leader. Once the followers in the ISR have finished synchronizing the data, the leader sends the ack back to the producer. If a follower fails to synchronize within the configured time (the replica.lag.time.max.ms parameter), it is kicked out of the ISR. When the leader fails, the new leader is elected from the ISR.

3.3 ack response mechanism

Reliability is guaranteed through the producer's acks setting. There are three values:

acks=0: the producer does not wait for an acknowledgement from the broker. Lowest latency, but data can be lost if the broker fails.
acks=1: the leader responds after writing the message to its local log. Data can be lost if the leader fails before the followers have synchronized.
acks=-1 (all): the leader responds only after all replicas in the ISR have synchronized the message. Data is not lost, but duplicates are possible if the leader fails after the followers synchronize but before the ack reaches the producer.

3.4 fault handling details

LEO (Log End Offset): the maximum offset of each replica.

HW (High Watermark): the largest offset visible to consumers; it is the smallest LEO in the ISR.

(1) When follower fails

After a failure, the follower is temporarily kicked out of the ISR. After it restarts, the follower reads the last HW recorded on its local disk, truncates the part of its log file above the HW, and then synchronizes from the leader. Once the follower's LEO is greater than or equal to the partition's HW, it can rejoin the ISR.

(2) When the leader fails

After a leader fails, a new leader is elected. To keep the replicas consistent with each other, the remaining followers truncate the parts of their logs above the HW and then synchronize from the new leader. Note that this mechanism only guarantees consistency between replicas; by itself it does not guarantee that data is neither lost nor duplicated.

Exactly once semantics

When acks is set to -1, no data is lost between the producer and the server; this is At Least Once semantics.

When acks is set to 0, each message is sent at most once: data may be lost, but it is never duplicated (At Most Once).

For some very important messages, it is necessary to guarantee that they are neither lost nor duplicated, i.e. Exactly Once semantics. Kafka before version 0.11 could not do this. Kafka 0.11 introduced a major feature: idempotence. Idempotence means that no matter how many times the producer sends the same data to the server, the server persists only one copy. Idempotence combined with At Least Once semantics gives Kafka's Exactly Once semantics, namely:

At Least Once + Idempotence = Exactly Once

To enable idempotence, set enable.idempotence to true.

Implementation: a producer with idempotence enabled is assigned a PID during initialization, and every message sent to a given partition carries a sequence number. The broker caches <PID, Partition, SeqNumber> and persists only one copy of a message submitted with the same key. However, the PID changes after a producer restart and different partitions have different keys, so idempotence by itself cannot guarantee Exactly Once across partitions or across sessions.
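
A minimal sketch, assuming a Kafka 0.11+ client, of enabling idempotence on a producer; the broker address and topic name are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IdempotentProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("enable.idempotence", "true");            // turns on PID + sequence-number deduplication
        props.put("acks", "all");                            // idempotence requires acks=all
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("first", "key", "value"));
        }
    }
}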

4、 Consumer

4.1 consumption mode

Consumers pull messages from the broker. The drawback of the pull model is that if there is no data, the consumer may loop without getting anything. For this reason, Kafka consumers pass a timeout parameter when polling for data: if no data is available for consumption, the consumer waits up to that duration before returning.
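
For illustration, a small consumer fragment in the style of the example in section 6.2; the broker address, group id and topic name are placeholders, and the 1000 ms timeout is arbitrary:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class PollTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "demo_group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        // If no records are available, poll() waits at most 1000 ms before returning an empty batch
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + " -> " + record.value());
        }
        consumer.close();
    }
}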

4.2 partition allocation strategy

Kafka has two built-in partition assignment strategies for consumers: RoundRobin and Range.

4.3 consumer offset location saving

Offset is saved with the consumer group + topic + partition as the key.

Saved in zookeeper before version 0.9

After version 0.9, offsets are saved in a built-in Kafka topic named __consumer_offsets.

5.1 Kafka efficient data reading and writing

1. Sequential disk writes: sequential writing can reach about 600 MB/s, while random writing only reaches about 100 KB/s. 2. Zero-copy technology.

5.2 Kafka transactions

Kafka introduced transaction support in version 0.11. Transactions allow production and consumption to achieve Exactly Once semantics across partitions and across sessions.

To implement cross-partition, cross-session transactions, a globally unique transaction ID is introduced and bound to the PID obtained by the producer. This way, after the producer restarts, the original PID can be recovered through the transaction ID that is still in progress.
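
A minimal sketch, assuming a Kafka 0.11+ client and placeholder broker/topic names, of how a producer uses a transactional.id:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("transactional.id", "order-producer-1");  // globally unique transaction ID
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();                          // registers the transactional.id and recovers the PID
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("first", "k1", "v1"));
            producer.send(new ProducerRecord<>("first", "k2", "v2"));
            producer.commitTransaction();                     // both messages become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();                      // roll back on failure
        } finally {
            producer.close();
        }
    }
}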

6、 Kafka API

6.1 Producer API

Message sending process

Code example

package com.mmc.springbootstudy.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @description:
 * @author: mmc
 * @create: 2021-04-22 21:22
 **/

public class ProductDemo {

    public final static String TOPIC = "mmc";


    /**
     * Send API without a callback
     */
    public void send() {
        Properties props = new Properties();
        props.put("bootstrap.servers","49.234.77.60:9092");
        props.put("acks","all");
        props.put("retries",3);
        props.put("batch.size",16384);
        props.put("key.serializer",StringSerializer.class.getName());
        props.put("value.serializer",StringSerializer.class.getName());
        String key="test";
        String value="我是一个小红花222";
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
        producer.send(new ProducerRecord<String,String>(TOPIC,key,value));
        producer.close();
    }

    /**
     * Send with a callback
     */
    public void callSend() {
        Properties props = new Properties();
        props.put("bootstrap.servers","49.234.77.60:9092");
        props.put("acks","all");
        props.put("key.serializer",StringSerializer.class.getName());
        props.put("value.serializer",StringSerializer.class.getName());
        String key="test";
        String value="我是一个小红花222";
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
        producer.send(new ProducerRecord<String,String>(TOPIC,key,value),new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata,Exception e) {
                if(e == null){
                    System.out.println("send success");
                }else {
                    e.printStackTrace();
                }
            }
        });
        producer.close();
    }

    /**
     * Synchronous send API
     * After a message is sent, the current thread blocks until the ack is returned.
     */
    public void syncSend() throws ExecutionException,InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","49.234.77.60:9092");
        props.put("acks","all");
        props.put("key.serializer",StringSerializer.class.getName());
        props.put("value.serializer",StringSerializer.class.getName());
        String key="test";
        String value="我是一个小红花222";
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<String,String>(TOPIC,key,value)).get();
        System.out.println("----------recordMetadata:"+recordMetadata);
        producer.close();
    }

    public static void main( String[] args ) throws ExecutionException,InterruptedException {
//        new ProductDemo().send();
        new ProductDemo().syncSend();
    }
}

The KafkaProducer object is heavyweight and thread-safe, so a single instance can be shared globally for sending messages.

6.2 Consumer API

When consuming, offsets can be committed automatically, manually and synchronously, or manually and asynchronously. A manual synchronous commit blocks the current thread until the commit succeeds and retries on failure; a manual asynchronous commit does not retry on failure.

Whether commits are synchronous or asynchronous, both missed consumption and repeated consumption are possible: committing the offset before processing can cause messages to be missed, while committing after processing can cause messages to be consumed repeatedly.

package com.mmc.springbootstudy.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @description:
 * @author: mmc
 * @create: 2021-04-22 21:32
 **/

public class ConsumerDemo {

    public final static String TOPIC = "mmc";

    /**
     * Automatically commit offsets
     * @throws InterruptedException
     */
    void receive() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","49.234.77.60:9092");
        props.put("group.id","group_id");
        props.put("enable.auto.commit","true");
        props.put("auto.commit.interval.ms","1000");
        props.put("session.timeout.ms","30000");
        props.put("max.poll.records",1000);
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer",StringDeserializer.class.getName());
        props.put("value.deserializer",StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true){
            ConsumerRecords<String,String> msgList=consumer.poll(1000);
            for (ConsumerRecord<String,String> record:msgList){
                System.out.printf("offset = %d,key = %s,value = %s%n",record.offset(),record.key(),record.value());
            }
        }
    }


    /**
     * Manual synchronous commit
     * @throws InterruptedException
     */
    void commitSyncReceive() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","49.234.77.60:9092");
        props.put("group.id","group_id");
        props.put("enable.auto.commit","false");
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer",StringDeserializer.class.getName());
        props.put("value.deserializer",StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true){
            ConsumerRecords<String,String> msgList=consumer.poll(1000);
            for (ConsumerRecord<String,String> record:msgList){
                System.out.printf("offset = %d,key = %s,value = %s%n",record.offset(),record.key(),record.value());
            }
            // Synchronous commit: the current thread blocks until the offset commit succeeds
            consumer.commitSync();
        }

    }

    /**
     * Manual asynchronous commit
     * @throws InterruptedException
     */
    void commitAsyncReceive() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","49.234.77.60:9092");
        props.put("group.id","group_id");
        props.put("enable.auto.commit","false");
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer",StringDeserializer.class.getName());
        props.put("value.deserializer",StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true){
            ConsumerRecords<String,String> msgList=consumer.poll(1000);
            for (ConsumerRecord<String,String> record:msgList){
                System.out.printf("offset = %d,key = %s,value = %s%n",record.offset(),record.key(),record.value());
            }

            // Asynchronous commit: does not block, and does not retry on failure
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition,OffsetAndMetadata> map,Exception e) {
                    if(e!=null){
                        System.err.println("commit Failed for "+map);
                    }
                }
            });
        }

    }




    public static void main(String[] args) throws InterruptedException {
//        new ConsumerDemo().receive();
        new ConsumerDemo().commitSyncReceive();
    }
}

6.3 Custom partitioner

Implement the Partitioner interface and register the implementation class in the producer configuration:

 props.put("partitioner.class","com.mmc.springbootstudy.kafka.MyPartition");

Custom partition implementation class:

package com.mmc.springbootstudy.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @description:
 * @author: mmc
 * @create: 2021-04-27 20:41
 **/

public class MyPartition implements Partitioner {
    @Override
    public int partition(String s,Object o,byte[] bytes,Object o1,byte[] bytes1,Cluster cluster) {
        // Demo implementation: route every message to partition 0
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String,?> map) {

    }
}

Custom offset storage: subscribe with a ConsumerRebalanceListener so that offsets can be saved to, and restored from, an external store when a rebalance happens.

consumer.subscribe(Arrays.asList(TOPIC),new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                // Called before a rebalance: save the latest offsets to the custom store here
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Called after a rebalance: read the saved offsets from the custom store
                // and call consumer.seek(partition, offset) for each assigned partition
            }
        });

6.4 custom interceptors

package com.mmc.springbootstudy.kafka;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * @description:
 * @author: mmc
 * @create: 2021-04-30 20:47
 **/

public class CountIntercepter implements ProducerInterceptor<String,String> {

    private int successCount=0;

    private int failCount=0;

    @Override
    public ProducerRecord<String,String> onSend(ProducerRecord<String,String> producerRecord) {
        System.out.println("Intercepted message for topic: "+producerRecord.topic());
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata,Exception e) {
        if(e==null){
            successCount++;
        }else {
            failCount++;
        }
    }

    @Override
    public void close() {
        System.out.println("success count:"+successCount);
    }

    @Override
    public void configure(Map<String,?> map) {

    }
}

The interceptor needs to be registered in the producer configuration:

 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.mmc.springbootstudy.kafka.CountIntercepter");

7.1 Kafka Eagle monitoring

8、 Interview questions

A: In Kafka, the replicas (including the leader) that stay sufficiently in sync with the leader replica form the ISR (In-Sync Replicas). Replicas that lag too far behind the leader form the OSR (Out-of-Sync Replicas). All replicas of a partition together are usually called the AR (Assigned Replicas).

A: HW (High Watermark): consumers can only pull data up to this offset.

LEO (Log End Offset): identifies the offset of the next message to be written in the current log file; it equals the offset of the last message in the log file + 1.

Producer: responsible for writing messages sequentially to the leader replica.

Consumer: a partition can only be consumed by one consumer in the same consumer group.

Kafka only guarantees ordering within a single partition, so if related messages must stay in order you can use a custom partitioning policy that sends them to the same partition, for example all the status updates of one order.

4. Structure of the Kafka producer client

A: The producer client consists mainly of two threads: the main thread and the sender thread. The main thread creates messages and, after passing them through interceptors, the serializer and the partitioner, caches them in the message accumulator (RecordAccumulator). The sender thread takes messages from the RecordAccumulator and sends them to Kafka. The RecordAccumulator exists mainly to buffer messages so they can be sent in batches, reducing network overhead. Its cache size is set by the buffer.memory parameter, 32 MB by default. If messages are created faster than the sender thread can ship them to the Kafka brokers, the cache fills up; send() then blocks (or eventually throws an exception), and the max.block.ms setting determines the maximum blocking time.

Inside the RecordAccumulator, a double-ended queue (Deque) is maintained for each partition, whose elements are ProducerBatch objects. Newly created messages are appended to the tail of the queue, and the sender reads from the head. A ProducerBatch is a batch of messages sent together; it contains one or more ProducerRecords.
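
For reference, a hedged sketch of where these knobs sit in the producer configuration; the values shown are the commonly documented defaults:

import java.util.Properties;

public class AccumulatorConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("buffer.memory", 33554432);   // total RecordAccumulator cache, 32 MB by default
        props.put("batch.size", 16384);          // maximum bytes per ProducerBatch before it is sent
        props.put("linger.ms", 0);               // how long to wait for more records to fill a batch
        props.put("max.block.ms", 60000);        // how long send() may block when the cache is full
        System.out.println(props);
    }
}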

A: There are two built-in strategies: the RangeAssignor (range partitioning) and the RoundRobinAssignor (round-robin partitioning). Range is used by default.

Range strategy: with 10 partitions and 3 consumers, 10 / 3 = 3 (remainder 1), so each consumer gets 3 partitions and the first consumers take the extras. Consumer 1 gets partitions 0, 1, 2 and 3; consumer 2 gets partitions 4, 5 and 6; consumer 3 gets partitions 7, 8 and 9.

The disadvantage is that the first consumers each get one extra partition per topic; with many topics, those consumers end up with noticeably more partitions than the others.

RoundRobin strategy: for the same example, partition 0 goes to consumer 1, partition 1 to consumer 2, partition 2 to consumer 3, and so on.

Note: this strategy works best when all consumers in the group subscribe to the same topics, so that the round-robin distribution stays even.
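
A small sketch of switching the strategy in the consumer configuration; the class names are the client's built-in assignors:

import java.util.Properties;

public class AssignorConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Default is org.apache.kafka.clients.consumer.RangeAssignor
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        System.out.println(props);
    }
}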

Kafka performs a new partition assignment (a consumer-side rebalance) when, for example, a consumer joins the group, a consumer leaves the group or is considered dead (session timeout), the subscribed topics change, or the number of partitions of a subscribed topic changes.

A: Broker controller: when the brokers start, the first broker that successfully creates the designated ephemeral node in ZooKeeper becomes the controller. It is responsible for managing brokers going online and offline, partition replica assignment for all topics, leader election, and so on.

Partition leader: the replica that handles all reads and writes for its partition; when it fails, a new leader is elected from the ISR.

A: The number of partitions can be increased but not decreased, because there is no good way to handle the data already stored in a partition that would be removed.
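
A hedged sketch, using the Java AdminClient (available since Kafka 1.1; the broker address and topic name are placeholders), of increasing a topic's partition count:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Collections;
import java.util.Properties;

public class AddPartitionsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow topic "first" to 3 partitions; asking for fewer than the current count fails
            admin.createPartitions(Collections.singletonMap("first", NewPartitions.increaseTo(3))).all().get();
        }
    }
}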

The content of this article was collected from the internet and is intended for learning and reference only; the copyright belongs to the original author.