如何保证消息的可靠投递?
生产者的可靠投递 集群的持久化 消费者的消费确认
难点在于生产者这边,如何去保证消息的发送和数据的入库实现一致性
比如A给B转账100块
1 2 3 4 5
| public void transfer(Long uid,Long tUid,int amount) { userAccountMapper.addAmount(sUid,-amount); userAccountMapper.addAmount(tUid,ammout); mqProducer.send("topic",new AccountChangeMsg(sUid,tUid,amount)); }
|
第一如果需要发送两条mq呢?
第二条mq是在事务里面的,如果事务还没有执行成功,消费者就收到了消息,这时候回查查不到转账记录的。
解决办法:
1.消息入库,事务在提交之后 才发送消息,每条消息都不会直接发送都是直接存到数据库当中,将消息ID存入ThreadLocal当中
消息和其他的都是入库的,已经能严格保证数据库的事务了
2.消息之后的投递怎么做到的,需要重写事务管理器 ,事务提交之后就去查询TreadLocal当中待发送的消息,
发送完成一条数据库就删除一条。
3.就算消息发送失败了,采用定时任务不断地重试,来确保消息发送正确。
消息处理的一致性是非常复杂的
消息中间件三大功能:应用的解耦 削峰填谷 数据分发
04.常见消息中间件介绍_哔哩哔哩_bilibili
kafaka是一种高吞吐量的分布式订阅消息系统,通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
RabbitMQ虽然是万但是很多小公司其实都是达不到这个级别的
Broker代理服务器:负责接收生产者的请求,把消息存储到中间件当中,接受消费者的请求,把消息推送给消费者。
命名服务NameSever:类似注册中心,管理代理服务器的注册中心,管理代理服务器的IP和端口
封装成Message对象 用户ID和积分数量
topic主题:按照业务进行划分,把不同的消息放在不同的主题之下。
消息队列MessageQueue:实际消息在Topic存储的数据结构
Tag可以给消息设置标签,进行消息的过滤
这个东西不是跟着我写的网关一样吗???
rocketMQ部署
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #docker 拉取 docker pull foxiswho/rocketmq:4.8.0 #启动nameserver docker run -d -v /usr/local/rocketmq/logs:/opt/docker/rocketmq/logs \ --name rmqnamesrv \ -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \ -p 9876:9876 \ foxiswho/rocketmq:4.8.0 \ sh mqnamesrv #broker.conf brokerIP1= 120.78.190.233 namesrvAddr= 120.78.190.233:9876 brokerName=broker_all #启动broker docker run -d -v /opt/docker/rocketmq/logs:/usr/local/rocketmq/logs -v /opt/docker/rocketmq/store:/usr/local/rocketmq/store \ -v /opt/docker/rocketmq/conf:/usr/local/rocketmq/conf \ --name rmqbroker \ -e "NAMESRV_ADDR= 47.93.38.210:9876" \ -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \ -p 10911:10911 -p 10912:10912 -p 10909:10909 \ foxiswho/rocketmq:4.8.0 \ sh mqbroker -c /usr/local/rocketmq/conf/broker.conf docker run --name rmqconsole --link rmqnamesrv:rmqnamesrv \ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=120.78.190.233:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ -p 8180:8080 -t styletang/rocketmq-console-ng
|
重命名是mv指令