900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > Java架构学习(三十三)ActivityMQ基础消息中间件概述异步与同步MQ作用MQ件通讯方式MQ

Java架构学习(三十三)ActivityMQ基础消息中间件概述异步与同步MQ作用MQ件通讯方式MQ

时间:2022-11-03 22:20:44

相关推荐

Java架构学习(三十三)ActivityMQ基础消息中间件概述异步与同步MQ作用MQ件通讯方式MQ

ActivityMQ基础

一、消息中间件概述

什么是消息中间件?与传统的传输通讯有什么区别?答:异步,无需等待,消息存放在队列里面。调用别人的接口时候,返回的是同步还是异步?答:发送请求/响应是同步的。

什么是同步?什么是异步?

当A项目调用B接口时,B接口有延迟,会产生什么场景?答:1、A会一直等待,等待B影响给我,http有设置超时时间。2、同步接口中,如果网络延迟,可能会产生重复提交接口产生的重复提交如何解决?采用token(令牌)验证。防止机器模拟请求采用 验证码的方式验证、A调用B如果B没有及时响应。A项目默认有3次重试补偿机制。将该信息存放在日志表(补偿表)中,再使用定时job每天晚上健康检查数据,可以手动补偿B责任:处理幂等性问题。

请求与响应 = 同步请求方式同步请求方式:缺点:1、会阻塞、超时、还会发生重复提交导致数据不一致。

二、为什么要使用消息中间件

为什么要使用消息中间件?答:消息中间件可以解决高并发、两种通讯方式:点对点通讯、发布订阅、异步通讯(无需等待)

生产者:发送消息,提供接口的,主要向队列中发送消息队列:存放消息地址消息:发送的报文信息消费者:调用接口的,主要从队列中获取消息1、第一种通讯方式(消息模型) 点对点通讯(队列)生产者 队列 消费者 关系

消息中间件队列的作用:生产者:主要向队列发送消息。消费者:主要从队列中获取消息步骤:1、生产者向队列进行发送消息,如果消费者不在,队列就会将消息缓存2、消费者向队列中获取到消息之后,消费成功之后,该消息队列直接被清除掉。(不清除就会产生重复消费问题)

生产者向队列中生产高并发流量,消费者会不会挂掉?答:不会,因为队列会缓存消息。为什么MQ能够解决高并发问题?答:不会立马处理那么多的消息,队列会进行缓存,进行排队。

队列可以做持久化的

三、消息中间件通讯方式

JMS:java发送消息,客户端与服务器进行通讯的方式,可以理解成java的消息中间件消息模型:1、点对点通讯方式:流程:生产者 队列 消费者2、发布订阅流程:生产者 主题 消费者第一种通讯方式:点对点通讯方式: 一对一 异步通讯,生产者生产的消息只允许有一个消费者进行消费。

第二种通讯方式:发布订阅 一个生产者 可以多个消费者 一对多

四、消息中间件应用场景

消息中间件应用场景:1、点对点进行通讯:一对一。没有使用消息中间件流程:

使用消息中间件后流程:

点对点通讯,程序效率提高

2、发布订阅:一对多。应用:推送,但是一般不会做,都会使用第三方推送。如:极光推送、融云、环信

五、ActivityMQ 安装

MQ种类:RabbitMQ、Redis(也可以做发布订阅)、ZeroMQ、Kafka、RocketMQ、OnesMQ(阿里收费)kafka主要是存放日志的,

六、使用ActivityMQ发送消息

导入依赖:

<!-- 引入activityMQ依赖包 --><dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version></dependency></dependencies>

生产者代码:ProducerTest.java

package com.leeue.activemq.test01;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/*** @classDesc: 功能描述:(这是一个生产者 使用activityMQ)* @author:<a href="leeue@">李月</a>* @Version:v1.0* @createTime:9月21日 下午5:51:45*/public class ProducerTest {public static void main(String[] args) throws JMSException {//1、获取mq连接工程ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616"); // 这个是MQ真正的后台通讯地址。//2、创建连接Connection createConnection = activeMQConnectionFactory.createConnection();//3、启动连接createConnection.start();//4、创建会话工厂 设置成默认的Session session = createConnection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);//5.创建队列Destination destination = session.createQueue("leeueMsg");//6. 得到生产者MessageProducer producer = session.createProducer(destination);//7.开始生产 这里设置成不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);for(int i = 0 ; i < 5; i++) {senMsg(session, producer, "我是生产者:"+i);}System.out.println("生产者发送消息完毕...");}public static void senMsg(Session session, MessageProducer producer,String msg) throws JMSException {//1.创建文本消息TextMessage textMessage = session.createTextMessage("Hello MQ"+msg);//2.发送消息producer.send(textMessage);}}

当生产者发送消息的时候,没有消费者,那么生产者发送的消息就会存放在队列里面去图:

消费者代码:

package com.leeue.activemq.test01;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/*** @classDesc: 功能描述:(这是个消费者)* @author:<a href="leeue@">李月</a>* @Version:v1.0* @createTime:9月22日 下午12:25:29*/public class Consumer001 {public static void main(String[] args) throws JMSException {// 1、获取mq连接工程ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // 这个是MQ真正的后台通讯地址。// 2、创建连接Connection createConnection = activeMQConnectionFactory.createConnection();// 3、启动连接createConnection.start();// 4、创建会话工厂 设置成默认的Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5.创建队列Destination destination = session.createQueue("leeueMsg");// 6. 创建一个消费者MessageConsumer consumer = session.createConsumer(destination);// 7.建立一个while循环一直监听消息while (true) {// 监听消息 因为刚刚消费者传送的报文就是 TextMessage格式的,所以这边强转就行了.TextMessage textMessage = (TextMessage) consumer.receive();if (textMessage != null) {System.out.println("消费者获取到消息:" + textMessage.getText());} else {break;}}System.out.println("消费者消费完毕...");}}

七、ActivityMQ的消息订阅

1、主题代码ProducerTest02.java

package com.leeue.activemq.test02;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/*** @classDesc: 功能描述:(这是一个主题 使用activityMQ)* @author:<a href="leeue@">李月</a>* @Version:v1.0* @createTime:9月21日 下午5:51:45*/public class ProducerTest02 {public static void main(String[] args) throws JMSException {//1、获取mq连接工程ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616"); // 这个是MQ真正的后台通讯地址。//2、创建连接Connection createConnection = activeMQConnectionFactory.createConnection();//3、启动连接createConnection.start();//4、创建会话工厂 设置成默认的Session session = createConnection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);//5. 得到生产者MessageProducer producer = session.createProducer(null);//6.开始生产 这里设置成不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);for(int i = 0 ; i < 5; i++) {senMsg(session, producer, "我是生产者主题:"+i);}System.out.println("生产者发送消息完毕 主题...");}public static void senMsg(Session session, MessageProducer producer,String msg) throws JMSException {//1.创建文本消息TextMessage textMessage = session.createTextMessage("Hello MQ"+msg);//2.创建一个通道Destination destination = session.createTopic("leeueTopic");//3.指定主题,然后发送这个消息producer.send(destination,textMessage);}}

2、订阅者代码 Consumer002.java 这个跟消费者没什么区别,、就是获取订阅主题的消息而已

package com.leeue.activemq.test02;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/*** @classDesc: 功能描述:(这是个消费者)* @author:<a href="leeue@">李月</a>* @Version:v1.0* @createTime:9月22日 下午12:25:29*/public class Consumer002 {public static void main(String[] args) throws JMSException {// 1、获取mq连接工程ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // 这个是MQ真正的后台通讯地址。// 2、创建连接Connection createConnection = activeMQConnectionFactory.createConnection();// 3、启动连接createConnection.start();// 4、创建会话工厂 设置成默认的Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5.创建一个主题Destination destination = session.createTopic("leeueTopic");// 6. 创建一个消费者MessageConsumer consumer = session.createConsumer(destination);// 7.建立一个while循环一直监听消息while (true) {// 监听消息 因为刚刚消费者传送的报文就是 TextMessage格式的,所以这边强转就行了.TextMessage textMessage = (TextMessage) consumer.receive();if (textMessage != null) {System.out.println("消费者获取主题发布的消息:" + textMessage.getText());} else {break;}}System.out.println("消费者消费完毕...");}}

主题和订阅者是实时在通信的,并且是一个主题可以对应多个消费者。

Java架构学习(三十三)ActivityMQ基础消息中间件概述异步与同步MQ作用MQ件通讯方式MQ应用场景ActivityMQ安装使用ActivityMQ的案例主题和订阅

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。