首页  

RabbitMQ 消息发送和消费过程     所属分类 rabbitmq 浏览量 199
生产者发送消息

生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
生产者通过bindingKey (绑定Key)将交换器和队列绑定( binding )起来
生产者发送消息至RabbitMQ Broker,其中包含routingKey (路由键)、交换器等信息
相应的交换器根据接收到的routingKey 查找相匹配的队列
如果找到,则将从生产者发送过来的消息存入相应的队列中
如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道
关闭连接


消费者接收消息 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息 消费者确认( ack) 接收到的消息 RabbitMQ 从队列中删除相应己经被确认的消息 关闭信道 关闭连接
com.rabbitmq:amqp-client:5.9.0 HelloProducer.java import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class HelloProducer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.10"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 建立TCP连接 Connection connection = factory.newConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明消息队列 队列名 是否持久化 是否排他的 是否自动删除 // 消息队列其他属性信息 channel.queueDeclare("hello", false, false, false, null); channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes()); channel.close(); connection.close(); } }
HelloConsumer push模式 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; public class HelloConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // VirtualHost / 转义 %2f factory.setUri("amqp://root:123456@192.168.10.10:5672/%2f"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); // channel.basicConsume("hello", (consumerTag, message)->{ System.out.println((new String(message.getBody()))); }, (consumerTag)->{}); // channel.close();; // connection.close(); } }
HelloGetConsumer get模式 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.GetResponse; public class HelloGetConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.10.10:5672/%2f"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); // get模式 basicGet GetResponse getResponse = channel.basicGet("hello", true); System.out.println(new String(getResponse.getBody())); channel.close();; connection.close(); } }

上一篇     下一篇
flink prometheus 监控指标

JMS 和 AMQP

rabbitmq 管理页面

Flink JobManager与TaskManage 运行架构

springboot 接口 post 报错 Cannot generate variable name for non-typed Collection parameter type

prometheus file_sd_config 基于文件的服务发现