HTTP方式
首先提供一个bean给springboot,
@Configuration public class OKHttpClientConfig { @Bean public OkHttpClient httpClient() { return new OkHttpClient(); } }然后借助okHTTP发送http请求。
@Slf4j @Service public class GroupBuyNotifyService { @Resource private OkHttpClient okHttpClient; public String groupBuyNotify(String apiUrl, String notifyRequestDTOJSON) throws Exception { try { // 1. 构建参数 MediaType mediaType = MediaType.parse("application/json"); RequestBody body = RequestBody.create(mediaType, notifyRequestDTOJSON); Request request = new Request.Builder() .url(apiUrl) .post(body) .addHeader("content-type", "application/json") .build(); // 2. 调用接口 Response response = okHttpClient.newCall(request).execute(); // 3. 返回结果 return response.body().string(); } catch (Exception e) { log.error("拼团回调 HTTP 接口服务异常 {}", apiUrl, e); throw new AppException(ResponseCode.HTTP_EXCEPTION); } } }考虑到应用场景可能是集群部署,使用Redisson加锁,确保不会重复发送。
@Service public class TradePort implements ITradePort { @Resource private GroupBuyNotifyService groupBuyNotifyService; @Resource private IRedisService redisService; @Override public String groupBuyNotify(NotifyTaskEntity notifyTask) throws Exception { RLock lock = redisService.getLock(notifyTask.lockKey()); try { if (lock.tryLock(3, 0, TimeUnit.SECONDS)) { try { // 无效的 notifyUrl 则直接返回成功 if (StringUtils.isBlank(notifyTask.getNotifyUrl()) || "暂无".equals(notifyTask.getNotifyUrl())) { return NotifyTaskHTTPEnumVO.SUCCESS.getCode(); } return groupBuyNotifyService.groupBuyNotify(notifyTask.getNotifyUrl(), notifyTask.getParameterJson()); } finally { if (lock.isLocked() && lock.isHeldByCurrentThread()) { lock.unlock(); } } } return NotifyTaskHTTPEnumVO.NULL.getCode(); } catch (Exception e) { Thread.currentThread().interrupt(); return NotifyTaskHTTPEnumVO.NULL.getCode(); } } }说到集群情况,不由关联到幂等性问题,常见的做法是在数据库解决,譬如给每一条消息一个唯一标识,或者是将id设为主键,不许重复插入;在高并发的情况下也可以如上使用Redisson锁解决。
MQ方式
这里用的是Rocketmq的方式发送回调消息,首先在yaml文件中配置好
# RocketMQ rocketmq: name-server: 127.0.0.1:9876 producer: group: group_buy_producer group-buy-mq: topic: group_buy_topic tags: team_success: team_success team_refund: team_refund然后在publisher类中发送消息
@Slf4j @Component public class EventPublisher { @Autowired private RocketMQTemplate rocketMQTemplate; @Value("${group-buy-mq.topic}") private String topic; public void publish(String tag, String message) { try { // RocketMQ: topic:tag String destination = topic + ":" + tag; rocketMQTemplate.convertAndSend(destination, message); } catch (Exception e) { log.error("发送MQ消息失败 team_success message:{}", message, e); throw e; } } }然后按以下格式配一个或多个listener类,负责接收消息。
@Slf4j @Component @RocketMQMessageListener( topic = "${group-buy-mq.topic}", // Topic selectorExpression = "team_success", // Tag(相当于 routingKey) consumerGroup = "team_success_consumer_group" // 消费组 ) public class TeamSuccessTopicListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("接收消息:{}", message); } }Rocketmq和Rabbitmq的优劣对比
- RocketMQ 使用 Java 语言开发
- 顺序消息、事务消息、消息过滤、定时消息等。顺序消息、事务消息、消息过滤、定时消息,丰富的特性尽可能多地提供思路及解决方案