news 2026/4/23 14:22:17

Kafka笔记

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka笔记

Apache Kafka 是一个强大的分布式流处理平台,适用于大规模数据处理和实时分析。它的高吞吐量、低延迟、可扩展性和容错性使其成为现代数据架构中的重要组件。无论是用于消息队列、日志聚合还是流式处理,Kafka 都提供了高效、可靠的解决方案。

一、核心特性

  1. 高吞吐量

    Kafka 能够处理高吞吐量的数据,支持每秒数百万条消息的读写,适用于大规模数据处理场景。
  2. 低延迟

    Kafka 的设计确保了低延迟的消息传递,通常在毫秒级别,适合对实时性要求较高的应用。
  3. 可扩展性

    Kafka 是一个分布式系统,可以轻松扩展到多个服务器,通过增加更多的 broker 来提高系统的处理能力。
  4. 持久化存储

    Kafka 将消息持久化存储在磁盘上,支持数据的可靠存储和故障恢复。
  5. 容错性

    Kafka 支持副本机制,确保数据的高可用性和容错性。即使部分节点故障,数据也不会丢失。
  6. 消息持久化和顺序保证

    Kafka 保证消息在分区内的顺序,并且可以配置消息的持久化策略,确保数据不会因为系统故障而丢失。

二、主要组件

  1. Broker

    Kafka 集群由多个 broker 组成,每个 broker 是一个 Kafka 服务器实例,负责存储和管理消息。
  2. Topic

    Topic 是 Kafka 中消息的分类,生产者将消息发送到特定的 topic,消费者从 topic 中读取消息。
  3. Partition

    为了提高可扩展性,每个 topic 可以被划分为多个分区(Partition),每个分区是一个有序的消息队列。
  4. Producer

    生产者是向 Kafka 发送消息的应用程序,负责将数据写入指定的 topic。
  5. Consumer

    消费者是从 Kafka 读取消息的应用程序,负责从 topic 中读取数据。
  6. Consumer Group

    消费者组是一组消费者实例,它们共同消费一个 topic 的消息,确保每个消息只被组内的一个消费者处理。

三、使用场景

  1. 消息队列

    Kafka 可以作为高性能的消息队列使用,支持高吞吐量的消息传递和复杂的消费模式。
  2. 日志聚合

    Kafka 常用于收集和聚合系统日志,将日志数据集中存储和分析。
  3. 流式处理

    Kafka 与流处理框架(如 Apache Flink、Apache Spark Streaming)集成,支持实时数据处理和分析。
  4. 事件源

    Kafka 可以作为事件源系统的核心组件,支持事件驱动的架构。
  5. 微服务通信

    Kafka 用于微服务之间的异步通信,支持服务间的解耦和高可用性。

四、架构

Kafka 的架构基于分布式系统设计,具有以下特点:

  1. 分布式存储

    消息分布在多个 broker 上,通过分区和副本机制提高系统的可扩展性和容错性。
  2. 高可用性

    Kafka 支持副本机制,确保数据的高可用性。即使部分 broker 故障,系统仍然可以正常运行。
  3. 水平扩展

    Kafka 集群可以通过增加更多的 broker 来水平扩展,提高系统的处理能力。
  4. 消息持久化

    Kafka 将消息持久化存储在磁盘上,支持数据的可靠存储和故障恢复。

五、单元测试

生产

KafkaproducerTest.java

@BeforeEach注解表示该方法在每个测试方法执行之前都会被调用

@AfterEach注解表示该方法在每个测试方法执行之后都会被调用。

//包声明 package org.javaup; //导包 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Properties; public class kafkaproducertest {//类声明 private KafkaProducer<String, String> producer; @BeforeEach public void setUp() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.128:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); } @AfterEach public void tearDown() { producer.close(); } @Test public void testSend() { String topic = "test-topic"; String key = "test-key"; String value = "你好吗,朋友"; producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { if (exception == null) { System.out.println("Message sent successfully: " + metadata.topic() + " " + metadata.partition() + " " + metadata.offset()); } else { exception.printStackTrace(); } }); // 等待消息发送完成 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
  • 这个方法用于初始化 Kafka 生产者实例:

    • 创建一个Properties对象来存储 Kafka 生产者的配置。

    • 设置BOOTSTRAP_SERVERS_CONFIG,指定 Kafka broker 的地址。

    • 设置KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG,指定键和值的序列化器。

    • 使用这些配置创建一个KafkaProducer实例。

@Test public void testSend() { String topic = "test-topic"; String key = "test-key"; String value = "你好吗,朋友"; producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { if (exception == null) { System.out.println("Message sent successfully: " + metadata.topic() + " " + metadata.partition() + " " + metadata.offset()); } else { exception.printStackTrace(); } }); // 等待消息发送完成 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
  • @Test注解表示这是一个测试方法。

  • 这个方法用于测试 Kafka 生产者发送消息的功能:

    • 定义要发送的消息的主题、键和值。

    • 使用producer.send方法发送消息,并提供一个回调函数来处理发送结果:

      • 如果消息发送成功,打印成功信息。

      • 如果发送失败,打印异常信息。

    • 使用Thread.sleep等待一段时间,确保消息发送完成

消费

KafkaconsumerTest.java

package org.javaup; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class kafkaconsumer { private KafkaConsumer<String, String> consumer; @BeforeEach public void setUp() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.128:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); } @AfterEach public void tearDown() { consumer.close(); } @Test public void testReceive() { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("Received message: " + record.value()); } } } }
结果:

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 13:12:53

localhost:7860无法访问?解决CosyVoice3 WebUI连接问题

localhost:7860无法访问&#xff1f;解决CosyVoice3 WebUI连接问题 在本地部署 AI 语音克隆系统时&#xff0c;你是否曾遇到过这样的场景&#xff1a;满怀期待地运行了 bash run.sh&#xff0c;终端显示服务已启动&#xff0c;但浏览器打开 http://localhost:7860 却一片空白&a…

作者头像 李华
网站建设 2026/4/17 12:46:53

Git commit规范提交CosyVoice3项目代码:团队协作最佳实践

Git Commit 规范在 CosyVoice3 项目中的实践&#xff1a;让协作更高效 你有没有遇到过这样的场景&#xff1f;翻看一个开源项目的提交历史&#xff0c;满屏都是“update”, “fix bug”, “add something”——这些模糊的 commit 信息就像一堆没有标签的抽屉&#xff0c;打开前…

作者头像 李华
网站建设 2026/4/23 10:57:41

Mac用户如何体验CosyVoice3?M系列芯片适配情况说明

Mac用户如何体验CosyVoice3&#xff1f;M系列芯片适配情况说明 在生成式AI浪潮席卷各行各业的今天&#xff0c;语音合成技术早已不再是机械朗读文字的“工具人”&#xff0c;而是朝着情感化、个性化和自然交互的方向快速演进。阿里近期开源的 CosyVoice3 正是这一趋势下的代表性…

作者头像 李华
网站建设 2026/4/23 9:45:17

火山引擎AI大模型 vs CosyVoice3:语音合成能力横向对比

火山引擎AI大模型 vs CosyVoice3&#xff1a;语音合成能力横向对比 在虚拟主播一夜爆红、有声书市场持续扩张的今天&#xff0c;语音合成已不再是“能读出来就行”的基础功能。用户期待的是带有情绪起伏的声音、地道的方言表达&#xff0c;甚至是亲人的声音复现——这背后&…

作者头像 李华
网站建设 2026/4/23 9:46:25

清华镜像站加速CosyVoice3依赖库下载:pip配置教程

清华镜像站加速CosyVoice3依赖库下载&#xff1a;pip配置教程 在AI语音合成技术迅速普及的今天&#xff0c;越来越多开发者开始尝试部署如 CosyVoice3 这类功能强大的开源项目。这款由阿里推出的语音克隆系统&#xff0c;仅需3秒音频即可复刻人声&#xff0c;并支持通过自然语…

作者头像 李华
网站建设 2026/4/23 9:48:03

无需编程基础也能上手:CosyVoice3 WebUI界面详细说明

无需编程基础也能上手&#xff1a;CosyVoice3 WebUI界面详细说明 在短视频、播客和虚拟人内容爆发的今天&#xff0c;个性化语音合成正从“技术炫技”走向“生产力工具”。然而&#xff0c;大多数语音克隆系统仍被代码门槛、复杂训练流程和高昂算力需求所束缚。直到阿里开源的 …

作者头像 李华