bobol
6 months ago
12 changed files with 440 additions and 78 deletions
@ -0,0 +1,172 @@ |
|||||
|
package com.lzbi.common.config; |
||||
|
|
||||
|
import org.springframework.amqp.core.AcknowledgeMode; |
||||
|
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; |
||||
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; |
||||
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||
|
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; |
||||
|
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.boot.autoconfigure.amqp.RabbitProperties; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.retry.RetryCallback; |
||||
|
import org.springframework.retry.RetryContext; |
||||
|
import org.springframework.retry.RetryListener; |
||||
|
import org.springframework.retry.backoff.ExponentialBackOffPolicy; |
||||
|
import org.springframework.retry.policy.SimpleRetryPolicy; |
||||
|
import org.springframework.retry.support.RetryTemplate; |
||||
|
|
||||
|
/** |
||||
|
* 常用的三个配置如下 |
||||
|
* 1---设置手动应答(acknowledge-mode: manual) |
||||
|
* 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调 |
||||
|
* publisher-confirm-type: correlated |
||||
|
* # 保证交换机能把消息推送到队列中 |
||||
|
* publisher-returns: true |
||||
|
* template: |
||||
|
* # 以下是rabbitmqTemplate配置 |
||||
|
* mandatory: true) |
||||
|
* 3---设置重试 |
||||
|
*/ |
||||
|
@Configuration |
||||
|
public class RabbitConfig { |
||||
|
|
||||
|
@Autowired |
||||
|
private ConnectionFactory rabbitConnectionFactory; |
||||
|
|
||||
|
//@Bean 缓存连接池
|
||||
|
//public CachingConnectionFactory rabbitConnectionFactory
|
||||
|
|
||||
|
@Autowired |
||||
|
private RabbitProperties properties; |
||||
|
|
||||
|
//这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉
|
||||
|
// 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称
|
||||
|
// @Bean
|
||||
|
// public ConnectionFactory connectionFactory() throws Exception {
|
||||
|
// //创建工厂类
|
||||
|
// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
|
||||
|
// //用户名
|
||||
|
// cachingConnectionFactory.setUsername("gust");
|
||||
|
// //密码
|
||||
|
// cachingConnectionFactory.setPassword("gust");
|
||||
|
// //rabbitMQ地址
|
||||
|
// cachingConnectionFactory.setHost("127.0.0.1");
|
||||
|
// //rabbitMQ端口
|
||||
|
// cachingConnectionFactory.setPort(Integer.parseInt("5672"));
|
||||
|
//
|
||||
|
// //设置发布消息后回调
|
||||
|
// cachingConnectionFactory.setPublisherReturns(true);
|
||||
|
// //设置发布后确认类型,此处确认类型为交互
|
||||
|
// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
|
||||
|
//
|
||||
|
// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
|
||||
|
// return cachingConnectionFactory;
|
||||
|
// }
|
||||
|
|
||||
|
|
||||
|
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
|
||||
|
@Bean |
||||
|
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { |
||||
|
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); |
||||
|
containerFactory.setConnectionFactory(rabbitConnectionFactory); |
||||
|
|
||||
|
// 并发消费者数量
|
||||
|
containerFactory.setConcurrentConsumers(1); |
||||
|
containerFactory.setMaxConcurrentConsumers(20); |
||||
|
// 预加载消息数量 -- QOS
|
||||
|
containerFactory.setPrefetchCount(1); |
||||
|
// 应答模式(此处设置为手动)
|
||||
|
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); |
||||
|
//消息序列化方式
|
||||
|
containerFactory.setMessageConverter(new Jackson2JsonMessageConverter()); |
||||
|
// 设置通知调用链 (这里设置的是重试机制的调用链)
|
||||
|
containerFactory.setAdviceChain(RetryInterceptorBuilder.stateless().recoverer(new RejectAndDontRequeueRecoverer()).retryOperations(rabbitRetryTemplate()).build()); |
||||
|
return containerFactory; |
||||
|
} |
||||
|
|
||||
|
// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
|
||||
|
@Bean |
||||
|
public RabbitTemplate rabbitTemplate() { |
||||
|
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); |
||||
|
//默认是用jdk序列化
|
||||
|
//数据转换为json存入消息队列,方便可视化界面查看消息数据
|
||||
|
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); |
||||
|
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
|
||||
|
rabbitTemplate.setMandatory(true); |
||||
|
//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
|
||||
|
rabbitTemplate.setRetryTemplate(rabbitRetryTemplate()); |
||||
|
//CorrelationData correlationData, boolean b, String s
|
||||
|
rabbitTemplate.setConfirmCallback((correlationData, b, s) -> { |
||||
|
System.out.println("ConfirmCallback " + "相关数据:" + correlationData); |
||||
|
System.out.println("ConfirmCallback " + "确认情况:" + b); |
||||
|
System.out.println("ConfirmCallback " + "原因:" + s); |
||||
|
}); |
||||
|
//Message message, int i, String s, String s1, String s2
|
||||
|
rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { |
||||
|
System.out.println("ReturnCallback: " + "消息:" + message); |
||||
|
System.out.println("ReturnCallback: " + "回应码:" + i); |
||||
|
System.out.println("ReturnCallback: " + "回应消息:" + s); |
||||
|
System.out.println("ReturnCallback: " + "交换机:" + s1); |
||||
|
System.out.println("ReturnCallback: " + "路由键:" + s2); |
||||
|
}); |
||||
|
|
||||
|
return rabbitTemplate; |
||||
|
} |
||||
|
|
||||
|
// 重试的Template
|
||||
|
@Bean |
||||
|
public RetryTemplate rabbitRetryTemplate() { |
||||
|
RetryTemplate retryTemplate = new RetryTemplate(); |
||||
|
// 设置监听 调用重试处理过程
|
||||
|
retryTemplate.registerListener(new RetryListener() { |
||||
|
@Override |
||||
|
public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) { |
||||
|
// 执行之前调用 (返回false时会终止执行)
|
||||
|
return true; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { |
||||
|
// 重试结束的时候调用 (最后一次重试 )
|
||||
|
System.out.println("---------------最后一次调用"); |
||||
|
|
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { |
||||
|
// 异常 都会调用
|
||||
|
System.err.println("-----第{}次调用" + retryContext.getRetryCount()); |
||||
|
} |
||||
|
}); |
||||
|
retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); |
||||
|
retryTemplate.setRetryPolicy(retryPolicyByProperties()); |
||||
|
return retryTemplate; |
||||
|
} |
||||
|
|
||||
|
@Bean |
||||
|
public ExponentialBackOffPolicy backOffPolicyByProperties() { |
||||
|
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); |
||||
|
long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds(); |
||||
|
long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds(); |
||||
|
double multiplier = properties.getListener().getSimple().getRetry().getMultiplier(); |
||||
|
// 重试间隔
|
||||
|
backOffPolicy.setInitialInterval(initialInterval * 1000); |
||||
|
// 重试最大间隔
|
||||
|
backOffPolicy.setMaxInterval(maxInterval * 1000); |
||||
|
// 重试间隔乘法策略
|
||||
|
backOffPolicy.setMultiplier(multiplier); |
||||
|
return backOffPolicy; |
||||
|
} |
||||
|
|
||||
|
@Bean |
||||
|
public SimpleRetryPolicy retryPolicyByProperties() { |
||||
|
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); |
||||
|
int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts(); |
||||
|
retryPolicy.setMaxAttempts(maxAttempts); |
||||
|
return retryPolicy; |
||||
|
} |
||||
|
} |
@ -0,0 +1,37 @@ |
|||||
|
package com.lzbi.common.config; |
||||
|
|
||||
|
import org.springframework.amqp.core.Binding; |
||||
|
import org.springframework.amqp.core.BindingBuilder; |
||||
|
import org.springframework.amqp.core.DirectExchange; |
||||
|
import org.springframework.amqp.core.Queue; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
@Configuration |
||||
|
public class ReportExportRabbitConfig { |
||||
|
//创建一个名为TestDirectQueue的队列
|
||||
|
@Bean |
||||
|
public Queue ReportExportDirectQueue() { |
||||
|
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
|
||||
|
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
|
||||
|
// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。
|
||||
|
// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。
|
||||
|
return new Queue("ReportExportDirectQueue", true); |
||||
|
} |
||||
|
|
||||
|
//创建一个名为TestDirectExchange的Direct类型的交换机
|
||||
|
@Bean |
||||
|
public DirectExchange ReportExportDirectExchange() { |
||||
|
// durable:是否持久化,默认是false,持久化交换机。
|
||||
|
// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。
|
||||
|
// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机
|
||||
|
return new DirectExchange("ReportExportDirectExchange", true, false); |
||||
|
} |
||||
|
|
||||
|
//绑定交换机和队列
|
||||
|
@Bean |
||||
|
public Binding bindingDirect() { |
||||
|
//bind队列to交换机中with路由key(routing key)
|
||||
|
return BindingBuilder.bind(ReportExportDirectQueue()).to(ReportExportDirectExchange()).with("123"); |
||||
|
} |
||||
|
} |
@ -0,0 +1,26 @@ |
|||||
|
package com.lzbi.rabbit; |
||||
|
|
||||
|
import com.lzbi.common.core.domain.AjaxResult; |
||||
|
import com.lzbi.rabbit.producer.ReportExportProducer; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.web.bind.annotation.GetMapping; |
||||
|
import org.springframework.web.bind.annotation.RequestMapping; |
||||
|
import org.springframework.web.bind.annotation.RestController; |
||||
|
|
||||
|
import java.util.concurrent.atomic.AtomicLong; |
||||
|
|
||||
|
@RestController |
||||
|
@RequestMapping("/test/rabbit") |
||||
|
public class TestRabbitController { |
||||
|
|
||||
|
@Autowired |
||||
|
private ReportExportProducer reportExportProducer; |
||||
|
|
||||
|
private static final AtomicLong id = new AtomicLong(0L); |
||||
|
|
||||
|
@GetMapping("/sendMessage") |
||||
|
public AjaxResult sendDirectMessage(){ |
||||
|
reportExportProducer.sendMsg(id.addAndGet(1L)); |
||||
|
return AjaxResult.success(); |
||||
|
} |
||||
|
} |
@ -0,0 +1,11 @@ |
|||||
|
package com.lzbi.rabbit.consumer; |
||||
|
|
||||
|
import com.rabbitmq.client.Channel; |
||||
|
import org.springframework.amqp.core.Message; |
||||
|
|
||||
|
public interface IConsumerService { |
||||
|
|
||||
|
void process(Object msg, Channel channel, Message message); |
||||
|
|
||||
|
void business(Object msg); |
||||
|
} |
@ -0,0 +1,81 @@ |
|||||
|
package com.lzbi.rabbit.consumer.impl; |
||||
|
|
||||
|
import cn.hutool.core.io.IoUtil; |
||||
|
import cn.hutool.poi.excel.BigExcelWriter; |
||||
|
import cn.hutool.poi.excel.ExcelUtil; |
||||
|
import com.lzbi.common.constant.BizConstants; |
||||
|
import com.lzbi.rabbit.consumer.IConsumerService; |
||||
|
import com.lzbi.report.domain.DcBusiReportRecord; |
||||
|
import com.lzbi.report.service.DcBusiReportRecordService; |
||||
|
import com.rabbitmq.client.Channel; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.amqp.core.Message; |
||||
|
import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.io.OutputStream; |
||||
|
import java.nio.file.Files; |
||||
|
import java.nio.file.Path; |
||||
|
import java.nio.file.Paths; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
@Slf4j |
||||
|
@RabbitListener(queues = "ReportExportDirectQueue") |
||||
|
@Component |
||||
|
public class ReportExportConsumer implements IConsumerService { |
||||
|
|
||||
|
@Autowired |
||||
|
private DcBusiReportRecordService dcBusiReportRecordService; |
||||
|
|
||||
|
@RabbitHandler |
||||
|
@Override |
||||
|
public void process(Object msg, Channel channel, Message message) { |
||||
|
log.info("收到消息:{}", msg); |
||||
|
try { |
||||
|
this.business(msg); |
||||
|
//由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
|
||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
||||
|
} catch (IOException e) { |
||||
|
throw new RuntimeException(e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void business(Object msg) { |
||||
|
Long reportRecordId = (Long) msg; |
||||
|
DcBusiReportRecord dcBusiReportRecord = dcBusiReportRecordService.selectDcBusiReportRecordById(reportRecordId); |
||||
|
// try {
|
||||
|
//
|
||||
|
// // 更新报表记录
|
||||
|
// dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.NOT_DOWNLOAD);
|
||||
|
// dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord);
|
||||
|
// } catch (IOException e) {
|
||||
|
// log.error("导出数据异常", e);
|
||||
|
// // 更新报表记录
|
||||
|
// dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.ERROR);
|
||||
|
// dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord);
|
||||
|
// }
|
||||
|
} |
||||
|
|
||||
|
|
||||
|
|
||||
|
// private void toExcel(DcBusiReportRecord dcBusiReportRecord, List<Map<String, Object>> mapList) throws IOException {
|
||||
|
// Path folder = Paths.get(reportFilePath);
|
||||
|
// if (!Files.exists(folder)) {
|
||||
|
// Files.createDirectories(folder);
|
||||
|
// }
|
||||
|
// OutputStream out = Files.newOutputStream(Paths.get(dcBusiReportRecord.getUrl()));
|
||||
|
// BigExcelWriter writer = ExcelUtil.getBigWriter();
|
||||
|
// writer.write(mapList, true);
|
||||
|
// writer.flush(out, true);
|
||||
|
// // 关闭writer,释放内存
|
||||
|
// writer.close();
|
||||
|
// //此处记得关闭输出流
|
||||
|
// IoUtil.close(out);
|
||||
|
// }
|
||||
|
|
||||
|
} |
@ -0,0 +1,25 @@ |
|||||
|
package com.lzbi.rabbit.producer; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.amqp.rabbit.connection.CorrelationData; |
||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.UUID; |
||||
|
|
||||
|
/** |
||||
|
* 报表生产者 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class ReportExportProducer { |
||||
|
|
||||
|
@Autowired |
||||
|
private RabbitTemplate rabbitTemplate; |
||||
|
|
||||
|
public void sendMsg(Long reportRecordId) { |
||||
|
log.info("发送消息,导出报表,报表记录id:{}", reportRecordId); |
||||
|
rabbitTemplate.convertAndSend("ReportExportDirectExchange", "123", reportRecordId, new CorrelationData(UUID.randomUUID().toString())); |
||||
|
} |
||||
|
} |
Loading…
Reference in new issue