如何保证消息的可靠投递?

生产者的可靠投递 集群的持久化 消费者的消费确认

难点在于生产者这边,如何去保证消息的发送和数据的入库实现一致性

比如A给B转账100块

1
2
3
4
5
public void transfer(Long uid,Long tUid,int amount) {
userAccountMapper.addAmount(sUid,-amount);
userAccountMapper.addAmount(tUid,ammout);
mqProducer.send("topic",new AccountChangeMsg(sUid,tUid,amount));
}

第一如果需要发送两条mq呢?

第二条mq是在事务里面的,如果事务还没有执行成功,消费者就收到了消息,这时候回查查不到转账记录的。

解决办法:

1.消息入库,事务在提交之后 才发送消息,每条消息都不会直接发送都是直接存到数据库当中,将消息ID存入ThreadLocal当中

消息和其他的都是入库的,已经能严格保证数据库的事务了

2.消息之后的投递怎么做到的,需要重写事务管理器 ,事务提交之后就去查询TreadLocal当中待发送的消息,

发送完成一条数据库就删除一条。

3.就算消息发送失败了,采用定时任务不断地重试,来确保消息发送正确。

消息处理的一致性是非常复杂的

消息中间件三大功能:应用的解耦 削峰填谷 数据分发

04.常见消息中间件介绍_哔哩哔哩_bilibili

kafaka是一种高吞吐量的分布式订阅消息系统,通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

RabbitMQ虽然是万但是很多小公司其实都是达不到这个级别的

Broker代理服务器:负责接收生产者的请求,把消息存储到中间件当中,接受消费者的请求,把消息推送给消费者。

命名服务NameSever:类似注册中心,管理代理服务器的注册中心,管理代理服务器的IP和端口

封装成Message对象 用户ID和积分数量

topic主题:按照业务进行划分,把不同的消息放在不同的主题之下。

消息队列MessageQueue:实际消息在Topic存储的数据结构

Tag可以给消息设置标签,进行消息的过滤

这个东西不是跟着我写的网关一样吗???

rocketMQ部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#docker 拉取
docker pull foxiswho/rocketmq:4.8.0
#启动nameserver
docker run -d -v /usr/local/rocketmq/logs:/opt/docker/rocketmq/logs \
--name rmqnamesrv \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 9876:9876 \
foxiswho/rocketmq:4.8.0 \
sh mqnamesrv
#broker.conf
brokerIP1= 120.78.190.233
namesrvAddr= 120.78.190.233:9876
brokerName=broker_all
#启动broker
docker run -d -v /opt/docker/rocketmq/logs:/usr/local/rocketmq/logs -v /opt/docker/rocketmq/store:/usr/local/rocketmq/store \
-v /opt/docker/rocketmq/conf:/usr/local/rocketmq/conf \
--name rmqbroker \
-e "NAMESRV_ADDR= 47.93.38.210:9876" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 10911:10911 -p 10912:10912 -p 10909:10909 \
foxiswho/rocketmq:4.8.0 \
sh mqbroker -c /usr/local/rocketmq/conf/broker.conf
docker run --name rmqconsole --link rmqnamesrv:rmqnamesrv \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=120.78.190.233:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8180:8080 -t styletang/rocketmq-console-ng

重命名是mv指令