ActiveMQ 全套自学教程:从入门到实践
📌 前言:什么是消息队列?
在正式学习 ActiveMQ 之前,我们先理解一个生活场景:你点了一份外卖,餐厅不需要等你本人过来取餐,而是通过外卖骑手将餐品送到你手中。在这个过程中,订单系统(生产者)->外卖平台(消息队列)->外卖骑手(消费者),三者被解耦开来,各自独立运行。
这就是消息队列的核心思想:异步通信、系统解耦、流量削峰。
随着分布式架构的普及,各系统间需要在不同节点之间高效传递消息,同时应对流量洪峰进行削峰填谷。消息中间件正是为了解决这些问题而诞生的。
ActiveMQ 就是诸多消息中间件中的经典代表,市面上常见的还有 RabbitMQ、Kafka、RocketMQ 等。
第一章:ActiveMQ 概述
1.1 什么是 ActiveMQ?
ActiveMQ是 Apache 软件基金会出品的一个开源消息中间件,采用纯 Java 语言编写,完全遵循JMS(Java Message Service)1.1 规范。它诞生于分布式系统蓬勃发展的时期,专门用来解决各个系统组件之间的通信与协作问题。
简单来说,ActiveMQ 就像一个 "消息邮局":发送方把消息投递到邮局,接收方到邮局去取,发件人和收件人不需要同时在场,极大地提高了系统的灵活性和可靠性。
只要操作系统支持 Java 虚拟机,ActiveMQ 就可以稳定运行,具有出色的跨平台能力。
1.2 历史地位与现状
2004年:ActiveMQ 首次发布,基于 JMS 规范
2007年:被 Apache 软件基金会纳入顶级项目
2010年代:成为企业级应用 JMS 实现的首选
2020年至今:虽然 Kafka 等新一代消息队列崛起,ActiveMQ 在高吞吐场景有所退让,但在企业级稳定场景中仍占据重要地位
正如一句总结所说:ActiveMQ 是 "企业级消息传递的稳健选择",Kafka 是 "大数据实时处理的性能之王"。
1.3 核心特性一览
| 特性 | 说明 |
|---|---|
| 多协议支持 | 兼容 JMS 1.1/2.0、STOMP、AMQP、MQTT、OpenWire 等协议 |
| 多种消息模型 | 支持点对点(Queue)和发布/订阅(Topic)两种模式 |
| 消息持久化 | 支持 KahaDB、JDBC 等多种持久化方式,防止消息丢失 |
| 高可用性 | 支持主从架构(Master-Slave)和网络集群(Network of Brokers) |
| 事务支持 | 支持 JMS 事务,保证消息的原子性发送与接收 |
| Web 控制台 | 提供直观的 Web 管理界面,方便监控和运维 |
| 跨平台 | 纯 Java 实现,支持 Windows、Linux、macOS 等 |
第二章:JMS 规范基础(必学核心)
在学习 ActiveMQ 之前,必须理解它的根基——JMS 规范。
2.1 什么是 JMS?
JMS(Java Message Service)是 Java 平台中关于面向消息中间件(MOM)的一套 API 标准。它定义了 Java 应用程序之间进行异步消息通信的统一接口。
可以用一个类比来理解:JMS 就好比是JDBC 接口,而 ActiveMQ 就是MySQL 驱动实现。JDBC 提供了统一的数据库访问方式,JMS 提供了统一的消息收发方式。不同的厂商只要实现了这套规范,就能让应用程序流畅切换。
2.2 JMS 核心对象模型
JMS 定义了几个关键对象,它们是编写任何 ActiveMQ 程序的基础:
text
ConnectionFactory(连接工厂) ↓ 创建 Connection(连接) ↓ 创建 Session(会话) ↓ 创建 Producer/Consumer(生产者/消费者) ←→ Destination(目的地,即 Queue 或 Topic) ↓ 发送/接收 Message(消息)
各对象详解:
| 对象 | 作用 | 通俗理解 |
|---|---|---|
| ConnectionFactory | 创建连接的工厂 | 就像 "邮政总局",负责建立通信链路 |
| Connection | 客户端与服务器之间的活动连接 | 就像是 "电话线",接通后才能通话 |
| Session | 生产和消费消息的单线程上下文 | 就像是 "一次会话",所有的收发都在此进行 |
| Destination | 消息的目标地址(Queue/Topic) | 就像是 "收件地址",决定消息去哪 |
| Producer | 由 Session 创建,用于发送消息 | "发件人" |
| Consumer | 由 Session 创建,用于接收消息 | "收件人" |
| Message | 实际传输的数据载体 | "信件本身" |
Session 是特别重要的概念:它是一个单线程上下文,不仅用来创建生产者和消费者,还提供了事务性支持——可以把一组消息的发送和接收合并为一个原子操作。
2.3 消息结构
JMS 消息由三部分组成:
| 组成部分 | 说明 |
|---|---|
| 消息头(Header) | 包含路由信息、优先级、过期时间等,由 JMS 提供者或发送者自动设置 |
| 消息属性(Property) | 由发送者自定义的附加信息,用于消息过滤等场景 |
| 消息体(Body) | 实际传输的数据,JMS 定义了 5 种类型 |
五种消息体类型:
| 类型 | 说明 |
|---|---|
| TextMessage | 文本消息,最常用,如 JSON 字符串 |
| ObjectMessage | 序列化的 Java 对象 |
| BytesMessage | 二进制字节消息 |
| MapMessage | 键值对消息 |
| StreamMessage | Java 流数据消息 |
第三章:ActiveMQ 安装与环境搭建
3.1 环境要求
JDK 8 或更高版本(ActiveMQ 5.x 推荐 JDK 8)
2GB 以上内存
3.2 Windows 系统安装
Step 1:下载
访问 ActiveMQ 官网 http://activemq.apache.org/components/classic/download/,下载推荐版本(如 5.18.3),文件名类似apache-activemq-5.18.3-bin.zip。
Step 2:解压
解压到指定目录,例如D:\activemq\apache-activemq-5.18.3。
Step 3:启动
进入bin\win64目录(64位系统),双击activemq.bat脚本。
3.3 Linux 系统安装
bash
# 1. 安装 JDK(以 Ubuntu 为例) sudo apt update sudo apt install openjdk-11-jdk java -version # 2. 下载 ActiveMQ wget https://archive.apache.org/dist/activemq/5.18.3/apache-activemq-5.18.3-bin.tar.gz # 3. 解压到 /opt 目录 sudo tar -xzf apache-activemq-5.18.3-bin.tar.gz -C /opt # 4. 进入目录并启动 cd /opt/apache-activemq-5.18.3 bin/activemq start # 5. 查看日志确认启动 tail -f data/activemq.log # 6. 停止服务 bin/activemq stop
3.4 验证安装
打开浏览器,访问:http://localhost:8161/admin/
默认用户名和密码都是admin / admin。
成功登录后,你会看到 ActiveMQ 的 Web 管理界面,可以在此监控和管理Queues(队列)和Topics(主题)。
3.5 端口说明
| 端口 | 用途 |
|---|---|
| 61616 | JMS 连接端口(应用程序连接使用),支持 TCP/OpenWire 协议 |
| 8161 | Web 管理控制台端口 |
如果需要修改端口,可以在conf/activemq.xml中修改 61616 端口,在conf/jetty.xml中修改 8161 端口。
💡 小贴士:启动如果遇到端口冲突,关闭其他占用端口的程序,或在配置文件中修改默认端口即可。
第四章:核心概念——Queue 与 Topic
ActiveMQ 支持两种核心的消息模型,理解它们的区别是正确使用 ActiveMQ 的关键。
4.1 点对点模型(P2P / Queue)
核心思想:消息生产者将消息发送到队列(Queue)中,每条消息只能被一个消费者接收并消费。即使队列有多个消费者在监听,消息也不会被重复消费。
特点:
🎯 一对一投递,消息被一个消费者消费后,队列中不再保留
⏳ 生产者和消费者没有时间依赖,消费者随时可以来取积压的消息
⚖️ 多消费者时可实现负载均衡——多个消费者轮询处理队列中的消息
💾 消息会持久保存在队列中,直到被消费或过期
类比:就像取快递,每个包裹只能被一个人取走。
4.2 发布/订阅模型(Pub/Sub / Topic)
核心思想:消息生产者(发布者)将消息发送到主题(Topic)中,所有订阅该主题的消费者都会收到这个消息。
特点:
📢 一对多广播,消息会被所有订阅者接收
⚠️ 有时间依赖性:消费者必须在生产者发送消息之前完成订阅,否则收不到已发布的消息(除非使用持久订阅)
📡 适用于消息广播场景,如系统通知、配置更新等
类比:就像订阅微信公众号,关注之后你才能收到推送的文章。
4.3 Queue vs Topic 对比总结
| 对比维度 | Queue(队列) | Topic(主题) |
|---|---|---|
| 消息消费方式 | 1对1,一条消息一个消费者 | 1对多,一条消息所有订阅者 |
| 消息存储 | 消息会持久保存直到被消费 | 默认不持久化,只在内存中 |
| 消费者时机 | 随时上线都能收到积压消息 | 必须在消息发送前订阅 |
| 典型场景 | 下单处理、任务分发 | 系统通知、实时推送 |
| 负载均衡 | ✅ 天然支持 | ❌ 每条消息所有订阅者都收 |
⚠️重要提醒:在发布/订阅模式中,如果消费者在消息发布后才启动,它将收不到之前的消息。需要保证消息不丢失时,请使用持久订阅(Durable Subscriber)。
第五章:第一个 Java 程序——收发消息
5.1 环境准备(Maven 项目)
在创建 Java 项目之前,先在pom.xml中添加 ActiveMQ 客户端依赖:
xml
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.18.3</version> </dependency>
5.2 编写消息发送者(Producer)
java
import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class MyProducer { public static void main(String[] args) throws JMSException { // 1. 创建连接工厂(指定 ActiveMQ 服务器地址) ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 2. 创建连接并启动 Connection connection = factory.createConnection(); connection.start(); // 3. 创建会话(参数:是否事务,确认模式) Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE); // 4. 创建目的地(队列) Queue queue = session.createQueue("MyFirstQueue"); // 5. 创建生产者并发送消息 MessageProducer producer = session.createProducer(queue); TextMessage message = session.createTextMessage("Hello, ActiveMQ!"); producer.send(message); System.out.println("消息发送成功!"); // 6. 关闭资源 session.close(); connection.close(); } }代码关键步骤说明:
createConnectionFactory:配置 ActiveMQ 服务器地址(默认端口 61616)createSession(false, Session.AUTO_ACKNOWLEDGE):非事务模式 + 自动确认createQueue("MyFirstQueue"):创建名为 MyFirstQueue 的队列send(message):将消息发送到 Broker
5.3 编写消息接收者(Consumer)
java
import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class MyConsumer { public static void main(String[] args) throws JMSException { // 1-3 步与生产者相同 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE); // 4. 创建目的地(与生产者使用同一个队列名) Queue queue = session.createQueue("MyFirstQueue"); // 5. 创建消费者 MessageConsumer consumer = session.createConsumer(queue); // 6. 接收消息(同步阻塞方式) TextMessage message = (TextMessage) consumer.receive(); System.out.println("收到消息: " + message.getText()); // 7. 关闭资源 session.close(); connection.close(); } }5.4 发布/订阅模式示例
只需将上面代码中的Queue替换为Topic:
java
// 生产者 Topic topic = session.createTopic("MyFirstTopic"); MessageProducer producer = session.createProducer(topic); producer.send(message); // 消费者 Topic topic = session.createTopic("MyFirstTopic"); MessageConsumer consumer = session.createConsumer(topic);⚠️注意:Topic 模式下,消费者必须先启动并完成订阅,然后生产者发送消息,消费者才能收到。
第六章:Spring Boot 集成 ActiveMQ
在实际企业开发中,绝大多数项目都会使用 Spring Boot 框架。Spring Boot 对 ActiveMQ 提供了优秀的自动配置支持,可以大大简化开发工作。
6.1 添加 Maven 依赖
xml
<!-- ActiveMQ Starter(核心依赖) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 连接池支持(推荐,提升性能) --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
6.2 配置文件(application.yml)
yaml
spring: activemq: broker-url: tcp://localhost:61616 # ActiveMQ 服务器地址 user: admin # 用户名 password: admin # 密码 pool: enabled: true # 启用连接池 max-connections: 10 # 最大连接数 jms: pub-sub-domain: false # false=Queue模式, true=Topic模式(全局配置)
如果你的项目中既有 Queue 又有 Topic,建议设为false,然后通过代码层面分别配置。
6.3 配置 Bean(Queue 和 Topic)
java
@Configuration public class ActiveMQConfig { @Bean public Queue queue() { return new ActiveMQQueue("springboot.queue"); } @Bean public Topic topic() { return new ActiveMQTopic("springboot.topic"); } }6.4 消息生产者(使用 JmsMessagingTemplate)
JmsMessagingTemplate是 Spring 提供的消息发送工具类,能大幅简化代码。
java
@RestController @RequestMapping("/message") public class MessageController { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; // 发送队列消息 @GetMapping("/sendQueue") public String sendQueueMsg(@RequestParam String msg) { jmsMessagingTemplate.convertAndSend(queue, msg); return "队列消息发送成功: " + msg; } // 发送主题消息 @GetMapping("/sendTopic") public String sendTopicMsg(@RequestParam String msg) { jmsMessagingTemplate.convertAndSend(topic, msg); return "主题消息发送成功: " + msg; } }6.5 消息消费者(使用 @JmsListener)
java
@Component public class MessageListener { // 监听队列消息 @JmsListener(destination = "springboot.queue") public void receiveQueueMsg(String message) { System.out.println("收到队列消息: " + message); // 这里写你的业务处理逻辑 } // 监听主题消息(需要在配置中启用 pub-sub) @JmsListener(destination = "springboot.topic", containerFactory = "topicListenerContainerFactory") public void receiveTopicMsg(String message) { System.out.println("收到主题消息: " + message); } }6.6 典型应用场景
ActiveMQ 在企业应用中的三个经典场景:
| 场景 | 说明 | 示例 |
|---|---|---|
| 异步任务处理 | 解耦耗时操作 | 用户注册后发送邮件、短信验证码 |
| 系统间通信 | 跨服务数据同步 | 订单系统通知库存系统扣减库存 |
| 流量削峰 | 缓冲高并发请求 | 秒杀活动时将请求放入队列,排队处理 |
第七章:消息持久化
ActiveMQ 的消息默认是持久化的,即消息会存入磁盘,即便 Broker 崩溃重启也不会丢失。理解消息持久化,是确保系统可靠性的关键。
7.1 消息的可靠性级别
| 级别 | 说明 | 直接成本 | 性能 |
|---|---|---|---|
| 持久性消息(PERSISTENT) | ActiveMQ 默认模式,保证消息"只传送一次,成功使用一次"。可靠性是绝对优先的 | 持久化存储 I/O | 较低 |
| 非持久性消息(NON_PERSISTENT) | 最多传送一次,服务器重启后消息会丢失 | 减少存储开销 | 较高 |
7.2 三种持久化方式
① KahaDB(默认,强烈推荐)
从 ActiveMQ 5.4 开始引入,基于文件的高性能日志存储,专门为消息持久化设计。数据被追加到日志文件中,当数据不再被需要时,日志文件会被自动清理。
② JDBC 持久化(基于数据库)
将消息存入 MySQL 等关系型数据库,会创建三张表:activemq_msgs(消息存储)、activemq_acks(确认信息)、activemq_lock(分布式锁)。Queue 和 Topic 的消息都存储在activemq_msgs表中。优点是便于跨节点恢复和统一管理,但性能相对较差。
③ LevelDB 持久化(高可用)
基于 Google 开发的 LevelDB 高性能键值存储引擎,虽然已经标记为弃用(推荐 KahaDB),但在基于 ZooKeeper 的高可用集群中仍然有重要作用。
7.3 配置持久化方式
在conf/activemq.xml中指定:
xml
<!-- KahaDB 持久化(默认,无需修改) --> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> <!-- JDBC 持久化示例 --> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter>第八章:消息确认机制(ACK)
消息确认机制决定了消息何时被视为 "已投递" 或 "已消费",直接影响消息丢失和重复消费的风险。ActiveMQ 支持三种确认模式:
| 确认模式 | 说明 | 适用场景 |
|---|---|---|
| AUTO_ACKNOWLEDGE(自动确认) | 默认模式,receive()方法返回或onMessage()成功执行后,自动确认 | 对消息丢失不太敏感的场景 |
| CLIENT_ACKNOWLEDGE(客户端手动确认) | 在消费端显式调用message.acknowledge()才会确认 | 必须先确保业务处理完成再确认的场景 |
| SESSION_TRANSACTED(事务会话) | 一组消息在一个事务中进行,可批量提交或回滚 | 需要原子性发送/接收的场景 |
代码示例——手动确认模式:
java
// 创建会话时指定手动确认 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 消费端 Message message = consumer.receive(); // ... 处理业务逻辑 ... message.acknowledge(); // 业务处理成功后手动确认
第九章:高级特性速览
9.1 持久订阅(Durable Subscriber)
在 Topic 模式下,普通消费者如果离线,就会错过离线期间的消息。持久订阅则能解决这个问题:即便消费者离线,ActiveMQ 也会为其保存消息,等到它重新上线后投递。
创建持久订阅的关键代码:
java
// 设置客户端ID(订阅者唯一标识) connection.setClientID("myDurableSubscriber"); // 创建持久订阅者 MessageConsumer consumer = session.createDurableSubscriber(topic, "subscriptionName");9.2 虚拟主题(Virtual Topics)
虚拟主题是 ActiveMQ 的一个高级特性,结合了 Queue 和 Topic 的优点:既能像 Topic 一样广播消息,又能像 Queue 一样让消费者持久接收。
9.3 消息过滤与选择器
消费者可以通过选择器有选择地接收消息:
java
// 只接收 priority > 5 且 type = ' urgent ' 的消息 String selector = "priority > 5 AND type = 'urgent'"; MessageConsumer consumer = session.createConsumer(queue, selector);
9.4 死信队列(DLQ)
如果消息因为某些原因无法被正常消费(如消息格式错误),ActiveMQ 会将其转入死信队列(Dead Letter Queue),避免阻塞正常消息处理。
9.5 延迟/定时消息
ActiveMQ 支持在指定时间后投递消息,实现定时任务的效果:
java
TextMessage message = session.createTextMessage("延迟消息"); // 设置延迟 30 秒投递 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 30000); producer.send(message);第十章:集群与高可用架构
生产环境中,单台 ActiveMQ 存在单点故障风险。通过集群和主从架构,可以有效提升可用性。
10.1 主从架构(Master-Slave)
ActiveMQ 主从架构的原理是:在多个 Broker 实例之间共享持久化存储。只有一台 "主" 服务器对外提供读写服务,其余 "从" 服务器则处于待命状态等待随时接管。一旦主服务器宕机,某个从服务器会迅速获得持久化存储的锁,升级为新的主服务器,从而保障客户端能够持续正常运转。
三种主从实现方式对比:
| 方式 | 原理 | 优缺点 |
|---|---|---|
| 共享文件系统(KahaDB) | 多个 Broker 共享同一个 KahaDB 数据目录 | 简单易配,但依赖共享存储的可靠性 |
| JDBC 主从 | 多个 Broker 共享同一个数据库 | 可利用数据库高可用,但性能较低 |
| ZooKeeper + LevelDB | 通过 ZooKeeper 注册所有 Broker 并进行选举 | 自动故障转移,生产环境推荐方案 |
10.2 ZooKeeper + LevelDB 集群方案
这是目前生产环境最推荐的高可用方案,从 ActiveMQ 5.9 开始引入。
工作原理:
在多个独立的物理机或虚拟机上都分别安装并运行 ActiveMQ,这些节点通过 ZooKeeper 进行注册和协调
ZooKeeper 从各节点中选举一个作为 Master(主节点),其余作为 Slave(从节点)
Master 负责提供服务,Slave 只与 Master 同步数据且不接受客户端连接
如果 Master 宕机,ZooKeeper 自动从 Slave 中选举新的 Master
故障节点恢复后自动加入集群成为 Slave
优势:自动故障转移、数据多副本保障不丢失,而代价是需要额外维护一个 ZooKeeper 集群。
10.3 网络集群(Network of Brokers)
适用于需要负载均衡和跨 Broker 消息路由的场景。多个独立的 Broker 组成一个网络,互相转发消息,实现消息在不同 Broker 之间的自由流动。
一个实用的组合策略是:Master-Slave 负责高可用,Network of Brokers 负责负载均衡,两者搭配使用:先搭建多组主从实现高可用,再通过网络集群串联这些主从组来实现横向扩展。
第十一章:监控与运维
11.1 Web 管理控制台
ActiveMQ 自带功能全面的 Web 后台管理界面,访问地址http://localhost:8161/admin/,默认账号密码均为admin。
登录后可以实时查看:
📊 Queues 和 Topics 的消息数量、消费者数量
🗑️ 手动删除/移动消息
⚙️ Broker 运行状态和配置信息
11.2 关键性能优化参数
初学者最容易踩坑的地方在于性能配置不当,只需关注这四个参数即可解决大部分问题:
| 优化项 | 配置参数 | 作用 |
|---|---|---|
| 预取限制 | prefetchLimit | 控制消费者单次获取消息的数量,防止个别消费者 "囤积" 太多消息导致其他消费者饥饿 |
| 异步发送 | useAsyncSend=true | 生产者改为异步发送,大幅提升吞吐量 |
| 内存分配 | ACTIVEMQ_OPTS环境变量 | 根据服务器负载调整 JVM 内存大小 |
| 死信队列 | 自动启用(DLQ) | 处理无法投递的消息,避免阻塞正常队列 |
11.3 常见问题排查
| 问题 | 可能原因 | 解决思路 |
|---|---|---|
| 消息堆积 | 消费速度跟不上生产速度 | 增加消费者数量、优化消费逻辑、设置消息过期时间 |
| 连接不上 | 网络配置或防火墙 | 检查 61616 和 8161 端口是否开放 |
| 内存溢出 | JVM 内存配置过低 | 调整ACTIVEMQ_OPTS中的-Xmx参数 |
🎯 学习路线图
对于小白同学,建议按照以下顺序逐步学习:
text
第一步:理解核心概念 ├── 什么是消息队列?为什么需要它? └── JMS 规范的基础对象模型(Connection、Session、Producer/Consumer) 第二步:动手实践 ├── 安装 ActiveMQ → 访问 Web 控制台 ├── 编写纯 Java 程序收发消息(Queue 和 Topic) └── 理解 Queue 和 Topic 的核心区别 第三步:Spring Boot 集成 ├── 引入依赖和配置 ├── 使用 JmsMessagingTemplate 发送消息 └── 使用 @JmsListener 接收消息 第四步:深入进阶 ├── 消息持久化(KahaDB、JDBC) ├── 消息确认机制(AUTO / CLIENT / 事务) ├── 持久订阅和死信队列 └── 集群与高可用部署
❓ 常见面试题精选
Q1:ActiveMQ 的消息模型有哪些?有什么区别?
答:主要有点对点(Queue)和发布/订阅(Topic)两种。Queue 中一条消息只能被一个消费者消费,消息会持久保存;Topic 中一条消息会被所有订阅者收到,消费者必须在消息发送前订阅。Queue 适用于任务分发,Topic 适用于消息广播。
Q2:如何保证消息不丢失?
答:从三个环节保障——① 生产者发送时使用持久化消息(默认);② Broker 端使用持久化存储(KahaDB/JDBC)并及时 fsync 刷盘;③ 消费者端使用手动确认(CLIENT_ACKNOWLEDGE),确保消息处理完成后才确认。
Q3:ActiveMQ 如何实现高可用?
答:常用方案是 ZooKeeper + LevelDB 主从集群。ZooKeeper 负责选举 Master,LevelDB 负责数据持久化和同步。Master 对外提供服务,Slave 实时同步数据,Master 宕机时 ZooKeeper 自动选举新的 Master 实现故障转移。
Q4:如何处理消息堆积?
答:首先增加消费者数量进行分流;其次优化消费端的业务处理逻辑,缩短单条消息处理时间;此外可以设置消息过期时间(TTL),避免消息无限期积压;最后合理配置prefetchLimit预取限制,防止消息分配不均。
📖 推荐资源
ActiveMQ 官方文档:https://activemq.apache.org/
ActiveMQ Classic 下载:http://activemq.apache.org/components/classic/download/
祝学习愉快!遇到问题可以先去 Web 控制台的 Queues 页面检查消息状态,这是排查问题最直观有效的方式。