RabbitMQ 消息发送和消费过程
所属分类 rabbitmq
浏览量 199
生产者发送消息
生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
生产者通过bindingKey (绑定Key)将交换器和队列绑定( binding )起来
生产者发送消息至RabbitMQ Broker,其中包含routingKey (路由键)、交换器等信息
相应的交换器根据接收到的routingKey 查找相匹配的队列
如果找到,则将从生产者发送过来的消息存入相应的队列中
如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道
关闭连接
消费者接收消息
消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel)
消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息
消费者确认( ack) 接收到的消息
RabbitMQ 从队列中删除相应己经被确认的消息
关闭信道
关闭连接
com.rabbitmq:amqp-client:5.9.0
HelloProducer.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloProducer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.10");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123456");
factory.setVirtualHost("/");
// 建立TCP连接
Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明消息队列 队列名 是否持久化 是否排他的 是否自动删除
// 消息队列其他属性信息
channel.queueDeclare("hello", false, false, false, null);
channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());
channel.close();
connection.close();
}
}
HelloConsumer push模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
public class HelloConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// VirtualHost / 转义 %2f
factory.setUri("amqp://root:123456@192.168.10.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
//
channel.basicConsume("hello", (consumerTag, message)->{
System.out.println((new String(message.getBody())));
}, (consumerTag)->{});
// channel.close();;
// connection.close();
}
}
HelloGetConsumer get模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
public class HelloGetConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.10.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
// get模式 basicGet
GetResponse getResponse = channel.basicGet("hello", true);
System.out.println(new String(getResponse.getBody()));
channel.close();;
connection.close();
}
}
上一篇
下一篇
flink prometheus 监控指标
JMS 和 AMQP
rabbitmq 管理页面
Flink JobManager与TaskManage 运行架构
springboot 接口 post 报错 Cannot generate variable name for non-typed Collection parameter type
prometheus file_sd_config 基于文件的服务发现