1. 重试机制、超时控制、失败兜底方案设计
我来详细讲解Spring Cloud中的重试机制、超时控制、失败兜底方案 的设计与实现。
一、整体架构设计
分层防护策略
客户端请求 → 超时控制 → 重试机制 → 熔断降级 → 失败兜底
二、超时控制方案
Feign客户端超时配置
# application.yml feign: client: config: default: # 全局默认配置 connectTimeout: 5000 # 连接超时(ms) readTimeout: 10000 # 读取超时(ms) service-provider: # 针对特定服务 connectTimeout: 3000 readTimeout: 5000 # 或使用Ribbon配置(Feign底层使用Ribbon) ribbon: ConnectTimeout: 3000 ReadTimeout: 10000 OkToRetryOnAllOperations: false # 只对GET请求重试 MaxAutoRetries: 0 # 当前服务实例重试次数 MaxAutoRetriesNextServer: 1 # 切换实例重试次数RestTemplate超时配置
@Configuration public class RestTemplateConfig { @Bean public RestTemplate restTemplate() { // 使用HttpClient连接池 PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(200); connectionManager.setDefaultMaxPerRoute(50); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(5000) .setConnectionRequestTimeout(2000) .setSocketTimeout(10000) .build(); HttpClient httpClient = HttpClientBuilder.create() .setConnectionManager(connectionManager) .setDefaultRequestConfig(requestConfig) .build(); return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient)); } }WebClient超时配置(响应式)
/**
 * Reactive WebClient on Reactor Netty: 5000ms connect timeout, 10s response timeout,
 * and 10-second read/write timeout handlers installed on every new connection.
 */
@Bean
public WebClient webClient() {
    HttpClient nettyClient = HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .responseTimeout(Duration.ofSeconds(10))
            .doOnConnected(connection -> {
                // The int constructors take seconds.
                connection.addHandlerLast(new ReadTimeoutHandler(10));
                connection.addHandlerLast(new WriteTimeoutHandler(10));
            });
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(nettyClient);
    return WebClient.builder()
            .clientConnector(connector)
            .build();
}
三、重试机制实现
Spring Retry + Feign重试
// 1. 添加依赖 // spring-retry和aspectjweaver @Configuration @EnableRetry public class RetryConfig { @Bean public Retryer feignRetryer() { // 重试间隔100ms,最大重试间隔1s,最大重试次数3次 return new Retryer.Default(100, 1000, 3); } } // 2. Feign客户端启用重试 @FeignClient( name = "service-provider", configuration = FeignRetryConfig.class ) public interface ServiceClient { @GetMapping("/api/data") @Retryable( value = {FeignException.class, TimeoutException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000, multiplier = 2) ) ResponseData getData(@RequestParam("id") String id); // 兜底方法 @Recover ResponseData recover(FeignException e, String id) { return ResponseData.fallback(id); } } // 3. 自定义重试配置 public class FeignRetryConfig { @Bean public Retryer customRetryer() { return new Retryer.Default( 100, // 重试间隔 1000, // 最大重试间隔 3 // 最大重试次数 ) { @Override public void continueOrPropagate(RetryableException e) { // 只对特定异常重试 if (e.getCause() instanceof ConnectException || e.getCause() instanceof SocketTimeoutException) { super.continueOrPropagate(e); } else { throw e; } } }; } }Resilience4j重试(推荐)
// 1. 添加依赖:resilience4j-spring-boot2, resilience4j-retry // 2. 配置 resilience4j.retry: instances: serviceA: max-attempts: 3 wait-duration: 500ms retry-exceptions: - org.springframework.web.client.ResourceAccessException - java.util.concurrent.TimeoutException ignore-exceptions: - com.example.BusinessException enable-exponential-backoff: true exponential-backoff-multiplier: 2 exponential-max-wait-duration: 5s // 3. 使用注解 @Service public class RemoteService { @Retry(name = "serviceA", fallbackMethod = "fallback") public String callServiceA() { return restTemplate.getForObject("http://service-a/api", String.class); } private String fallback(Exception e) { return "fallback-response"; } } // 4. 编程式使用 @Bean public RetryRegistry retryRegistry() { RetryConfig config = RetryConfig.custom() .maxAttempts(3) .waitDuration(Duration.ofMillis(500)) .retryOnException(e -> e instanceof TimeoutException) .retryOnResult(response -> ((Response)response).getStatus() == 500) .intervalFunction(IntervalFunction.ofExponentialBackoff(500, 2)) .build(); return RetryRegistry.of(config); }四、失败兜底方案
服务降级(Fallback)
// 1. Feign Fallback @FeignClient( name = "user-service", fallback = UserServiceFallback.class, fallbackFactory = UserServiceFallbackFactory.class ) public interface UserServiceClient { @GetMapping("/users/{id}") User getUser(@PathVariable Long id); @PostMapping("/users") User createUser(@RequestBody User user); } // 2. Fallback实现 @Component public class UserServiceFallback implements UserServiceClient { @Override public User getUser(Long id) { return User.builder() .id(id) .name("默认用户") .status("降级") .build(); } @Override public User createUser(User user) { throw new ServiceDegradeException("用户服务暂时不可用"); } } // 3. FallbackFactory(可获取异常信息) @Component public class UserServiceFallbackFactory implements FallbackFactory<UserServiceClient> { @Override public UserServiceClient create(Throwable cause) { return new UserServiceClient() { @Override public User getUser(Long id) { log.warn("用户服务降级,异常:{}", cause.getMessage()); return getDefaultUser(id); } @Override public User createUser(User user) { // 异步记录失败请求,后续补偿 asyncSaveFailedRequest(user); throw new BusinessException("服务暂时不可用,请稍后重试"); } }; } }熔断器(Circuit Breaker)
// Resilience4j熔断器配置 resilience4j.circuitbreaker: instances: userService: sliding-window-size: 10 minimum-number-of-calls: 5 permitted-number-of-calls-in-half-open-state: 3 automatic-transition-from-open-to-half-open-enabled: true wait-duration-in-open-state: 10s failure-rate-threshold: 50 event-consumer-buffer-size: 10 record-exceptions: - org.springframework.web.client.ResourceAccessException - java.util.concurrent.TimeoutException - java.io.IOException ignore-exceptions: - com.example.BusinessException // 使用 @Service public class UserService { @CircuitBreaker( name = "userService", fallbackMethod = "fallbackGetUser" ) public User getUser(Long id) { return userServiceClient.getUser(id); } public User fallbackGetUser(Long id, Exception e) { // 从本地缓存获取 return cacheService.getUserFromCache(id) .orElseGet(() -> User.defaultUser(id)); } @Bulkhead( name = "userService", type = Bulkhead.Type.SEMAPHORE, fallbackMethod = "bulkheadFallback" ) public User createUser(User user) { return userServiceClient.createUser(user); } }多级降级策略
@Component public class MultiLevelFallbackService { @Autowired private CacheService cacheService; @Autowired private LocalStorageService localStorageService; @Autowired private AsyncCompensationService compensationService; /** * 三级降级策略: * 1. 主服务 → 2. 本地缓存 → 3. 静态数据 → 4. 异步补偿 */ public User getUserWithMultiFallback(Long userId) { try { // 第一级:主服务 return userServiceClient.getUser(userId); } catch (Exception e) { log.warn("主服务调用失败,尝试本地缓存", e); try { // 第二级:本地缓存 return cacheService.getUser(userId) .orElseThrow(() -> new CacheNotFoundException("缓存未命中")); } catch (Exception cacheEx) { log.warn("缓存获取失败,尝试静态数据", cacheEx); // 第三级:静态数据/默认值 User staticUser = getStaticUser(userId); // 第四级:异步补偿(记录失败,后续重试) compensationService.recordFailedRequest( "getUser", userId, e); return staticUser; } } } /** * 智能路由:根据服务状态选择最优服务 */ public Response smartRoute(Request request) { List<ServiceEndpoint> endpoints = serviceRegistry.getHealthyEndpoints(); for (ServiceEndpoint endpoint : endpoints) { try { return callWithTimeout(endpoint, request, 2000); } catch (Exception e) { metrics.recordFailure(endpoint); continue; } } // 所有服务都失败,返回降级响应 return getDegradedResponse(request); } }五、完整配置示例
综合配置YAML
# application.yml
spring:
  cloud:
    openfeign:
      client:
        config:
          default:
            connectTimeout: 3000
            readTimeout: 10000
            loggerLevel: full
      circuitbreaker:
        enabled: true
      compression:
        request:
          enabled: true
        response:
          enabled: true

# Resilience4j配置
resilience4j:
  retry:
    instances:
      backendA:
        max-attempts: 3
        wait-duration: 500ms
        retry-exceptions:
          - org.springframework.web.client.ResourceAccessException
        # Resilience4j 没有 retry-on-result-status 属性;按响应结果(如状态码 500/502/503)重试,
        # 需通过 result-predicate 指定一个自定义 Predicate 实现类(类名为示例)
        result-predicate: com.example.RetryOnServerErrorPredicate
  circuitbreaker:
    instances:
      backendA:
        register-health-indicator: true
        sliding-window-size: 10
        minimum-number-of-calls: 5
        permitted-number-of-calls-in-half-open-state: 3
        wait-duration-in-open-state: 10s
        failure-rate-threshold: 50
  bulkhead:
    instances:
      backendA:
        max-concurrent-calls: 10
        max-wait-duration: 10ms
  timelimiter:
    instances:
      backendA:
        # 注意:同一调用若同时受 Feign readTimeout(10s)约束,这里的 5s 会先触发,读取超时实际不生效
        timeout-duration: 5s

# 线程池配置(防止雪崩)—— 仅在使用 Hystrix 时生效,与上面的 Resilience4j 方案二选一
hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 10000
      circuitBreaker:
        requestVolumeThreshold: 20
        sleepWindowInMilliseconds: 5000
      fallback:
        isolation:
          semaphore:
            maxConcurrentRequests: 50
全局异常处理
@RestControllerAdvice
public class GlobalExceptionHandler {

    /**
     * Maps Feign failures to 503 with a retry suggestion.
     * NOTE(review): this also turns downstream 4xx responses (e.g. 404) into 503; branch on
     * e.status() if callers need to tell those apart.
     */
    @ExceptionHandler(FeignException.class)
    public ResponseEntity<ErrorResponse> handleFeignException(FeignException e) {
        log.error("Feign调用异常", e);
        ErrorResponse error = ErrorResponse.builder()
                .code("SERVICE_UNAVAILABLE")
                .message("依赖服务暂时不可用")
                .suggestion("请稍后重试")
                .timestamp(System.currentTimeMillis())
                .build();
        return ResponseEntity
                .status(HttpStatus.SERVICE_UNAVAILABLE)
                .body(error);
    }

    /**
     * Open circuit. Resilience4j 1.x+ rejects calls with CallNotPermittedException; the old
     * CircuitBreakerOpenException type no longer exists. An open breaker means the dependency is
     * unavailable, so the answer is 503 + Retry-After. 429 would tell the client that IT sent
     * too many requests.
     */
    @ExceptionHandler(io.github.resilience4j.circuitbreaker.CallNotPermittedException.class)
    public ResponseEntity<ErrorResponse> handleCircuitBreakerOpen(
            io.github.resilience4j.circuitbreaker.CallNotPermittedException e) {
        ErrorResponse error = ErrorResponse.builder()
                .code("CIRCUIT_BREAKER_OPEN")
                .message("服务熔断保护中")
                .suggestion("请等待10秒后重试")
                .retryAfter(10)
                .timestamp(System.currentTimeMillis())
                .build();
        return ResponseEntity
                .status(HttpStatus.SERVICE_UNAVAILABLE)
                // Keep in sync with wait-duration-in-open-state (10s)
                .header("Retry-After", "10")
                .body(error);
    }
}
六、最佳实践建议
重试策略选择
- GET/PUT/DELETE请求:按HTTP语义是幂等的,可以重试(前提是服务端实现确实幂等)
- POST/PATCH请求:非幂等,谨慎重试,需配合幂等令牌或业务判断
- 关键业务 :有限重试+告警
- 非关键业务 :快速失败
超时设置原则
# 推荐配置
超时层级:
  1. 连接超时: 1-3s
  2. 读取超时: 5-10s
  3. 总超时: 10-30s(应不小于"单次读取超时 × 最大尝试次数 + 各次重试退避时间之和",否则外层超时先触发,内层重试形同虚设)
  4. 熔断超时: 单独设置
监控与告警
@Component
public class ResilienceMetrics {

    private final MeterRegistry meterRegistry;

    /**
     * Subscribes to the Resilience4j event publishers of every existing and future Retry and
     * CircuitBreaker instance. Resilience4j events are not Spring ApplicationEvents, so
     * @EventListener methods taking them are never invoked.
     */
    public ResilienceMetrics(MeterRegistry meterRegistry,
                             RetryRegistry retryRegistry,
                             io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry circuitBreakerRegistry) {
        this.meterRegistry = meterRegistry;
        retryRegistry.getAllRetries().forEach(this::watchRetry);
        retryRegistry.getEventPublisher()
                .onEntryAdded(event -> watchRetry(event.getAddedEntry()));
        circuitBreakerRegistry.getAllCircuitBreakers().forEach(this::watchCircuitBreaker);
        circuitBreakerRegistry.getEventPublisher()
                .onEntryAdded(event -> watchCircuitBreaker(event.getAddedEntry()));
    }

    private void watchRetry(io.github.resilience4j.retry.Retry retry) {
        retry.getEventPublisher().onRetry(this::onRetryEvent);
    }

    private void watchCircuitBreaker(CircuitBreaker circuitBreaker) {
        circuitBreaker.getEventPublisher().onStateTransition(this::onCircuitBreakerEvent);
    }

    /** Counts each retry attempt, tagged with the Retry instance name. */
    public void onRetryEvent(RetryOnRetryEvent event) {
        meterRegistry.counter("retry.attempts", "name", event.getName())
                .increment();
    }

    /** Raises an alert when a circuit breaker transitions to OPEN. */
    public void onCircuitBreakerEvent(CircuitBreakerOnStateTransitionEvent event) {
        // alertService: alerting bean, not declared in this snippet
        if (event.getStateTransition().getToState() == CircuitBreaker.State.OPEN) {
            alertService.sendAlert("熔断器开启: " + event.getCircuitBreakerName());
        }
    }
}
配置管理建议
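# 不同环境不同配置(Spring Boot 2.4+ 使用 spring.config.activate.on-profile,旧的 spring.profiles 写法已废弃)
---
spring:
  config:
    activate:
      on-profile: dev
resilience4j.retry.instances.backendA.max-attempts: 5
---
spring:
  config:
    activate:
      on-profile: prod
resilience4j.retry.instances.backendA.max-attempts: 3 # 生产环境减少重试
七、常见问题解决方案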
重试导致的重复提交
// 使用幂等令牌 @PostMapping("/orders") public Order createOrder(@RequestBody OrderRequest request, @RequestHeader("Idempotency-Key") String idempotencyKey) { // 检查是否已处理 if (idempotencyService.isProcessed(idempotencyKey)) { return idempotencyService.getResult(idempotencyKey); } // 处理请求并保存结果 Order order = orderService.create(request); idempotencyService.saveResult(idempotencyKey, order); return order; }级联故障预防
// 使用Bulkhead隔离 @Bulkhead(name = "paymentService", type = Bulkhead.Type.THREADPOOL) @CircuitBreaker(name = "paymentService") @Retry(name = "paymentService") @TimeLimiter(name = "paymentService") public CompletableFuture<PaymentResult> processPayment(PaymentRequest request) { return CompletableFuture.supplyAsync(() -> paymentClient.process(request)); }这样的设计可以提供:
- 弹性 :系统能够从故障中恢复
- 稳定性 :避免级联故障
- 可观测性 :完善的监控和告警
- 用户体验 :优雅的降级和友好的错误提示
2. 服务隔离:线程池隔离、信号量隔离实战
Spring Cloud 服务隔离主要通过 Hystrix 实现(虽然目前已进入维护模式,但很多项目仍在用,且思想通用)。下面我将详细讲解 线程池隔离 和 信号量隔离 的原理、区别与实战代码。
一、隔离机制对比
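(原表格在复制时丢失,以下根据下文内容重建)

| 对比项 | 线程池隔离 | 信号量隔离 |
|---|---|---|
| 执行线程 | 在独立线程池中执行,与调用线程分离 | 在调用线程中执行 |
| 超时控制 | 支持,超时后调用方可直接返回 | 本身不控制超时,需依赖外部超时机制(如Feign/OkHttp超时) |
| 资源开销 | 较高(线程切换、每个依赖一套线程池) | 低(仅并发计数) |
| 适用场景 | 第三方HTTP调用、数据库查询等可能长时间阻塞的操作 | 内存缓存访问、本地计算等快速、无网络IO的操作 |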
二、线程池隔离实战
添加依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>启动类开启Hystrix
@SpringBootApplication @EnableCircuitBreaker // 或 @EnableHystrix public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }使用线程池隔离
@Service
public class OrderService {

    /**
     * 1. Loads an order under THREAD isolation on the dedicated "orderThreadPool":
     * 10 core threads and a queue of 20 that starts rejecting at 15 queued tasks, a 3000ms
     * execution timeout, and circuit evaluation after 10 requests. Falls back to
     * getOrderFallback.
     */
    @HystrixCommand(
            commandKey = "getOrderById",
            groupKey = "OrderService",
            threadPoolKey = "orderThreadPool",
            fallbackMethod = "getOrderFallback",
            threadPoolProperties = {
                    @HystrixProperty(name = "coreSize", value = "10"),
                    @HystrixProperty(name = "maxQueueSize", value = "20"),
                    @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15")
            },
            commandProperties = {
                    @HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"),
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "3000"),
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10")
            }
    )
    public Order getOrderById(Long id) {
        // Simulated remote call
        String orderUrl = "http://order-service/orders/" + id;
        return restTemplate.getForObject(orderUrl, Order.class);
    }

    /** 2. Fallback: same parameters and return type as the command method. */
    public Order getOrderFallback(Long id) {
        return Order.builder().id(id).name("默认订单").status("服务降级").build();
    }
}
线程池配置详解
hystrix:
  command:
    default:
      execution:
        isolation:
          strategy: THREAD # 全局默认线程隔离
          thread:
            timeoutInMilliseconds: 5000
  threadpool:
    default:
      coreSize: 20
      maximumSize: 30
      allowMaximumSizeToDivergeFromCoreSize: true
      maxQueueSize: -1 # -1表示使用SynchronousQueue(不排队);正数表示使用该容量的LinkedBlockingQueue
      queueSizeRejectionThreshold: 5 # 仅在maxQueueSize为正数时生效;maxQueueSize=-1时此项被忽略
三、信号量隔离实战
信号量隔离配置
@Service
public class CacheService {

    /**
     * Reads the in-process cache under SEMAPHORE isolation. The call stays on the caller's
     * thread, with at most 100 concurrent executions and 50 concurrent fallbacks.
     * Semaphore isolation has no thread-level timeout, so timeouts must come from elsewhere.
     */
    @HystrixCommand(
            commandKey = "getFromLocalCache",
            fallbackMethod = "cacheFallback",
            commandProperties = {
                    @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"),
                    @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "100"),
                    @HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "50")
            }
    )
    public String getFromLocalCache(String key) {
        // Local memory access: fast, no network IO
        String cachedValue = localCache.get(key);
        return cachedValue;
    }

    /** Fallback: a fixed placeholder value. */
    public String cacheFallback(String key) {
        return "cache-fallback-value";
    }
}
信号量适用场景示例
@Component public class FastLocalService { // 高频、快速的内存操作 @HystrixCommand( commandProperties = { @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"), @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "500") } ) public Boolean validateToken(String token) { // 本地JWT验证,无网络IO return jwtUtil.validate(token); } // 聚合多个快速本地操作 @HystrixCommand( commandProperties = { @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"), @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "200") } ) public UserInfo getCurrentUser() { String token = getTokenFromContext(); Boolean valid = validateToken(token); if (!valid) throw new RuntimeException("Invalid token"); return getUserFromThreadLocal(); } }四、高级配置与监控
自定义线程池
@Configuration public class HystrixConfig { @Bean public HystrixCommandAspect hystrixAspect() { return new HystrixCommandAspect(); } // 通过代码配置 @PostConstruct public void init() { HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD) .withExecutionTimeoutInMilliseconds(3000); HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter() .withCoreSize(15) .withMaximumSize(30) .withAllowMaximumSizeToDivergeFromCoreSize(true); } }Hystrix Dashboard监控
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> </dependency>@EnableHystrixDashboard @EnableTurbine // 聚合多个实例 public class MonitorApplication { // 访问 http://localhost:port/hystrix }五、Resilience4j替代方案(推荐)
由于Hystrix停止更新,建议使用Resilience4j:
依赖
<dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-boot2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>线程池隔离(Bulkhead模式)
resilience4j:
  thread-pool-bulkhead:
    instances:
      orderService:
        maxThreadPoolSize: 10
        coreThreadPoolSize: 5
        queueCapacity: 20

@Service
public class OrderServiceV2 {
    /**
     * Loads an order on the "orderService" thread-pool bulkhead, bounded by the
     * "orderService" time limiter.
     */
    @Bulkhead(name = "orderService", type = Bulkhead.Type.THREADPOOL)
    @TimeLimiter(name = "orderService")
    public CompletableFuture<Order> getOrderAsync(Long id) {
        // This method already runs on the bulkhead's own thread. supplyAsync() without an
        // executor would move the blocking HTTP call to ForkJoinPool.commonPool and bypass
        // the bulkhead's isolation.
        return CompletableFuture.completedFuture(
                restTemplate.getForObject("http://order-service/orders/" + id, Order.class));
    }
}
信号量隔离
resilience4j: semaphore-bulkhead: instances: cacheService: maxConcurrentCalls: 100 maxWaitDuration: 10ms@Bulkhead(name = "cacheService", type = Bulkhead.Type.SEMAPHORE) public String getFromCache(String key) { return cache.get(key); }六、选择建议
- 使用线程池隔离 :
- 调用第三方HTTP服务
- 数据库查询(特别是慢查询)
- 可能长时间阻塞的操作
- 使用信号量隔离 :
- 内存缓存访问
- 快速本地计算
- 内部方法调用(无网络IO)
- 最佳实践 :
- 为不同的依赖服务配置不同的线程池
- 设置合理的超时时间(略大于P99响应时间)
- 配合熔断器使用(失败率阈值、滑动窗口)
- 监控线程池队列堆积情况
七、常见问题
Q1:线程池隔离为什么能防止雪崩?
A:每个服务使用独立线程池,即使某个服务线程池耗尽,也不会影响其他服务的线程资源。
Q2:信号量隔离如何控制超时?
A:信号量隔离本身不控制超时,需要结合外部超时机制(如Feign/OKHttp超时)。
Q3:如何确定线程池大小?
公式参考:线程数 ≈ 请求到达率(QPS)× 响应时间(秒)× 冗余系数(即 Little 定律 $L = \lambda W$ 再乘以冗余系数)。例如:QPS=100,平均响应时间=50ms,则需 100×0.05=5 个线程,取冗余系数2可设为10;响应时间波动较大时,建议用P99响应时间代替平均值来估算。
以上是Spring Cloud服务隔离的完整实战方案。在实际项目中,建议根据具体场景选择合适的隔离策略,并通过监控不断调整参数。
3. 网关+服务双层限流架构设计
我来为你详细设计一个Spring Cloud网关+服务双层限流架构。
一、架构设计概览
客户端请求
   ↓
[API网关层限流] ← 基于IP/用户/路径的全局限流
   ↓
[路由转发]
   ↓
[服务层限流] ← 基于业务逻辑的细粒度限流
   ↓
业务服务处理
二、网关层限流设计
使用Spring Cloud Gateway(RequestRateLimiter)与Sentinel实现网关限流
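# application.yml
spring:
  cloud:
    gateway:
      routes:
        - id: user-service
          uri: lb://user-service
          predicates:
            - Path=/api/user/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10 # 每秒令牌数
                redis-rate-limiter.burstCapacity: 20 # 令牌桶容量
                redis-rate-limiter.requestedTokens: 1 # 每次请求消耗令牌数
                # 限流维度:需自行定义名为 ipKeyResolver 的 KeyResolver Bean;
                # 未配置时默认按登录用户(Principal)限流,匿名请求会被直接拒绝
                key-resolver: "#{@ipKeyResolver}"
# Sentinel 不能作为路由 filters 配置:引入 spring-cloud-alibaba-sentinel-gateway 后,它以全局过滤器形式生效,
# 规则通过 GatewayRuleManager(见下文代码)或 spring.cloud.sentinel.datasource(rule-type: gw-flow)加载。
# 原示例对应的规则:以路由ID(user-service)为资源,grade=1(0-线程数,1-QPS),count=100(阈值),limitApp=default
自定义网关过滤器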
@Component
public class GatewayRateLimitFilter implements GlobalFilter, Ordered {

    /** Maximum requests per client IP and path within one window. */
    private static final long MAX_REQUESTS_PER_WINDOW = 100;
    /** Window length in seconds. This is a fixed window, not a sliding one. */
    private static final long WINDOW_SECONDS = 1;

    // INCR + EXPIRE in one atomic script. As two separate calls, a crash in between would
    // leave a counter with no TTL and block that IP/path permanently.
    private static final org.springframework.data.redis.core.script.RedisScript<Long> INCR_WITH_TTL =
            org.springframework.data.redis.core.script.RedisScript.of(
                    "local current = redis.call('INCR', KEYS[1]) "
                            + "if current == 1 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end "
                            + "return current",
                    Long.class);

    // Reactive client: a blocking RedisTemplate call here would stall a Netty event-loop thread.
    private final org.springframework.data.redis.core.ReactiveStringRedisTemplate redisTemplate;

    public GatewayRateLimitFilter(
            org.springframework.data.redis.core.ReactiveStringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Counts requests per client IP + path in a fixed 1-second window and answers 429 once
     * the count exceeds 100.
     * NOTE(review): paths that contain IDs (e.g. /api/user/123) create one key per ID.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String path = exchange.getRequest().getPath().value();
        String key = "gateway:rate:" + clientIp(exchange) + ":" + path;
        return redisTemplate.execute(INCR_WITH_TTL,
                        Collections.singletonList(key),
                        Collections.singletonList(String.valueOf(WINDOW_SECONDS)))
                .next()
                .defaultIfEmpty(0L)
                .flatMap(current -> {
                    if (current > MAX_REQUESTS_PER_WINDOW) {
                        exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                        return exchange.getResponse().setComplete();
                    }
                    return chain.filter(exchange);
                });
    }

    /** Client IP, or "unknown" when the remote address is unavailable (it may be null). */
    private static String clientIp(ServerWebExchange exchange) {
        java.net.InetSocketAddress remote = exchange.getRequest().getRemoteAddress();
        if (remote == null || remote.getAddress() == null) {
            return "unknown";
        }
        return remote.getAddress().getHostAddress();
    }

    /**
     * Required by Ordered; the original omitted it. Runs ahead of other global filters;
     * lower the precedence if authentication must run first.
     */
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
基于Sentinel的网关规则配置
@Configuration public class GatewaySentinelConfig { @PostConstruct public void initGatewayRules() { Set<GatewayFlowRule> rules = new HashSet<>(); // API维度限流 rules.add(new GatewayFlowRule("user_api") .setCount(1000) .setIntervalSec(1) .setBurst(2000)); // 路由维度限流 rules.add(new GatewayFlowRule("user-service") .setResourceMode(SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID) .setCount(500)); // API分组限流 rules.add(new GatewayFlowRule("api_group") .setResourceMode(SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME) .setCount(2000)); GatewayRuleManager.loadRules(rules); } }三、服务层限流设计
使用Resilience4j实现服务限流
# 服务配置
resilience4j:
  ratelimiter:
    instances:
      userService:
        limit-for-period: 50 # 周期内允许的请求数
        limit-refresh-period: 1s # 刷新周期
        timeout-duration: 0 # 等待令牌的超时时间(0表示拿不到令牌立即失败)
        register-health-indicator: true # 注册健康检查指标
服务层AOP限流
@Aspect
@Component
@Slf4j
public class ServiceRateLimitAspect {

    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;

    /**
     * Enforces @RateLimit per method and per current user, failing fast when no permit is free.
     * NOTE(review): one registry entry is created per (method, user) and never removed, so the
     * registry grows with the number of distinct users. Bound or evict it for large user bases.
     */
    @Around("@annotation(rateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
        String methodName = joinPoint.getSignature().toShortString();
        String key = "service:rate:" + methodName + ":" + getCurrentUserId();
        RateLimiter limiter = rateLimiterRegistry.rateLimiter(key, () -> RateLimiterConfig.custom()
                .limitForPeriod(rateLimit.value())
                .limitRefreshPeriod(Duration.ofSeconds(rateLimit.timeUnit()))
                // Fail fast. The default timeoutDuration is 5s, which would park the request
                // thread for up to 5s waiting for a permit instead of rejecting it.
                .timeoutDuration(Duration.ZERO)
                .build());
        // Acquire a permit
        if (!limiter.acquirePermission()) {
            throw new RateLimitException("服务繁忙,请稍后重试");
        }
        return joinPoint.proceed();
    }
}

// Custom annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
    int value() default 100; // permits per period
    int timeUnit() default 1; // period length in seconds
}
基于Guava的本地限流
@Service public class LocalRateLimitService { private final LoadingCache<String, RateLimiter> limiters = CacheBuilder.newBuilder() .maximumSize(1000) .expireAfterAccess(1, TimeUnit.HOURS) .build(new CacheLoader<String, RateLimiter>() { @Override public RateLimiter load(String key) { return RateLimiter.create(100); // 每秒100个请求 } }); public boolean tryAcquire(String key) { RateLimiter limiter = limiters.getUnchecked(key); return limiter.tryAcquire(); } // 动态调整限流策略 public void updateRate(String key, double qps) { limiters.put(key, RateLimiter.create(qps)); } }四、分布式限流方案
Redis + Lua分布式限流
-- rate_limit.lua local key = KEYS[1] -- 限流key local limit = tonumber(ARGV[1]) -- 限流大小 local window = tonumber(ARGV[2]) -- 时间窗口(秒) local current = redis.call('GET', key) if current == false then redis.call('SET', key, 1) redis.call('EXPIRE', key, window) return 1 else if tonumber(current) < limit then redis.call('INCR', key) return 1 else return 0 end end@Service public class DistributedRateLimiter { @Autowired private StringRedisTemplate redisTemplate; private static final String LUA_SCRIPT = "..."; // 上面的Lua脚本 public boolean tryAcquire(String key, int limit, int window) { List<String> keys = Collections.singletonList(key); Object result = redisTemplate.execute( new DefaultRedisScript<>(LUA_SCRIPT, Long.class), keys, String.valueOf(limit), String.valueOf(window) ); return "1".equals(String.valueOf(result)); } }滑动窗口限流算法
@Component public class SlidingWindowRateLimiter { // 使用Redis ZSet实现滑动窗口 public boolean isAllowed(String key, int maxRequests, long windowInSeconds) { long currentTime = System.currentTimeMillis(); long windowStart = currentTime - windowInSeconds * 1000; // 移除时间窗口外的请求 redisTemplate.opsForZSet().removeRangeByScore(key, 0, windowStart); // 获取当前窗口内的请求数 Long count = redisTemplate.opsForZSet().count(key, windowStart, currentTime); if (count < maxRequests) { // 添加当前请求 redisTemplate.opsForZSet().add(key, UUID.randomUUID().toString(), currentTime); redisTemplate.expire(key, windowInSeconds + 1, TimeUnit.SECONDS); return true; } return false; } }五、分层限流策略配置
限流规则配置中心
# Nacos配置中心 rate-limit: gateway: global: # 全局配置 qps: 10000 burst: 20000 api-groups: user-api: path-pattern: /api/user/** qps: 1000 strategy: IP # IP/TOKEN/PARAM order-api: path-pattern: /api/order/** qps: 500 strategy: USER_ID service: user-service: methods: getUserInfo: qps: 100 strategy: USER_ID updateUser: qps: 50 strategy: IP order-service: methods: createOrder: qps: 200 strategy: USER_ID fallback: "order.create.fallback"动态规则管理
/**
 * Admin endpoints for changing rate-limit rules at runtime and reading limiter metrics.
 * NOTE(review): no authentication/authorization is visible here. These endpoints change
 * traffic limits and presumably must be restricted to operators; confirm.
 */
@RestController
@RequestMapping("/rate-limit")
public class RateLimitController {

    /** Applies a rule to the gateway layer and the service layer, then publishes a change event. */
    @PostMapping("/update-rule")
    public void updateRule(@RequestBody RateLimitRule rule) {
        // Update the gateway rule
        updateGatewayRule(rule);
        // Update the service rule
        updateServiceRule(rule);
        // Intended as a cluster-wide broadcast. NOTE(review): if eventPublisher is Spring's
        // ApplicationEventPublisher, the event only reaches listeners in this JVM; other
        // instances would need a config-center push or a message bus. Confirm.
        eventPublisher.publishEvent(new RateLimitRuleChangeEvent(rule));
    }

    /** Returns blocked counts for the gateway and service layers plus the current QPS. */
    @GetMapping("/metrics")
    public RateLimitMetrics getMetrics() {
        // Collect limiter metrics from each layer
        return RateLimitMetrics.builder()
                .gatewayBlocked(getGatewayBlockedCount())
                .serviceBlocked(getServiceBlockedCount())
                .currentQps(getCurrentQps())
                .build();
    }
}
六、监控与告警
Prometheus监控指标
# application.yml
management:
  endpoints:
    web:
      exposure:
        include: prometheus,health,metrics
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: ${spring.application.name}

@Component
public class RateLimitMetricsCollector {

    // Exported to Prometheus as rate_limit_gateway_blocked_total / rate_limit_service_blocked_total.
    private final Counter gatewayBlockedCounter;
    private final Counter serviceBlockedCounter;

    /**
     * Registers the counters. The MeterRegistry is constructor-injected: with field injection it
     * would still be null while this constructor runs, and register(meterRegistry) would fail.
     */
    public RateLimitMetricsCollector(MeterRegistry meterRegistry) {
        gatewayBlockedCounter = Counter.builder("rate_limit.gateway.blocked")
                .description("网关层被限流的请求数")
                .register(meterRegistry);
        serviceBlockedCounter = Counter.builder("rate_limit.service.blocked")
                .description("服务层被限流的请求数")
                .register(meterRegistry);
    }

    /** Counts one request rejected by the gateway layer. */
    public void recordGatewayBlock() {
        gatewayBlockedCounter.increment();
    }

    /** Counts one request rejected by the service layer. */
    public void recordServiceBlock() {
        serviceBlockedCounter.increment();
    }
}
告警规则配置
# alert-rules.yml groups: - name: rate-limit-alerts rules: - alert: HighGatewayBlockRate expr: rate(rate_limit_gateway_blocked_total[5m]) > 10 for: 2m labels: severity: warning annotations: summary: "网关限流触发频繁" - alert: ServiceRateLimitTriggered expr: rate(rate_limit_service_blocked_total[5m]) > 50 for: 1m labels: severity: critical七、最佳实践建议
- 分层策略 :
- 网关层:粗粒度限流,防恶意攻击
- 服务层:细粒度限流,保护业务逻辑
- 限流维度 :
- IP地址
- 用户ID
- API路径
- 业务参数组合
- 动态调整 :
- 根据业务高峰自动调整
- 基于历史数据预测调整
- 支持热更新配置
- 降级策略 :
- 快速失败返回
- 排队等待
- 服务降级
- 返回缓存数据
- 监控重点 :
- 限流触发频率
- 系统负载情况
- 业务影响范围
- 用户体验指标
这个双层限流架构既能防止外部恶意请求,又能保护内部服务不被过载,实现全方位的流量控制。