Spring Boot 定时任务与异步处理最佳实践
引言
定时任务和异步处理是企业级应用中常见的需求,如定时数据同步、异步消息处理、批量任务执行等。Spring Boot 提供了强大的定时任务和异步处理支持,通过简单的注解即可实现复杂的调度需求。本文将深入探讨定时任务与异步处理的最佳实践。
一、定时任务基础
1.1 启用定时任务
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Application entry point.
 *
 * <p>{@code @EnableScheduling} turns on processing of {@code @Scheduled}
 * methods declared on Spring beans.
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * Boots the Spring context with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Equivalent to the static SpringApplication.run(Application.class, args) shortcut.
        SpringApplication application = new SpringApplication(Application.class);
        application.run(args);
    }
}
1.2 基础定时任务
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * Demonstrates the four basic {@code @Scheduled} trigger styles: fixed rate,
 * fixed delay, fixed rate with an initial delay, and cron.
 */
@Slf4j
@Component
public class BasicScheduledTasks {

    /** Triggered at a fixed rate of 5000 ms. */
    @Scheduled(fixedRate = 5000)
    public void fixedRateTask() {
        logExecution("Fixed rate task executed at: {}");
    }

    /** Triggered with a fixed delay of 3000 ms between runs. */
    @Scheduled(fixedDelay = 3000)
    public void fixedDelayTask() {
        logExecution("Fixed delay task executed at: {}");
    }

    /** Waits 1000 ms before the first run, then triggers at a fixed rate of 5000 ms. */
    @Scheduled(initialDelay = 1000, fixedRate = 5000)
    public void initialDelayTask() {
        logExecution("Initial delay task executed at: {}");
    }

    /** Triggered by the cron expression {@code 0 0 2 * * ?} (every day at 02:00). */
    @Scheduled(cron = "0 0 2 * * ?")
    public void cronTask() {
        logExecution("Cron task executed at: {}");
    }

    /** Logs the given message template with the current local date-time as its argument. */
    private void logExecution(String template) {
        log.info(template, LocalDateTime.now());
    }
}
1.3 Cron 表达式详解
| 字段 | 允许值 | 允许的特殊字符 |
|---|---|---|
| 秒 | 0-59 | , - * / |
| 分 | 0-59 | , - * / |
| 时 | 0-23 | , - * / |
| 日 | 1-31 | , - * ? / L W |
| 月 | 1-12 或 JAN-DEC | , - * / |
| 周 | 0-7 或 SUN-SAT(0 和 7 均表示周日) | , - * ? / L # |
常用 Cron 表达式示例:
| 表达式 | 含义 |
|---|---|
| `0 0 2 * * ?` | 每天凌晨2点执行 |
| `0 30 18 * * ?` | 每天下午6:30执行 |
| `0 0 0 * * MON` | 每周一凌晨0点执行 |
| `0 0 0 1 * ?` | 每月1号凌晨0点执行 |
| `0 0/5 * * * ?` | 每5分钟执行一次 |
| `0 30 9-17 * * MON-FRI` | 工作日9:30到17:30每小时执行 |
二、定时任务配置
2.1 配置文件
spring:
  application:
    name: scheduling-demo

# Custom (non-Spring-Boot) keys. They are kept at the top level so they match
# the ${scheduling.pool.size} and ${scheduling.thread-name-prefix} placeholders
# read via @Value in SchedulingConfig.
scheduling:
  pool:
    size: 10
  thread-name-prefix: scheduler-

logging:
  level:
    com.example.app: DEBUG
2.2 自定义任务调度器
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * Declares the {@link TaskScheduler} bean used for scheduled tasks.
 *
 * <p>Pool size and thread-name prefix come from the {@code scheduling.pool.size}
 * and {@code scheduling.thread-name-prefix} properties, defaulting to
 * {@code 10} and {@code scheduler-}.
 */
@Slf4j // provides the `log` field used by the error handler below
@Configuration
public class SchedulingConfig {

    @Value("${scheduling.pool.size:10}")
    private int poolSize;

    @Value("${scheduling.thread-name-prefix:scheduler-}")
    private String threadNamePrefix;

    /**
     * Builds a thread-pool backed scheduler.
     *
     * @return the configured scheduler; on shutdown it waits up to 60 seconds
     *         for running tasks to complete
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(poolSize);
        scheduler.setThreadNamePrefix(threadNamePrefix);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setAwaitTerminationSeconds(60);
        // Log uncaught task failures instead of losing them.
        scheduler.setErrorHandler(throwable -> log.error("Scheduled task error", throwable));
        // No explicit initialize(): the container initializes the returned bean
        // itself, and a manual call here would build the executor twice.
        return scheduler;
    }
}
2.3 动态定时任务
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

/**
 * Registers, replaces and cancels cron-triggered tasks at runtime.
 *
 * <p>Each task is tracked by a string id mapped to the {@link ScheduledFuture}
 * returned by the {@link TaskScheduler}.
 */
@Slf4j
@Service
public class DynamicSchedulingService {

    private final TaskScheduler taskScheduler;
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();

    public DynamicSchedulingService(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
    }

    /**
     * Schedules {@code task} with the given cron expression under a new random id.
     *
     * @param cronExpression cron expression accepted by {@link CronTrigger}
     * @param task           work to run on each trigger
     * @return the id to pass to {@link #cancelTask} or {@link #updateTask}
     * @throws IllegalArgumentException if the cron expression is invalid
     * @throws IllegalStateException    if the scheduler reports that the trigger never fires
     */
    public String scheduleTask(String cronExpression, Runnable task) {
        String taskId = UUID.randomUUID().toString();
        register(taskId, cronExpression, task);
        return taskId;
    }

    /**
     * Cancels the task with the given id without interrupting a run in progress.
     * Unknown ids are ignored.
     *
     * @param taskId id returned by {@link #scheduleTask}
     */
    public void cancelTask(String taskId) {
        ScheduledFuture<?> future = scheduledTasks.remove(taskId);
        if (future != null) {
            future.cancel(false);
            log.info("Cancelled task with id: {}", taskId);
        }
    }

    /**
     * Replaces the schedule and work of an existing task while keeping its id,
     * so the caller can still cancel or update it afterwards. If the id is
     * unknown, the task is simply registered under it.
     *
     * @param taskId            id of the task to replace
     * @param newCronExpression new cron expression
     * @param newTask           new work to run
     * @throws IllegalArgumentException if the cron expression is invalid; the
     *                                  previous task is left untouched in that case
     * @throws IllegalStateException    if the scheduler reports that the trigger never fires
     */
    public void updateTask(String taskId, String newCronExpression, Runnable newTask) {
        register(taskId, newCronExpression, newTask);
    }

    /** @return number of tasks currently tracked by this service */
    public int getActiveTaskCount() {
        return scheduledTasks.size();
    }

    /**
     * Schedules the task and stores it under {@code taskId}, cancelling any
     * task previously stored under that id.
     */
    private void register(String taskId, String cronExpression, Runnable task) {
        // Constructing the trigger first means an invalid expression fails
        // before anything already scheduled is touched.
        CronTrigger trigger = new CronTrigger(cronExpression);
        ScheduledFuture<?> future = taskScheduler.schedule(task, trigger);
        if (future == null) {
            // ConcurrentHashMap rejects null values; fail with a clear message instead of an NPE.
            throw new IllegalStateException("Cron expression never fires: " + cronExpression);
        }
        ScheduledFuture<?> previous = scheduledTasks.put(taskId, future);
        if (previous != null) {
            previous.cancel(false);
        }
        log.info("Scheduled task with id: {}, cron: {}", taskId, cronExpression);
    }
}
三、异步处理基础
3.1 启用异步处理
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * Application entry point.
 *
 * <p>{@code @EnableAsync} turns on processing of {@code @Async} methods
 * declared on Spring beans.
 */
@SpringBootApplication
@EnableAsync
public class Application {

    /**
     * Boots the Spring context with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Equivalent to the static SpringApplication.run(Application.class, args) shortcut.
        SpringApplication application = new SpringApplication(Application.class);
        application.run(args);
    }
}
3.2 基础异步方法
import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.concurrent.CompletableFuture; @Slf4j @Service public class AsyncService { @Async public void asyncMethod() { log.info("Async method started at: {}", LocalDateTime.now()); // 执行耗时操作 try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } log.info("Async method completed at: {}", LocalDateTime.now()); } @Async public CompletableFuture<String> asyncWithReturnValue() { log.info("Async with return value started"); try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return CompletableFuture.completedFuture("Async result"); } @Async("customExecutor") public void asyncWithCustomExecutor() { log.info("Async with custom executor: {}", Thread.currentThread().getName()); } }3.3 异步配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the executors used by {@code @Async} methods: {@code taskExecutor},
 * configured from {@code async.*} properties, and {@code customExecutor},
 * a small fixed-configuration pool.
 */
@Slf4j // provides the `log` field used by the rejection handler below
@Configuration
@EnableAsync
public class AsyncConfig {

    @Value("${async.core-pool-size:5}")
    private int corePoolSize;

    @Value("${async.max-pool-size:10}")
    private int maxPoolSize;

    @Value("${async.queue-capacity:100}")
    private int queueCapacity;

    @Value("${async.thread-name-prefix:async-}")
    private String threadNamePrefix;

    /**
     * Property-driven executor.
     *
     * <p>When the pool and queue are both full, the rejection is logged and the
     * task runs on the submitting thread. The previous handler only logged,
     * which dropped the task, so a future returned to the caller would never
     * complete.
     *
     * @return the configured executor; on shutdown it waits up to 60 seconds
     *         for queued tasks to finish
     */
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(threadNamePrefix);

        // CallerRunsPolicy applies back-pressure; it discards only when the pool is shut down.
        RejectedExecutionHandler callerRuns = new ThreadPoolExecutor.CallerRunsPolicy();
        executor.setRejectedExecutionHandler((task, pool) -> {
            log.warn("Task rejected from executor: {}", pool);
            callerRuns.rejectedExecution(task, pool);
        });

        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        // No explicit initialize(): the container initializes the returned bean itself.
        return executor;
    }

    /**
     * Small dedicated pool (core 2, max 5, queue 50) with threads named {@code custom-*}.
     *
     * @return the executor referenced by {@code @Async("customExecutor")}
     */
    @Bean(name = "customExecutor")
    public Executor customExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("custom-");
        // No explicit initialize(): the container initializes the returned bean itself.
        return executor;
    }
}
四、异步任务组合
4.1 CompletableFuture 组合
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Shows how to run several independent lookups in parallel with
 * {@link CompletableFuture} and combine their results.
 *
 * <p>The three fetch methods simulate remote calls with a sleep and return
 * hand-built JSON strings.
 */
@Slf4j
@Service
public class AsyncCompositionService {

    /**
     * Simulates fetching user data (about 1 s).
     *
     * @param userId user to look up
     * @return future holding a JSON string with the user id and name
     */
    public CompletableFuture<String> fetchUserData(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("Fetching user data for: {}", userId);
            simulateLatency(1000);
            return "{\"userId\": \"" + userId + "\", \"name\": \"John\"}";
        });
    }

    /**
     * Simulates fetching the user's orders (about 1.5 s).
     *
     * @param userId user to look up
     * @return future holding a JSON string with the user id and order ids
     */
    public CompletableFuture<String> fetchUserOrders(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("Fetching orders for: {}", userId);
            simulateLatency(1500);
            return "{\"userId\": \"" + userId + "\", \"orders\": [1, 2, 3]}";
        });
    }

    /**
     * Simulates fetching the user's preferences (about 0.8 s).
     *
     * @param userId user to look up
     * @return future holding a JSON string with the user id and theme
     */
    public CompletableFuture<String> fetchUserPreferences(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("Fetching preferences for: {}", userId);
            simulateLatency(800);
            return "{\"userId\": \"" + userId + "\", \"theme\": \"dark\"}";
        });
    }

    /**
     * Runs the three lookups in parallel and combines them once all have finished.
     *
     * @param userId user to look up
     * @return future holding the aggregated {@link UserData}; it completes
     *         exceptionally if any lookup fails
     */
    public CompletableFuture<UserData> getUserData(String userId) {
        // Start each lookup exactly once and keep the handle. Calling the fetch
        // methods again inside the continuation would repeat all three lookups.
        CompletableFuture<String> userInfo = fetchUserData(userId);
        CompletableFuture<String> orders = fetchUserOrders(userId);
        CompletableFuture<String> preferences = fetchUserPreferences(userId);

        return CompletableFuture.allOf(userInfo, orders, preferences)
                // All three are complete here, so join() returns without blocking.
                .thenApplyAsync(ignored -> UserData.builder()
                        .userId(userId)
                        .userInfo(userInfo.join())
                        .orders(orders.join())
                        .preferences(preferences.join())
                        .build());
    }

    /** Sleeps to mimic remote latency; restores the interrupt flag if interrupted. */
    private static void simulateLatency(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Aggregated view of a user's profile, orders and preferences (raw JSON strings). */
    @lombok.Data
    @lombok.Builder
    public static class UserData {
        private String userId;
        private String userInfo;
        private String orders;
        private String preferences;
    }
}
4.2 异步异常处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/**
 * Examples of handling failures in asynchronous tasks: mapping an exception to
 * a fallback value, and retrying a flaky operation.
 */
@Slf4j
@Service
public class AsyncExceptionHandlingService {

    /** Pause between retry attempts. */
    private static final long RETRY_DELAY_MS = 1000;

    /**
     * Runs a task that always fails and converts the failure into a fallback value.
     *
     * @return future holding {@code "Error handled: Async task failed"}
     */
    @Async
    public CompletableFuture<String> taskWithException() {
        // The explicit <String> is required: without it the chained call infers
        // CompletableFuture<Object> and the method does not compile.
        return CompletableFuture.<String>supplyAsync(() -> {
            log.info("Task with exception started");
            throw new RuntimeException("Async task failed");
        }).exceptionally(ex -> {
            // supplyAsync wraps the failure in a CompletionException; report the real cause.
            Throwable cause = unwrap(ex);
            log.error("Async task error", cause);
            return "Error handled: " + cause.getMessage();
        });
    }

    /**
     * Runs a randomly failing task with up to 3 retries.
     *
     * @return future holding {@code "Success"}, or completing exceptionally
     *         once the retries are exhausted
     */
    @Async
    public CompletableFuture<String> taskWithRetry() {
        return executeWithRetry(() -> {
            log.info("Executing task");
            if (Math.random() > 0.3) {
                throw new RuntimeException("Temporary failure");
            }
            return "Success";
        }, 3);
    }

    /**
     * Runs {@code task} asynchronously, retrying after a fixed delay when it
     * throws a {@link RuntimeException}.
     *
     * <p>All attempts run in a loop on one worker thread. The earlier recursive
     * version blocked an extra pool thread per attempt through nested
     * {@code join()} calls and kept retrying after an interrupt.
     *
     * @param task       operation to run
     * @param maxRetries retries allowed after the first attempt
     * @return future holding the first successful result; it completes
     *         exceptionally with "Max retries exceeded" (cause: last failure)
     *         or if the wait between attempts is interrupted
     */
    private <T> CompletableFuture<T> executeWithRetry(Supplier<T> task, int maxRetries) {
        return CompletableFuture.supplyAsync(() -> {
            int remaining = maxRetries;
            while (true) {
                try {
                    return task.get();
                } catch (RuntimeException ex) {
                    if (remaining <= 0) {
                        throw new RuntimeException("Max retries exceeded", ex);
                    }
                    log.warn("Retry {} times remaining", remaining);
                    remaining--;
                    try {
                        Thread.sleep(RETRY_DELAY_MS);
                    } catch (InterruptedException interrupted) {
                        // Stop retrying and keep the interrupt visible to the pool.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Interrupted while waiting to retry", ex);
                    }
                }
            }
        });
    }

    /** Returns the cause of a {@link CompletionException} wrapper, or the throwable itself. */
    private static Throwable unwrap(Throwable ex) {
        if (ex instanceof CompletionException && ex.getCause() != null) {
            return ex.getCause();
        }
        return ex;
    }
}
五、定时任务进阶
5.1 分布式定时任务
import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; @Slf4j @Component public class DistributedScheduledTask { private static final String LOCK_KEY = "scheduled:task:lock"; private static final long LOCK_EXPIRE = 30000; private final StringRedisTemplate redisTemplate; public DistributedScheduledTask(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } @Scheduled(cron = "0 0/5 * * * ?") public void distributedTask() { String lockValue = LocalDateTime.now().toString(); Boolean acquired = redisTemplate.opsForValue() .setIfAbsent(LOCK_KEY, lockValue, LOCK_EXPIRE, TimeUnit.MILLISECONDS); if (Boolean.TRUE.equals(acquired)) { try { log.info("Distributed task executed at: {}", LocalDateTime.now()); executeTask(); } finally { String currentValue = redisTemplate.opsForValue().get(LOCK_KEY); if (lockValue.equals(currentValue)) { redisTemplate.delete(LOCK_KEY); } } } else { log.info("Task is running on another node"); } } private void executeTask() { // 实际任务逻辑 log.info("Executing distributed task..."); } }5.2 任务调度监控
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Scheduled;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Collects per-task execution counts and durations and logs them once a minute.
 *
 * <p>Other components report their work through {@link #track(String, Runnable)}.
 * Without such an entry point no duration was ever recorded, so the logged
 * average was always 0.
 */
@Slf4j
@Component
public class TaskMonitor {

    private static final String MONITOR_KEY = "monitor";

    private final Map<String, TaskMetrics> taskMetrics = new ConcurrentHashMap<>();

    /**
     * Runs {@code task} on the calling thread and records one execution and
     * its wall-clock duration under {@code taskName}, whether or not it throws.
     *
     * @param taskName key under which the metrics are aggregated
     * @param task     work to run and time
     */
    public void track(String taskName, Runnable task) {
        TaskMetrics metrics = taskMetrics.computeIfAbsent(taskName, k -> new TaskMetrics());
        long startNanos = System.nanoTime();
        try {
            task.run();
        } finally {
            metrics.recordExecution();
            metrics.recordDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
        }
    }

    /** Triggered every minute: counts its own run, then logs the metrics of every tracked task. */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void monitorTask() {
        TaskMetrics metrics = taskMetrics.computeIfAbsent(MONITOR_KEY, k -> new TaskMetrics());
        metrics.recordExecution();
        log.info("Task monitor - Executions: {}, Avg Duration: {}ms",
                metrics.getExecutionCount(), metrics.getAverageDuration());

        taskMetrics.forEach((name, tracked) -> {
            if (!MONITOR_KEY.equals(name)) {
                log.info("Task [{}] - Executions: {}, Avg Duration: {}ms, Last Execution: {}",
                        name, tracked.getExecutionCount(), tracked.getAverageDuration(),
                        tracked.getLastExecutionTime());
            }
        });
    }

    /** Counters for one task; updates use atomics and a volatile field. */
    @Data
    public static class TaskMetrics {
        private final AtomicLong executionCount = new AtomicLong(0);
        private final AtomicLong totalDuration = new AtomicLong(0);
        private volatile LocalDateTime lastExecutionTime;

        /** Increments the execution count and stamps the current time. */
        public void recordExecution() {
            executionCount.incrementAndGet();
            lastExecutionTime = LocalDateTime.now();
        }

        /**
         * Adds a run's duration to the running total.
         *
         * @param durationMs duration of the run in milliseconds
         */
        public void recordDuration(long durationMs) {
            totalDuration.addAndGet(durationMs);
        }

        /** @return total duration divided by execution count, or 0 when nothing has run */
        public double getAverageDuration() {
            long count = executionCount.get();
            return count > 0 ? (double) totalDuration.get() / count : 0;
        }
    }
}
5.3 任务执行超时处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.concurrent.*;

/**
 * Scheduled task that runs its work on a separate thread and abandons it if
 * it takes longer than 5 seconds.
 */
@Slf4j
@Component
public class TimeoutHandlingTask implements DisposableBean {

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * Triggered every 10 seconds. Submits the long-running work, waits up to
     * 5 seconds for it, and interrupts it on timeout.
     */
    @Scheduled(fixedRate = 10000)
    public void taskWithTimeout() {
        log.info("Task with timeout started at: {}", LocalDateTime.now());

        // Use a plain Future from submit(): its cancel(true) interrupts the
        // worker thread. CompletableFuture.cancel(true) does not, so the
        // timed-out work would keep running to the end.
        Future<?> task = executorService.submit(() -> {
            try {
                executeLongRunningTask();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Task interrupted");
            }
        });

        try {
            task.get(5, TimeUnit.SECONDS);
            log.info("Task completed successfully");
        } catch (TimeoutException e) {
            log.error("Task timed out after 5 seconds");
            task.cancel(true);
        } catch (InterruptedException e) {
            // The scheduler thread itself was interrupted: keep the flag and stop the worker.
            Thread.currentThread().interrupt();
            task.cancel(true);
            log.error("Task execution failed", e);
        } catch (ExecutionException e) {
            log.error("Task execution failed", e);
        }
    }

    /** Stops the worker thread when the bean is destroyed so it does not outlive the context. */
    @Override
    public void destroy() {
        executorService.shutdownNow();
    }

    /** Simulated 8-second job; throws if interrupted while sleeping. */
    private void executeLongRunningTask() throws InterruptedException {
        log.info("Executing long running task...");
        Thread.sleep(8000);
        log.info("Long running task completed");
    }
}
六、异步事件处理
6.1 定义事件
import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/**
 * Application event published when an order has been created.
 * Carries the order id and the id of the customer who placed it.
 */
public class OrderCreatedEvent extends ApplicationEvent {

    /** Id of the created order. */
    @Getter
    private final String orderId;

    /** Id of the customer the order belongs to. */
    @Getter
    private final String customerId;

    /**
     * @param source     object that published the event
     * @param orderId    id of the created order
     * @param customerId id of the customer the order belongs to
     */
    public OrderCreatedEvent(Object source, String orderId, String customerId) {
        super(source);
        this.customerId = customerId;
        this.orderId = orderId;
    }
}
6.2 发布事件
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

/**
 * Creates orders and announces each one by publishing an
 * {@link OrderCreatedEvent}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {

    private final ApplicationEventPublisher eventPublisher;

    /**
     * Generates an order id of the form {@code ORD-<current time millis>},
     * then publishes an {@link OrderCreatedEvent} for it.
     *
     * @param customerId id of the customer placing the order
     */
    public void createOrder(String customerId) {
        final long createdAtMillis = System.currentTimeMillis();
        final String orderId = "ORD-" + createdAtMillis;
        log.info("Creating order: {}", orderId);

        OrderCreatedEvent event = new OrderCreatedEvent(this, orderId, customerId);
        eventPublisher.publishEvent(event);

        log.info("Order created successfully: {}", orderId);
    }
}
6.3 异步事件监听
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Handles {@link OrderCreatedEvent} asynchronously: sends a notification,
 * updates inventory, and writes an audit log entry, in that order.
 */
@Slf4j
@Component
public class OrderEventListener {

    /**
     * Runs the follow-up steps for a created order. Failures are logged and
     * not rethrown.
     *
     * @param event the published order event
     */
    @Async
    @EventListener
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        log.info("Async handling order created event: {}", event.getOrderId());
        try {
            sendNotification(event.getCustomerId(), event.getOrderId());
            updateInventory(event.getOrderId());
            logOrder(event.getOrderId());
        } catch (InterruptedException e) {
            // Restore the flag so the executor can see the interruption;
            // a blanket catch (Exception) would silently clear it.
            Thread.currentThread().interrupt();
            log.error("Error handling order created event", e);
        } catch (Exception e) {
            log.error("Error handling order created event", e);
        }
    }

    /** Simulated notification step (0.5 s). */
    private void sendNotification(String customerId, String orderId) throws InterruptedException {
        log.info("Sending notification to customer: {}", customerId);
        Thread.sleep(500);
    }

    /** Simulated inventory update step (0.8 s). */
    private void updateInventory(String orderId) throws InterruptedException {
        log.info("Updating inventory for order: {}", orderId);
        Thread.sleep(800);
    }

    /** Simulated audit logging step (0.3 s). */
    private void logOrder(String orderId) throws InterruptedException {
        log.info("Logging order to audit system: {}", orderId);
        Thread.sleep(300);
    }
}
七、最佳实践
7.1 定时任务最佳实践
- 线程池配置:合理配置线程池大小,避免资源浪费
- 异常处理:每个定时任务应有独立的异常处理逻辑
- 幂等性保证:确保任务多次执行结果一致
- 分布式锁:多实例部署时使用分布式锁防止重复执行
- 监控告警:添加任务执行监控和失败告警
- 任务隔离:不同类型的任务使用不同的线程池
7.2 异步处理最佳实践
- 线程池隔离:不同类型的异步任务使用独立线程池
- 超时控制:设置合理的超时时间,避免任务无限等待
- 异常传播:正确处理异步任务的异常
- 返回值处理:使用 CompletableFuture 处理异步返回值
- 资源清理:及时清理不再需要的异步任务
- 监控指标:记录异步任务的执行时间和成功率
7.3 性能优化建议
- 合理设置线程池参数:根据任务类型和系统资源调整线程池大小
- 避免阻塞操作:异步任务中避免长时间阻塞操作
- 批量处理:对于高频任务考虑批量处理
- 缓存优化:对于重复计算使用缓存
- 异步组合:合理使用 CompletableFuture 组合多个异步任务
7.4 安全考虑
- 任务权限控制:限制定时任务的访问权限
- 敏感信息保护:避免在日志中输出敏感信息
- 输入验证:对任务参数进行严格验证
- 防止任务注入:禁止动态加载未知任务
结语
定时任务和异步处理是构建高性能、高可用应用的重要组成部分。通过合理配置线程池、实现异常处理机制、确保任务幂等性和添加监控告警,可以构建健壮的任务调度系统。在实际项目中,应根据业务需求选择合适的调度策略,并不断优化性能和可靠性。