news 2026/4/22 21:19:24

Kafka入门:从初识到Spring Boot实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka入门:从初识到Spring Boot实战

回顾完RabbitMQ,再跟我一起回顾下Kafka ~

一、Kafka介绍

1. 什么是Kafka?

Kafka是由Apache软件基金会开发的分布式流处理平台,最初由LinkedIn公司设计,现已成为大数据领域核心的消息中间件。它能处理实时数据流,支持高吞吐、低延迟、可扩展的消息传递,广泛用于日志收集、实时分析、事件驱动架构等场景。

2. 核心特点

高吞吐:单机可支持百万级消息/秒,通过分区并行处理实现。

持久化:消息持久化到磁盘,支持TB级数据存储(默认保留7天)。

分布式:集群由多个Broker(服务器)组成,支持水平扩展。

多订阅者:一个Topic的消息可被多个消费者组独立消费(广播/负载均衡)。

二、Kafka架构与核心组件

1. 核心组件

组件 作用

Broker Kafka服务器节点,存储Topic数据,每个Broker有唯一ID(broker.id)。

Topic 消息的逻辑分类(如order-topic),类似“消息频道”,包含多个Partition。

Partition Topic的物理分片(有序日志文件),分布式存储的基本单位,每个Partition有Leader和Follower副本。

Producer 发送消息到Topic的客户端(如订单服务)。

Consumer 从Topic订阅消息的客户端(如库存服务)。

Consumer Group 消费者组,组内多个消费者负载均衡消费Partition,组间独立消费(广播)。

2. 架构图(Mermaid)

Kafka Cluster

发送消息

分区存储

分区存储

同步数据

负载均衡消费

协调

协调

协调

管理消费者组

Broker 1

broker.id=0

• TopicA-Partition0 Leader

• TopicB-Partition1 Follower

Broker 2

broker.id=1

• TopicA-Partition1 Leader

• TopicB-Partition0 Leader

Broker 3

broker.id=2

• TopicA-Partition0 Follower

ZooKeeper

集群协调

存储元数据

Producer

发送消息到Topic

Consumer Group

组内负载均衡消费

TopicA

• Partition0

• Partition1

三、消息流转完整路径(生产者→消费者)

1. 流转步骤

生产者发送消息:生产者指定Topic和Key(可选),通过分区器将消息分配到Partition(默认按Key哈希)。

Broker存储消息:Leader副本接收消息并写入磁盘(Segment文件),Follower副本同步数据。

消费者组分配Partition:消费者组启动时,协调者(Coordinator)将Topic的Partition分配给组内消费者(一个Partition仅被一个消费者消费)。

消费者拉取消息:消费者定期拉取(Poll)分配到的Partition消息,处理后提交偏移量(Offset)。

2. 消息流转图示(Mermaid)

Consumer (Group)

Broker (Follower)

Broker (Leader)

Producer

Consumer (Group)

Broker (Follower)

Broker (Leader)

Producer

1. 生产者发送消息

2. 消费者拉取消息

发送消息到Topic-Partition0 (Key: order-1)

写入本地日志 (LEO=100)

同步消息 (LEO=100)

确认同步 (LEO=100)

返回ACK (消息提交成功)

Poll请求 (获取Partition0消息)

返回消息 (Offset=99, Value=订单数据)

处理消息 (扣减库存)

提交偏移量 (Offset=100)

四、Kafka安装(ZooKeeper传统模式,CentOS 7)

1. 环境准备

CentOS 7系统,关闭防火墙(或开放端口2181、9092):

systemctl stop firewalld && systemctl disable firewalld

安装JDK 8+:

yum install java-1.8.0-openjdk-devel -y

2. 安装ZooKeeper(Kafka依赖)

步骤1:下载并解压

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz

tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/

mv /opt/apache-zookeeper-3.7.1-bin /opt/zookeeper

步骤2:配置ZooKeeper

cd /opt/zookeeper/conf

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg # 修改以下配置

dataDir=/var/lib/zookeeper # 数据存储目录

clientPort=2181 # 客户端端口

步骤3:启动ZooKeeper

mkdir -p /var/lib/zookeeper

/opt/zookeeper/bin/zkServer.sh start # 启动

/opt/zookeeper/bin/zkServer.sh status # 查看状态(显示Mode: standalone为成功)

3. 安装Kafka Broker

步骤1:下载并解压

wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz

tar -zxvf kafka_2.13-3.6.0.tgz -C /opt/

mv /opt/kafka_2.13-3.6.0 /opt/kafka

步骤2:配置Kafka

cd /opt/kafka/config

vim server.properties # 修改以下配置

# 核心配置

broker.id=0 # 当前Broker唯一ID(集群中不可重复)

listeners=PLAINTEXT://localhost:9092 # 监听地址(本地测试用localhost)

log.dirs=/var/lib/kafka/logs # 消息存储目录

zookeeper.connect=localhost:2181/kafka # 连接ZooKeeper(/kafka为根节点)

步骤3:启动Kafka

mkdir -p /var/lib/kafka/logs

/opt/kafka/bin/kafka-server-start.sh -daemon config/server.properties # 后台启动

jps # 查看进程(显示Kafka为成功)

4. 创建Topic(测试用)

/opt/kafka/bin/kafka-topics.sh --create \

--topic order-topic \ # 主题名称

--bootstrap-server localhost:9092 \ # Kafka地址

--partitions 3 \ # 分区数(建议≥3)

--replication-factor 1 # 副本数(单节点只能设1)

五、Spring Boot保姆级案例(生产者+消费者)

1. 项目结构

src/main/java/com/example/kafkademo/

├── KafkaDemoApplication.java # 启动类

├── model/Order.java # 订单实体类

├── producer/OrderProducer.java # 生产者服务

├── consumer/OrderConsumer.java # 消费者服务

└── controller/OrderController.java # 测试接口

src/main/resources/

└── application.yml # 配置文件

2. pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.7.15</version> <!-- Spring Boot 2.7.x稳定版 -->

<relativePath/>

</parent>

<groupId>com.example</groupId>

<artifactId>kafka-demo</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>kafka-demo</name>

<dependencies>

<!-- Web依赖(提供HTTP接口) -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<!-- Kafka依赖 -->

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

<!-- Lombok(简化实体类) -->

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<optional>true</optional>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

</project>

3. application.yml配置

server:

port: 8080 # 应用端口

spring:

application:

name: kafka-demo # 应用名称

kafka:

bootstrap-servers: localhost:9092 # Kafka集群地址(多个用逗号分隔)

# 生产者配置

producer:

key-serializer: org.apache.kafka.common.serialization.StringSerializer # Key序列化器(字符串)

value-serializer: org.springframework.kafka.support.serialization.JsonSerializer # Value序列化器(JSON)

acks: all # 消息确认级别:all=所有ISR副本确认(最高可靠性)

retries: 3 # 发送失败重试次数

enable-idempotence: true # 启用幂等性(防重复消息)

# 消费者配置

consumer:

group-id: order-group # 消费者组ID(同一组内负载均衡)

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Key反序列化器

value-deserializer: org.springframework.kafka.support.serialization.JsonDeserializer # Value反序列化器

auto-offset-reset: earliest # 无偏移量时策略:earliest=从头消费

enable-auto-commit: false # 关闭自动提交偏移量(手动控制)

properties:

spring.json.trusted.packages: "com.example.kafkademo.model" # 信任的实体类包(JSON反序列化用)

# 监听器配置(消费者)

listener:

ack-mode: manual_immediate # 手动立即提交偏移量(处理完一条提交一条)

concurrency: 3 # 并发消费者数(建议=Topic分区数,此处3分区)

4. 实体类(Order.java)

package com.example.kafkademo.model;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import java.math.BigDecimal;

/**

* 订单实体类(消息载体)

*/

@Data // Lombok注解:自动生成getter/setter/toString等

@NoArgsConstructor // 无参构造

@AllArgsConstructor // 全参构造

public class Order {

private String orderId; // 订单ID

private String productName; // 商品名称

private BigDecimal amount; // 订单金额

private String status; // 订单状态(CREATED/PAID/SHIPPED)

}

5. 生产者服务(OrderProducer.java)

package com.example.kafkademo.producer;

import com.example.kafkademo.model.Order;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Service;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

import java.math.BigDecimal;

import java.util.UUID;

/**

* 订单生产者服务:发送订单消息到Kafka

*/

@Service // 标记为Spring服务组件

@Slf4j // Lombok日志注解

public class OrderProducer {

// 注入KafkaTemplate(Spring Boot自动配置,用于发送消息)

@Autowired

private KafkaTemplate<String, Order> kafkaTemplate;

private static final String TOPIC_NAME = "order-topic"; // 目标Topic名称(需与消费者一致)

/**

* 发送订单消息

* @param order 订单对象(若为null则自动生成测试订单)

*/

public void sendOrder(Order order) {

// 1. 若订单ID为空,生成UUID作为订单ID

if (order == null) {

order = new Order();

order.setOrderId(UUID.randomUUID().toString()); // 随机生成订单ID

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 16:11:36

智能体:未来社会的核心竞争力

引言&#xff1a;为何必须学习智能体&#xff1f;在2025年的今天&#xff0c;人工智能已从概念走向现实&#xff0c;而智能体&#xff08;Agent&#xff09; 作为能够自主感知环境、决策并执行任务的AI系统&#xff0c;正成为驱动社会变革的核心力量。学习智能体&#xff0c;已…

作者头像 李华
网站建设 2026/4/23 11:37:21

这个CNN-LSTM融合模型真是我最近调试得最顺手的分类工具了。咱们直接上代码,先从数据预处理开始看

CNN-LSTM 分类&#xff0c;基于卷积神经网络(CNN)-长短期记忆神经网络(LSTM)数据分类预测 MATLAB(2020及以上版本以上)语言 中文注释清楚 非常适合科研小白&#xff0c;程序已经调试好&#xff0c;替换数据就可以直接使用 多特征输入单输出的二分类及多分类模型。 预测结果图…

作者头像 李华
网站建设 2026/4/23 12:59:26

开源社区治理终极指南:构建高效协作的完整方案

在当今开源生态中&#xff0c;Champ项目通过其3D参数化人体动画技术展示了技术创新与社区治理的完美结合。开源社区治理不仅是代码协作的框架&#xff0c;更是项目可持续发展的核心引擎。本文将为新手开发者和项目维护者提供一套完整的社区治理构建方案&#xff0c;帮助您从零开…

作者头像 李华
网站建设 2026/4/23 13:19:51

MakeMeAHanzi完整指南:免费获取9000+汉字笔画数据

MakeMeAHanzi完整指南&#xff1a;免费获取9000汉字笔画数据 【免费下载链接】makemeahanzi Free, open-source Chinese character data 项目地址: https://gitcode.com/gh_mirrors/ma/makemeahanzi 想要学习汉字书写却苦于没有标准笔画顺序参考&#xff1f;MakeMeAHanz…

作者头像 李华
网站建设 2026/4/23 11:34:55

狂飙突进的新能源车,需要一场人车信任的重建

真理总是越辩越明。作者 I 王彬封面 I F1&#xff1a;狂飙飞车当前的新能源车市场似乎陷入了一个怪圈。一方面&#xff0c;技术迭代持续提速&#xff0c;智能化成为今年车市主流&#xff0c;年初比亚迪就喊出了“智驾平权”的口号&#xff0c;年底高阶智能辅助驾驶已经杀入 15 …

作者头像 李华
网站建设 2026/4/23 16:13:36

Edge-TTS终极指南:免费文本转语音的完美解决方案

Edge-TTS终极指南&#xff1a;免费文本转语音的完美解决方案 【免费下载链接】edge-tts Use Microsoft Edges online text-to-speech service from Python WITHOUT needing Microsoft Edge or Windows or an API key 项目地址: https://gitcode.com/GitHub_Trending/ed/edge-…

作者头像 李华