官网https://www.rabbitmq.com/tutorials/tutorial-one-java.html
需求:使用简单模式完成消息传递
步骤:
① 创建工程(生产者、消费者)
② 分别添加依赖
③ 编写生产者发送消息
④ 编写消费者接收消息
项目创建成功后,把src目录删除
然后创建子模块
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
public class Producer {
public static void main(String[] args) {
try {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址
connectionFactory.setHost("主机地址");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/");
//连接用户名;默认为guest
connectionFactory.setUsername("admin");
//连接密码;默认为guest
connectionFactory.setPassword("写上自己的密码");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当mq重启之后,还在
* exclusive 参数3:是否独占本次连接
* ① 是否独占,只能有一个消费者监听这个队列
* ② 当connection关闭时,是否删除队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除
* arguments 参数5:队列其它参数
*/
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息
* 参数4:消息内容
*/
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行程序:在浏览器输入服务器地址
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
public class Consumer {
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
//ip
factory.setHost("主机地址");
//端口 默认值 5672
factory.setPort(5672);
//虚拟机 默认值/
factory.setVirtualHost("/");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("写上自己的密码");
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//消费方通过信息获取数据,该方法不会自动执行
DefaultConsumer consumer=new DefaultConsumer(channel){
//接收队列中数据的方法
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
} ;
//5. 创建队列Queue,通过信道获取队列的数据
String queue_name="simple_queue";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认 ,类似咱们发短信,发送成功会收到一个确认消息(如果自动签收,会删除队列的该条消息)
3. callback:回调对象
*/
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume(queue_name,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行程序
上述的入门案例中中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ是AMQP协议的Erlang的实现。
在入门案例中:
生产者发送消息
1.生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
2.声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
3.将路由键(空字符串)与队列绑定起来;
4.发送消息至RabbitMQ Broker;
5.关闭信道;
6.关闭连接;
消费者接收消息
1.消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
2.向Broker 请求消费相应队列中的消息,设置相应的回调函数;
3.等待Broker投递响应队列中的消息,消费者接收消息;
4.确认(ack,自动确认)接收到的消息;
5.RabbitMQ从队列中删除相应已经被确认的消息;
6.关闭信道;
7.关闭连接;
阅读量:2012
点赞量:0
收藏量:0