Controller and coordinator of Kafka core components

We already know that a Kafka cluster consists of n brokers, each of which is a Kafka instance, or in other words a Kafka service. The controller is itself a broker; it is also called the leader broker.

In addition to the functions of an ordinary broker, it is also responsible for partition leader election, that is, for electing the leader replica of each partition.

When each broker in Kafka starts, it instantiates a KafkaController and registers its broker id with ZooKeeper. During cluster startup, one of the brokers is elected as the leader through the election mechanism; that broker is the controller mentioned above.

There are three situations, including cluster startup, that can trigger a controller election:

1. Cluster startup.

2. The broker hosting the controller fails.

3. ZooKeeper detects, through missed heartbeats, that its session with the controller has expired.

As usual, let's start with a picture. We will walk through the controller election process at cluster startup according to the following figure.

Suppose the cluster has three brokers, all started at the same time.

(1) The three brokers read the controller's ephemeral node from ZooKeeper. /controller stores the information of the elected leader, so this read confirms whether a leader already exists.

(2) If no leader has been elected yet, the node does not exist and -1 is returned. If the leader's JSON data is returned instead of -1, a leader already exists and the election is over.

(3) All three brokers get -1 back and learn that there is currently no leader, so each of them tries to write its own information into the ephemeral node /controller. Whoever writes first becomes the leader.

(4) Suppose broker 0 is the fastest: it writes the /controller node first and becomes the leader. Broker 1 and broker 2 are less lucky; while writing /controller they hit a ZkNodeExistsException, which is ZooKeeper telling them the node already exists.

After these four steps, broker 0 has successfully written the /controller node while the other brokers have failed to write it, so broker 0 is elected as the leader.
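This election pattern can be sketched, very loosely, with the plain ZooKeeper Java client. This is not Kafka's actual implementation (which lives in ZookeeperLeaderElector and uses its own ZooKeeper utilities), and the node payload below is a simplified stand-in for the real JSON.

```java
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElectionSketch {

    /**
     * Tries to become controller by creating the ephemeral /controller node.
     * Returns true if this broker won the election, false if another broker
     * already holds the node.
     */
    public static boolean tryElect(ZooKeeper zk, int brokerId) throws Exception {
        byte[] leaderInfo = ("{\"brokerid\":" + brokerId + "}")
                .getBytes(StandardCharsets.UTF_8);
        try {
            // EPHEMERAL: the node disappears when our session dies, which is
            // exactly what allows a re-election after the controller fails.
            zk.create("/controller", leaderInfo,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;   // we wrote first, so we are the leader
        } catch (KeeperException.NodeExistsException e) {
            return false;  // someone else wrote first; corresponds to ZkNodeExistsException
        }
    }
}
```

In this sketch, the losing brokers would keep a watch on /controller and retry the same call when the node disappears.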

In addition, there is a /controller_epoch node in ZooKeeper that stores the number of times the leader has changed, starting from 0; every time the leader changes, the value is incremented by 1. All requests sent to the controller carry this value, and the controller compares it with the value in its own memory. If the request's value is smaller, a new election has already taken place in the Kafka cluster, so the request is stale and invalid. If the request's value is larger than the value in memory, a new controller has already been elected and the current one has effectively abdicated, so the request is also not processed. Through controller_epoch, Kafka ensures the uniqueness of the cluster controller and the consistency of its operations.
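The comparison just described can be captured in a few lines. The class and method names below are hypothetical, purely to illustrate the epoch check; they are not Kafka's internal API.

```java
/** Hypothetical helper that mirrors the controller_epoch check described above. */
public final class ControllerEpochCheck {

    /** The controller epoch this node currently holds in memory. */
    private final int cachedControllerEpoch;

    public ControllerEpochCheck(int cachedControllerEpoch) {
        this.cachedControllerEpoch = cachedControllerEpoch;
    }

    /** Returns true only if the request carries the epoch of the current controller. */
    public boolean isRequestValid(int requestEpoch) {
        if (requestEpoch < cachedControllerEpoch) {
            // Stale: a newer election has already happened, ignore the request.
            return false;
        }
        if (requestEpoch > cachedControllerEpoch) {
            // A newer controller exists; the current one should step down
            // rather than handle this request.
            return false;
        }
        return true; // epochs match: the request belongs to the current controller
    }
}
```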

In short, Kafka controller election comes down to which broker manages to write its own information into the /controller node first.

Controller initialization essentially consists of initializing the components and listeners that the controller uses and preparing its metadata.

As mentioned earlier, each broker instantiates and starts a KafkaController. The relationship between KafkaController and its components, together with an introduction to each component, is as follows:

The arrows in the figure indicate the hierarchical relationships between components; the components below are initialized by those above them. As you can see, the controller is fairly complex and mainly consists of the following components:

1. ControllerContext: this object stores all the context information the controller needs, including the live brokers, all topics and their partition assignment schemes, and the AR, leader, ISR and other information for each partition.

2. A series of listeners that watch ZooKeeper and trigger the corresponding operations when changes occur; the yellow boxes in the figure are all listeners.

3. The partition state machine and the replica state machine, which manage partitions and replicas.

4. The leader elector for the current broker, ZookeeperLeaderElector, which has callback methods for taking office as leader and for stepping down.

5. PartitionLeaderSelector, which selects the leader replica for a partition.

6. The topic deletion manager, TopicDeletionManager.

7. ControllerBrokerRequestBatch, used for batched communication between the leader and the brokers. It caches the requests produced by the state machines and then sends them out together.

8. A KafkaScheduler for the controller's rebalance operation; it is only active while the broker is the leader.

Some important information about the Kafka cluster is recorded in ZooKeeper, such as all the broker nodes in the cluster, all the partitions of each topic, and the replica information of each partition (the replica set, the leader replica, and the in-sync replica set). Every broker has a controller instance; to manage the whole cluster, Kafka uses ZooKeeper-based election to choose one "central controller" (or "master controller") for the entire cluster. The controller is in fact just a broker node: besides the ordinary broker functions, it is also responsible for electing partition leaders. The central controller manages the information of all nodes and, by registering various watch events with ZooKeeper, manages leader election and rebalancing for the cluster's nodes and partitions. External events update the data in ZooKeeper, and whenever that data changes, the controller reacts with the corresponding processing.

Failover means that the broker hosting the leader fails and leadership transfers to another broker; the transfer process is simply another leader election.

After a new leader is elected, the corresponding authority must be taken on by that broker, which happens in the onControllerFailover() method of ZookeeperLeaderElector. In this method a series of components are initialized and the broker begins carrying out the leader's duties. The details, which closely resemble controller initialization, are as follows:

1. Register the listeners related to partition management.

2. Register the listeners related to topic management.

3. Register the broker-change listener.

4. Reinitialize the controller context.

5. Start the ControllerChannelManager used for communication between the controller and the other brokers.

6. Create the TopicDeletionManager object used for topic deletion and start it.

7. Start the partition state machine and the replica state machine.

8. Iterate over every topic and add a PartitionModificationsListener to watch for partition changes.

9. If the scheduled partition-rebalance operation is enabled, create the partition-rebalance scheduled task, which by default checks and runs every 300 seconds.

In addition to starting these components, the onControllerFailover method also completes the following operations:

1. Increment the /controller_epoch value by 1 and update it in the ControllerContext.

2. Check whether partition reassignment needs to be started, and perform the related operations.

3. Check whether preferred replicas need to be elected as leaders, and perform the related operations.

4. Send an update-metadata request to all brokers in the Kafka cluster.

Now let's look at onControllerResignation, the method called when the leader's authority is relinquished.

1. In this method the controller's authority is revoked: the partition- and replica-related listeners registered in ZooKeeper are removed.

2. Shut down the components that were started.

3. Finally, clear the controller epoch value recorded in the ControllerContext and set the current broker's state to RunningAsBroker, turning it back into an ordinary broker.

Having studied the controller's startup process, we should now understand how the Kafka controller works: at its core it watches the relevant ZooKeeper nodes and triggers the corresponding operations when those nodes change.

When a new broker joins the cluster, we say the broker comes online; conversely, when a broker is shut down and leaves the cluster, we say the broker goes offline. Both cases are detected through ZooKeeper, as the sketch after the two step lists below illustrates.

Broker coming online:

1. When the new broker starts, it writes its data to /brokers/ids.

2. BrokerChangeListener detects the change and calls ControllerChannelManager.addBroker() for the newly online node, completing the initialization of that broker's network layer.

3. Call KafkaController.onBrokerStartup() for further processing.

4. Resume any topic-deletion operations whose threads were paused and can now continue because the broker has come online.

Broker going offline:

1. Determine the set of offline nodes.

2. Iterate over the offline nodes and call ControllerChannelManager.removeBroker() to close each offline node's network connection, empty its message queue, and shut down its requests.

3. Iterate over the offline nodes and call KafkaController.onBrokerFailure() for processing.

4. Send an UpdateMetadataRequest to all surviving brokers in the cluster.
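At its core, the detection above comes down to watching the children of /brokers/ids, since each broker's registration there is an ephemeral node. The snippet below is a rough sketch of that pattern with the plain ZooKeeper client; Kafka's real BrokerChangeListener and ControllerChannelManager do considerably more bookkeeping than this.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.zookeeper.ZooKeeper;

public class BrokerChangeWatchSketch {

    /**
     * Reads the broker ids currently registered under /brokers/ids, re-arms the
     * watch, and reports which ids appeared or disappeared since the last call.
     */
    public static void checkBrokerChanges(ZooKeeper zk, Set<String> knownBrokers)
            throws Exception {
        // true => register the default watcher again so the next change wakes us up.
        List<String> current = zk.getChildren("/brokers/ids", true);

        Set<String> cameOnline = new HashSet<>(current);
        cameOnline.removeAll(knownBrokers);     // newly started brokers

        Set<String> wentOffline = new HashSet<>(knownBrokers);
        wentOffline.removeAll(current);         // brokers whose ephemeral node vanished

        System.out.println("online: " + cameOnline + ", offline: " + wentOffline);

        knownBrokers.clear();
        knownBrokers.addAll(current);
    }
}
```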

As the name implies, a coordinator is responsible for coordinating work. The coordinators discussed in this section coordinate how work is distributed among consumers; simply put, they handle the initialization stage after a consumer starts and before it begins consuming normally. The normal operation of consumers depends entirely on the coordinators.

There are two main coordinators:

1. Consumer Coordinator

2. Group Coordinator

Kafka's introduction of coordinators has a history. Originally, consumer information was stored in ZooKeeper, and whenever brokers or consumers changed, a consumer rebalance was triggered. At that time consumers were opaque to one another and each consumer communicated with ZooKeeper on its own, which easily led to herd-effect and split-brain problems.

To solve these problems, Kafka introduced coordinators: the server side introduces the GroupCoordinator and the consumer side introduces the ConsumerCoordinator. When each broker starts, it creates a GroupCoordinator instance that manages a subset of the consumer groups (for cluster load balancing) and the consumption offsets of each consumer within those groups. When each consumer is instantiated, a ConsumerCoordinator object is instantiated at the same time; it is responsible for the communication between the consumers of a consumer group and the server-side group coordinator. As shown in the figure below:

The consumer coordinator can be thought of as a proxy class through which the consumer carries out its operations (it is not literally one); many consumer operations are handled via the consumer coordinator.

The consumer coordinator is mainly responsible for the following work (a short usage sketch follows the list):

1. Update the metadata cached by the consumer.

2. Request to join the group from the group coordinator.

3. Handle the corresponding processing after the consumer joins the group.

4. Request to leave the consumer group.

5. Commit offsets to the group coordinator.

6. Stay aware of the connection to the group coordinator through heartbeats.

7. The consumer coordinator of the consumer elected as group leader by the group coordinator is responsible for partition assignment; the assignment result is sent to the group coordinator.

8. Consumers that are not the leader obtain the assignment result from the group coordinator, synchronized through their consumer coordinator.
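From the application's point of view, all of this happens behind subscribe() and poll(): joining the group, receiving the partition assignment and committing offsets are driven by the ConsumerCoordinator inside the client. Below is a minimal consumer loop as a sketch; the broker address, group id and topic name are made-up placeholders.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CoordinatorDrivenConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder address
        props.put("group.id", "demo-group");                  // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() only records interest; the coordinator protocol
            // (join group, receive assignment) is driven by poll().
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```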

The components that the consumer coordinator mainly relies on, along with their descriptions, are shown in the following figure:

As you can see, these components correspond to the responsibilities of the consumer coordinator listed above.

The group coordinator is responsible for handling various requests sent by the consumer coordinator. It mainly provides the following functions:

A group coordinator is instantiated when each broker starts, and each group coordinator is responsible for managing a subset of the consumer groups. It mainly depends on the following components:

These components likewise correspond to the functions of the group coordinator; we will not go into the details here.

The following figure shows the process by which consumers join a group and a leader is chosen.

The process of joining a group shows how the consumer coordinators and the group coordinator work together. The leader consumer takes on the work of partition assignment, which greatly reduces the load on the Kafka cluster. Consumers in the same group stay in sync through the group coordinator, and the mapping between consumers and partitions is persisted in Kafka.

While consuming, a consumer keeps its consumption position, the offset, locally, so that it knows where to resume next time. If nothing in the environment changes, that is enough. But once a consumer rebalance happens or the partitions change, consumers no longer correspond to their original partitions; since each consumer's offset has not been synchronized to the server, the previous work cannot be continued.

Therefore, consumption offsets are sent to the server periodically and managed centrally by the GroupCoordinator. After partitions are reassigned, each consumer reads the offset of its newly assigned partition from the GroupCoordinator and carries on its predecessor's work on that partition.

The following figure shows the problem that arises when offsets are not submitted to the server:

At the beginning, consumer 0 consumed partitions 0, 1 and 2. Later, because a new consumer 2 joined, the partitions were reassigned: consumer 0 no longer consumes partition 2, and consumer 2 consumes it instead. But because consumers cannot communicate with each other, consumer 2 has no idea where to start consuming.

Therefore, consumers need to submit their consumption offsets to the server regularly, so that after a reassignment each consumer can look up, on the server, the offset of the partition assigned to it and continue consuming from there.
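One hedged way to make sure the server holds an up-to-date offset at exactly the moment partitions change hands is to commit from a ConsumerRebalanceListener when partitions are revoked. The sketch below assumes enable.auto.commit is false and that currentOffsets is filled in by the application's poll loop; it is an illustration of the idea, not a prescribed pattern.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRebalance implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    // Offsets of the records processed so far, maintained by the poll loop.
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public CommitOnRebalance(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
        return currentOffsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // About to lose these partitions: push our offsets to the GroupCoordinator
        // so whichever consumer takes them over can resume where we stopped.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing special to do here; the committed offsets become the starting
        // position for the newly assigned partitions.
    }
}
```

The listener would be registered with consumer.subscribe(topics, new CommitOnRebalance(consumer)).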

Because Kafka is built for high availability and horizontal scaling, the partitions assigned to consumers have to be redistributed whenever new partitions appear or new consumers join the group. If offsets are committed at the wrong time or in the wrong way, messages will be consumed repeatedly or lost, so pay special attention to when and how offsets are committed!

1. Automatically committing offsets.

Set enable.auto.commit to true and configure the commit interval (auto.commit.interval.ms), which defaults to 5 s. Every time the consumer calls the poll() method to fetch messages, it checks whether more than 5 s have passed without a commit, and if so, it commits the offsets returned by the previous poll.
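For reference, here is a sketch of the configuration just described; the broker address and group id are placeholders, and the interval is spelled out even though 5000 ms is already the default.

```java
import java.util.Properties;

public class AutoCommitConfigSketch {

    /** Builds consumer properties that enable periodic automatic offset commits. */
    public static Properties autoCommitProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder
        props.put("group.id", "demo-group");                  // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Offsets returned by the previous poll() are committed automatically...
        props.put("enable.auto.commit", "true");
        // ...at most once per interval (5 s is the default, shown explicitly).
        props.put("auto.commit.interval.ms", "5000");
        return props;
    }
}
```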

This is very convenient, but it can cause repeated consumption. Suppose a rebalance is triggered 3 s after the most recent offset commit: the server still holds that last committed offset, so after the rebalance the new consumer starts pulling messages from it, and the messages consumed during those 3 s are consumed again.

2. Manually committing offsets.

Set enable.auto.commit to false and manually call commitSync() in the program to commit offsets. This commits the latest offsets returned by the poll() method.

commitSync() commits offsets synchronously: the calling thread blocks until the commit succeeds, which limits the program's throughput. If you reduce the commit frequency to compensate, the window for repeated consumption grows.
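A hedged sketch of this synchronous pattern follows; processRecord() is a stand-in for whatever the application actually does with each message, and the consumer is assumed to have been created with enable.auto.commit=false.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SyncCommitLoop {

    static void consume(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);               // application logic (placeholder)
            }
            try {
                // Blocks until the offsets returned by this poll are committed.
                consumer.commitSync();
            } catch (CommitFailedException e) {
                System.err.println("offset commit failed: " + e.getMessage());
            }
        }
    }

    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```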

To relieve that throughput limit, we can commit offsets asynchronously with commitAsync(): the commit is sent without waiting for the broker to return the result.

As long as the error is not unrecoverable, commitSync() retries until it succeeds. commitAsync() does not retry: a failure simply stays a failure. It does not retry because, by the time it retried, a larger offset might already have been committed successfully; if the retry then succeeded, the smaller offset would overwrite the larger one, and if a rebalance happened at that point the new consumer would consume those messages again.
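A common compromise, sketched below, is to use commitAsync() inside the loop for throughput and fall back to a blocking commitSync() when shutting down, so the final position is committed reliably. The running flag is a placeholder for the application's own shutdown signal.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AsyncThenSyncCommit {

    // Flipped to false by whatever shutdown mechanism the application uses.
    static volatile boolean running = true;

    // Assumes enable.auto.commit=false on the consumer.
    static void consume(KafkaConsumer<String, String> consumer) {
        try {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                // Fire-and-forget commit: does not block and does not retry on failure.
                consumer.commitAsync();
            }
        } finally {
            try {
                // One last blocking commit so the latest offsets are not lost.
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
```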