From aa6ab982d19902490d676e30183826d211b771bd Mon Sep 17 00:00:00 2001 From: bobol Date: Thu, 23 May 2024 10:07:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0rabbitmq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-dev.yml | 16 ++ .../src/main/resources/application-prod.yml | 5 +- lzbi-common/pom.xml | 5 + .../com/lzbi/common/config/RabbitConfig.java | 172 ++++++++++++++++++ .../config/ReportExportRabbitConfig.java | 37 ++++ .../controller/DcBusiHisReportController.java | 4 +- .../LogTimesacleHistoryThreeService.java | 125 ++++++------- .../com/lzbi/rabbit/TestRabbitController.java | 26 +++ .../rabbit/consumer/IConsumerService.java | 11 ++ .../consumer/impl/ReportExportConsumer.java | 81 +++++++++ .../rabbit/producer/ReportExportProducer.java | 25 +++ .../service/DcBusiReportRecordService.java | 11 +- 12 files changed, 440 insertions(+), 78 deletions(-) create mode 100644 lzbi-common/src/main/java/com/lzbi/common/config/RabbitConfig.java create mode 100644 lzbi-common/src/main/java/com/lzbi/common/config/ReportExportRabbitConfig.java create mode 100644 lzbi-module/src/main/java/com/lzbi/rabbit/TestRabbitController.java create mode 100644 lzbi-module/src/main/java/com/lzbi/rabbit/consumer/IConsumerService.java create mode 100644 lzbi-module/src/main/java/com/lzbi/rabbit/consumer/impl/ReportExportConsumer.java create mode 100644 lzbi-module/src/main/java/com/lzbi/rabbit/producer/ReportExportProducer.java diff --git a/lzbi-admin/src/main/resources/application-dev.yml b/lzbi-admin/src/main/resources/application-dev.yml index 0886e9e..142663f 100644 --- a/lzbi-admin/src/main/resources/application-dev.yml +++ b/lzbi-admin/src/main/resources/application-dev.yml @@ -5,6 +5,22 @@ server: # redis 配置 spring: + rabbitmq: + host: 10.10.10.55 + port: 5672 + username: admin + password: admin + virtual-host: / + # 这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调 + publisher-confirm-type: correlated + # 保证交换机能把消息推送到队列中 + publisher-returns: true + # 这个配置是保证消费者会消费消息,手动确认 + listener: + simple: + acknowledge-mode: manual + template: + mandatory: true redis: # 地址 host: 10.10.10.55 diff --git a/lzbi-admin/src/main/resources/application-prod.yml b/lzbi-admin/src/main/resources/application-prod.yml index 922d6f1..bf0e9b6 100644 --- a/lzbi-admin/src/main/resources/application-prod.yml +++ b/lzbi-admin/src/main/resources/application-prod.yml @@ -185,4 +185,7 @@ patrol-server: # 消息推送 message-send: - url: ${goal-server.message-server}/message/send \ No newline at end of file + url: ${goal-server.message-server}/message/send + +# 报表文件路径 +reportFilePath: /data/luenmeilz_bi_backend/report \ No newline at end of file diff --git a/lzbi-common/pom.xml b/lzbi-common/pom.xml index dd55f9d..381a6ed 100644 --- a/lzbi-common/pom.xml +++ b/lzbi-common/pom.xml @@ -154,6 +154,11 @@ postgresql 42.2.19 + + + org.springframework.boot + spring-boot-starter-amqp + \ No newline at end of file diff --git a/lzbi-common/src/main/java/com/lzbi/common/config/RabbitConfig.java b/lzbi-common/src/main/java/com/lzbi/common/config/RabbitConfig.java new file mode 100644 index 0000000..d37673d --- /dev/null +++ b/lzbi-common/src/main/java/com/lzbi/common/config/RabbitConfig.java @@ -0,0 +1,172 @@ +package com.lzbi.common.config; + +import org.springframework.amqp.core.AcknowledgeMode; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; + +/** + * 常用的三个配置如下 + * 1---设置手动应答(acknowledge-mode: manual) + * 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调 + * publisher-confirm-type: correlated + * # 保证交换机能把消息推送到队列中 + * publisher-returns: true + * template: + * # 以下是rabbitmqTemplate配置 + * mandatory: true) + * 3---设置重试 + */ +@Configuration +public class RabbitConfig { + + @Autowired + private ConnectionFactory rabbitConnectionFactory; + + //@Bean 缓存连接池 + //public CachingConnectionFactory rabbitConnectionFactory + + @Autowired + private RabbitProperties properties; + + //这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉 + // 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称 +// @Bean +// public ConnectionFactory connectionFactory() throws Exception { +// //创建工厂类 +// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory(); +// //用户名 +// cachingConnectionFactory.setUsername("gust"); +// //密码 +// cachingConnectionFactory.setPassword("gust"); +// //rabbitMQ地址 +// cachingConnectionFactory.setHost("127.0.0.1"); +// //rabbitMQ端口 +// cachingConnectionFactory.setPort(Integer.parseInt("5672")); +// +// //设置发布消息后回调 +// cachingConnectionFactory.setPublisherReturns(true); +// //设置发布后确认类型,此处确认类型为交互 +// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); +// +// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); +// return cachingConnectionFactory; +// } + + + // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称 + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { + SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); + containerFactory.setConnectionFactory(rabbitConnectionFactory); + + // 并发消费者数量 + containerFactory.setConcurrentConsumers(1); + containerFactory.setMaxConcurrentConsumers(20); + // 预加载消息数量 -- QOS + containerFactory.setPrefetchCount(1); + // 应答模式(此处设置为手动) + containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + //消息序列化方式 + containerFactory.setMessageConverter(new Jackson2JsonMessageConverter()); + // 设置通知调用链 (这里设置的是重试机制的调用链) + containerFactory.setAdviceChain(RetryInterceptorBuilder.stateless().recoverer(new RejectAndDontRequeueRecoverer()).retryOperations(rabbitRetryTemplate()).build()); + return containerFactory; + } + + // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称 + @Bean + public RabbitTemplate rabbitTemplate() { + RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); + //默认是用jdk序列化 + //数据转换为json存入消息队列,方便可视化界面查看消息数据 + rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 + rabbitTemplate.setMandatory(true); + //此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链 + rabbitTemplate.setRetryTemplate(rabbitRetryTemplate()); + //CorrelationData correlationData, boolean b, String s + rabbitTemplate.setConfirmCallback((correlationData, b, s) -> { + System.out.println("ConfirmCallback " + "相关数据:" + correlationData); + System.out.println("ConfirmCallback " + "确认情况:" + b); + System.out.println("ConfirmCallback " + "原因:" + s); + }); + //Message message, int i, String s, String s1, String s2 + rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { + System.out.println("ReturnCallback: " + "消息:" + message); + System.out.println("ReturnCallback: " + "回应码:" + i); + System.out.println("ReturnCallback: " + "回应消息:" + s); + System.out.println("ReturnCallback: " + "交换机:" + s1); + System.out.println("ReturnCallback: " + "路由键:" + s2); + }); + + return rabbitTemplate; + } + + // 重试的Template + @Bean + public RetryTemplate rabbitRetryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + // 设置监听 调用重试处理过程 + retryTemplate.registerListener(new RetryListener() { + @Override + public boolean open(RetryContext retryContext, RetryCallback retryCallback) { + // 执行之前调用 (返回false时会终止执行) + return true; + } + + @Override + public void close(RetryContext retryContext, RetryCallback retryCallback, Throwable throwable) { + // 重试结束的时候调用 (最后一次重试 ) + System.out.println("---------------最后一次调用"); + + return; + } + + @Override + public void onError(RetryContext retryContext, RetryCallback retryCallback, Throwable throwable) { + // 异常 都会调用 + System.err.println("-----第{}次调用" + retryContext.getRetryCount()); + } + }); + retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); + retryTemplate.setRetryPolicy(retryPolicyByProperties()); + return retryTemplate; + } + + @Bean + public ExponentialBackOffPolicy backOffPolicyByProperties() { + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds(); + long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds(); + double multiplier = properties.getListener().getSimple().getRetry().getMultiplier(); + // 重试间隔 + backOffPolicy.setInitialInterval(initialInterval * 1000); + // 重试最大间隔 + backOffPolicy.setMaxInterval(maxInterval * 1000); + // 重试间隔乘法策略 + backOffPolicy.setMultiplier(multiplier); + return backOffPolicy; + } + + @Bean + public SimpleRetryPolicy retryPolicyByProperties() { + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts(); + retryPolicy.setMaxAttempts(maxAttempts); + return retryPolicy; + } +} \ No newline at end of file diff --git a/lzbi-common/src/main/java/com/lzbi/common/config/ReportExportRabbitConfig.java b/lzbi-common/src/main/java/com/lzbi/common/config/ReportExportRabbitConfig.java new file mode 100644 index 0000000..34a241d --- /dev/null +++ b/lzbi-common/src/main/java/com/lzbi/common/config/ReportExportRabbitConfig.java @@ -0,0 +1,37 @@ +package com.lzbi.common.config; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ReportExportRabbitConfig { + //创建一个名为TestDirectQueue的队列 + @Bean + public Queue ReportExportDirectQueue() { + // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 + // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable + // autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。 + // arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。 + return new Queue("ReportExportDirectQueue", true); + } + + //创建一个名为TestDirectExchange的Direct类型的交换机 + @Bean + public DirectExchange ReportExportDirectExchange() { + // durable:是否持久化,默认是false,持久化交换机。 + // autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。 + // arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机 + return new DirectExchange("ReportExportDirectExchange", true, false); + } + + //绑定交换机和队列 + @Bean + public Binding bindingDirect() { + //bind队列to交换机中with路由key(routing key) + return BindingBuilder.bind(ReportExportDirectQueue()).to(ReportExportDirectExchange()).with("123"); + } +} \ No newline at end of file diff --git a/lzbi-module/src/main/java/com/lzbi/bi/controller/DcBusiHisReportController.java b/lzbi-module/src/main/java/com/lzbi/bi/controller/DcBusiHisReportController.java index e05a5a6..ad76529 100644 --- a/lzbi-module/src/main/java/com/lzbi/bi/controller/DcBusiHisReportController.java +++ b/lzbi-module/src/main/java/com/lzbi/bi/controller/DcBusiHisReportController.java @@ -192,7 +192,7 @@ public class DcBusiHisReportController extends BaseController { @PostMapping("/export") public AjaxResult export(@Validated @RequestBody LogTimeThreeQueryParamVo queryVo) { LoginUser loginUser = SecurityUtils.getLoginUser(); - logTimesacleHistoryThreeService.getExcellData(queryVo, 2, loginUser); + logTimesacleHistoryThreeService.exportDataReport(queryVo, 2, loginUser); return AjaxResult.success(); } @@ -201,7 +201,7 @@ public class DcBusiHisReportController extends BaseController { @PostMapping("/export2") public AjaxResult export2(@Validated @RequestBody LogTimeThreeQueryParamVo queryVo) { LoginUser loginUser = SecurityUtils.getLoginUser(); - logTimesacleHistoryThreeService.getExcellData(queryVo, 1, loginUser); + logTimesacleHistoryThreeService.exportDataReport(queryVo, 1, loginUser); return AjaxResult.success(); } diff --git a/lzbi-module/src/main/java/com/lzbi/code/service/LogTimesacleHistoryThreeService.java b/lzbi-module/src/main/java/com/lzbi/code/service/LogTimesacleHistoryThreeService.java index 55d3473..3f0843d 100644 --- a/lzbi-module/src/main/java/com/lzbi/code/service/LogTimesacleHistoryThreeService.java +++ b/lzbi-module/src/main/java/com/lzbi/code/service/LogTimesacleHistoryThreeService.java @@ -88,10 +88,7 @@ public class LogTimesacleHistoryThreeService extends ServiceImpl> excelData; - // 设置头部数据 - DcDymicHeaderQueryVo dcDymicHeaderQueryVo = new DcDymicHeaderQueryVo(); - dcDymicHeaderQueryVo.setParamModels(queryVo.getQueryParamClass()); - dcDymicHeaderQueryVo.setDeviceUuids(queryVo.getDeviceUuids()); - List dcDymicReportHeaderVos = this.selectHeaderInfo(dcDymicHeaderQueryVo); - List collect = dcDymicReportHeaderVos.stream().map(DcDymicReportHeaderVo::getParamCode).collect(Collectors.toList()); - queryVo.setQueryParamCodes(collect); - List list = this.selectDetailByQuery2(queryVo); - if (type == 1) { - excelData = getMapListDevice(list, dcDymicReportHeaderVos); - } else { - excelData = getMapList(list, dcDymicReportHeaderVos); - } - toExcel(dcBusiReportRecord, excelData); - // 更新报表记录 - dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.NOT_DOWNLOAD); - dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); - } catch (IOException e) { - log.error("导出数据异常", e); - // 更新报表记录 - dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.ERROR); - dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); + saveReportRecord(fileName, loginUser); + } + + //获取excel数据 1 excel设备参数为列模式 2 excel时间为列模式 3 浏览器设备参数为列模式 + //注意 excel有最大列数显示255 ,超出列数会报错 + public List> getExcellData(DcBusiReportRecord dcBusiReportRecord, LogTimeThreeQueryParamVo queryVo, int type) { + List> excelData; + // 设置头部数据 + DcDymicHeaderQueryVo dcDymicHeaderQueryVo = new DcDymicHeaderQueryVo(); + dcDymicHeaderQueryVo.setParamModels(queryVo.getQueryParamClass()); + dcDymicHeaderQueryVo.setDeviceUuids(queryVo.getDeviceUuids()); + List dcDymicReportHeaderVos = this.selectHeaderInfo(dcDymicHeaderQueryVo); + List collect = dcDymicReportHeaderVos.stream().map(DcDymicReportHeaderVo::getParamCode).collect(Collectors.toList()); + queryVo.setQueryParamCodes(collect); + List list = this.selectDetailByQuery2(queryVo); + if (type == 1) { + excelData = getMapListDevice(list, dcDymicReportHeaderVos); + } else { + excelData = getMapList(list, dcDymicReportHeaderVos); } + return excelData; + } /** @@ -184,21 +177,6 @@ public class LogTimesacleHistoryThreeService extends ServiceImpl> mapList) throws IOException { - Path folder = Paths.get(reportFilePath); - if (!Files.exists(folder)) { - Files.createDirectories(folder); - } - OutputStream out = Files.newOutputStream(Paths.get(dcBusiReportRecord.getUrl())); - BigExcelWriter writer = ExcelUtil.getBigWriter(); - writer.write(mapList, true); - writer.flush(out, true); - // 关闭writer,释放内存 - writer.close(); - //此处记得关闭输出流 - IoUtil.close(out); - } - public List selectHeaderInfo(DcDymicHeaderQueryVo queryVo) { return baseMapper.selectHeaderInfo(queryVo); } @@ -226,36 +204,37 @@ public class LogTimesacleHistoryThreeService extends ServiceImpl weatherReportList = baseMapper.getWeatherReport(exportWeatherReq); - List> excelData; - if (CollectionUtils.isEmpty(weatherReportList)) { - excelData = new ArrayList<>(); + List weatherReportList = baseMapper.getWeatherReport(exportWeatherReq); + List> excelData; + if (CollectionUtils.isEmpty(weatherReportList)) { + excelData = new ArrayList<>(); + Map map = new HashMap<>(); + map.put("时间", null); + map.put("地区", null); + map.put("天气", null); + map.put("温度", null); + excelData.add(map); + } else { + excelData = weatherReportList.stream().map(weatherReportVO -> { Map map = new HashMap<>(); - map.put("时间", null); - map.put("地区", null); - map.put("天气", null); - map.put("温度", null); - excelData.add(map); - } else { - excelData = weatherReportList.stream().map(weatherReportVO -> { - Map map = new HashMap<>(); - map.put("时间", weatherReportVO.getTimestampString()); - map.put("地区", weatherReportVO.getCity()); - map.put("天气", weatherReportVO.getWeather()); - map.put("温度", weatherReportVO.getTemperature()); - return map; - }).collect(Collectors.toList()); - } - toExcel(dcBusiReportRecord, excelData); - // 更新报表记录 - dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.NOT_DOWNLOAD); - dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); - } catch (IOException e) { - log.error("导出数据异常", e); - // 更新报表记录 - dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.ERROR); - dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); + map.put("时间", weatherReportVO.getTimestampString()); + map.put("地区", weatherReportVO.getCity()); + map.put("天气", weatherReportVO.getWeather()); + map.put("温度", weatherReportVO.getTemperature()); + return map; + }).collect(Collectors.toList()); } +// try { +// +// toExcel(dcBusiReportRecord, excelData); +// // 更新报表记录 +// dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.NOT_DOWNLOAD); +// dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); +// } catch (IOException e) { +// log.error("导出数据异常", e); +// // 更新报表记录 +// dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.ERROR); +// dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); +// } } } diff --git a/lzbi-module/src/main/java/com/lzbi/rabbit/TestRabbitController.java b/lzbi-module/src/main/java/com/lzbi/rabbit/TestRabbitController.java new file mode 100644 index 0000000..1bc788c --- /dev/null +++ b/lzbi-module/src/main/java/com/lzbi/rabbit/TestRabbitController.java @@ -0,0 +1,26 @@ +package com.lzbi.rabbit; + +import com.lzbi.common.core.domain.AjaxResult; +import com.lzbi.rabbit.producer.ReportExportProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.atomic.AtomicLong; + +@RestController +@RequestMapping("/test/rabbit") +public class TestRabbitController { + + @Autowired + private ReportExportProducer reportExportProducer; + + private static final AtomicLong id = new AtomicLong(0L); + + @GetMapping("/sendMessage") + public AjaxResult sendDirectMessage(){ + reportExportProducer.sendMsg(id.addAndGet(1L)); + return AjaxResult.success(); + } +} \ No newline at end of file diff --git a/lzbi-module/src/main/java/com/lzbi/rabbit/consumer/IConsumerService.java b/lzbi-module/src/main/java/com/lzbi/rabbit/consumer/IConsumerService.java new file mode 100644 index 0000000..7b85a87 --- /dev/null +++ b/lzbi-module/src/main/java/com/lzbi/rabbit/consumer/IConsumerService.java @@ -0,0 +1,11 @@ +package com.lzbi.rabbit.consumer; + +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; + +public interface IConsumerService { + + void process(Object msg, Channel channel, Message message); + + void business(Object msg); +} diff --git a/lzbi-module/src/main/java/com/lzbi/rabbit/consumer/impl/ReportExportConsumer.java b/lzbi-module/src/main/java/com/lzbi/rabbit/consumer/impl/ReportExportConsumer.java new file mode 100644 index 0000000..2636615 --- /dev/null +++ b/lzbi-module/src/main/java/com/lzbi/rabbit/consumer/impl/ReportExportConsumer.java @@ -0,0 +1,81 @@ +package com.lzbi.rabbit.consumer.impl; + +import cn.hutool.core.io.IoUtil; +import cn.hutool.poi.excel.BigExcelWriter; +import cn.hutool.poi.excel.ExcelUtil; +import com.lzbi.common.constant.BizConstants; +import com.lzbi.rabbit.consumer.IConsumerService; +import com.lzbi.report.domain.DcBusiReportRecord; +import com.lzbi.report.service.DcBusiReportRecordService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; + +@Slf4j +@RabbitListener(queues = "ReportExportDirectQueue") +@Component +public class ReportExportConsumer implements IConsumerService { + + @Autowired + private DcBusiReportRecordService dcBusiReportRecordService; + + @RabbitHandler + @Override + public void process(Object msg, Channel channel, Message message) { + log.info("收到消息:{}", msg); + try { + this.business(msg); + //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。 + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void business(Object msg) { + Long reportRecordId = (Long) msg; + DcBusiReportRecord dcBusiReportRecord = dcBusiReportRecordService.selectDcBusiReportRecordById(reportRecordId); +// try { +// +// // 更新报表记录 +// dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.NOT_DOWNLOAD); +// dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); +// } catch (IOException e) { +// log.error("导出数据异常", e); +// // 更新报表记录 +// dcBusiReportRecord.setStatus(BizConstants.ReportRecordStatus.ERROR); +// dcBusiReportRecordService.updateDcBusiReportRecord(dcBusiReportRecord); +// } + } + + + +// private void toExcel(DcBusiReportRecord dcBusiReportRecord, List> mapList) throws IOException { +// Path folder = Paths.get(reportFilePath); +// if (!Files.exists(folder)) { +// Files.createDirectories(folder); +// } +// OutputStream out = Files.newOutputStream(Paths.get(dcBusiReportRecord.getUrl())); +// BigExcelWriter writer = ExcelUtil.getBigWriter(); +// writer.write(mapList, true); +// writer.flush(out, true); +// // 关闭writer,释放内存 +// writer.close(); +// //此处记得关闭输出流 +// IoUtil.close(out); +// } + +} \ No newline at end of file diff --git a/lzbi-module/src/main/java/com/lzbi/rabbit/producer/ReportExportProducer.java b/lzbi-module/src/main/java/com/lzbi/rabbit/producer/ReportExportProducer.java new file mode 100644 index 0000000..534bd8b --- /dev/null +++ b/lzbi-module/src/main/java/com/lzbi/rabbit/producer/ReportExportProducer.java @@ -0,0 +1,25 @@ +package com.lzbi.rabbit.producer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +/** + * 报表生产者 + */ +@Slf4j +@Component +public class ReportExportProducer { + + @Autowired + private RabbitTemplate rabbitTemplate; + + public void sendMsg(Long reportRecordId) { + log.info("发送消息,导出报表,报表记录id:{}", reportRecordId); + rabbitTemplate.convertAndSend("ReportExportDirectExchange", "123", reportRecordId, new CorrelationData(UUID.randomUUID().toString())); + } +} diff --git a/lzbi-module/src/main/java/com/lzbi/report/service/DcBusiReportRecordService.java b/lzbi-module/src/main/java/com/lzbi/report/service/DcBusiReportRecordService.java index 6230443..dbc5d7c 100644 --- a/lzbi-module/src/main/java/com/lzbi/report/service/DcBusiReportRecordService.java +++ b/lzbi-module/src/main/java/com/lzbi/report/service/DcBusiReportRecordService.java @@ -8,7 +8,9 @@ import java.util.List; import cn.hutool.core.lang.Snowflake; import cn.hutool.core.util.IdUtil; import com.lzbi.common.constant.BizConstants; +import com.lzbi.rabbit.producer.ReportExportProducer; import org.apache.commons.compress.utils.IOUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.lzbi.report.domain.DcBusiReportRecord; @@ -36,6 +38,9 @@ public class DcBusiReportRecordService extends ServiceImpl