RabbitMQ消息的确认模式
RabbitMQ消息的确认模式
一 消息发送者的三种确认模式
1 普通发送确认模式
channel.waitForConfirms()
(1)导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.1</version>
</dependency>
</dependencies>
(2)发送端
public class Send {
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.115.130");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
String message="普通发送者确认模式测试消息!";
//启动发送者确认模式
channel.confirmSelect();
channel.basicPublish("directConfirmExchange","confirmRoutingKey",null,message.getBytes("utf-8"));
//阻塞线程等待服务返回响应 ,用于是否消费发送成功,如果服务确认消费已经发送完成则返回true 否则返回false
//可以为这个方法指定一个毫秒用于确定我们的需要等待服务确认的超时时间,
//如果超过了指定的时间以后则会抛出异常InterruptedException 表示服务器出现问题了需要补发消息或
//将消息缓存到Redis中稍后利用定时任务补发
//无论是返回false还是抛出异常消息都有可能发送成功有可能没有发送成功
//如果我们要求这个消息一定要发送到队列例如订单数据,那怎么我们可以采用消息补发
//所谓补发就是重新发送一次消息,可以使用递归或利用Redis+定时任务来完成补发
boolean flag= channel.waitForConfirms();
System.out.println("消息发送成功"+flag);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2 批量发送确认模式
channel.waitForConfirmsOrDie()
(1)导入依赖
(2)发送端
public class Send {
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.115.130");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
String message="普通发送者确认模式测试消息!";
//启动发送者确认模式
channel.confirmSelect();
channel.basicPublish("directConfirmExchange","confirmRoutingKey",null,message.getBytes("utf-8"));
/**
* waitForConfirmsOrDie 批量消息确认,它会同时向服务中确认之前当前通道中发送的所有的消息是否已经全部成功写入
* 这个方法没有任何的返回值,如果服务器中有一条消息没有能够成功或向服务器发送确认时服务不可访问都被认定为
* 消息确认失败,可能有有消息没有发送成功,我们需要进行消费的补发。
* 如果无法向服务器获取确认信息那么方法就会抛出InterruptedException异常,这时就需要补发消息到队列
* waitForConfirmsOrDie方法可以指定一个参数timeout 用于等待服务器的确认时间,如果超过这个时间也会
* 抛出异常,表示确认失败需要补发消息
*
* 注意:
* 批量消息确认的速度比普通的消息确认要快,但是如果一旦出现了消息补发的情况,我们不能确定具体
* 是哪条消息没有完成发送,需要将本次的发送的所有消息全部进行补发
*
*/
channel.waitForConfirmsOrDie();
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3 异步监听发送确认模式
channel.addConfirmListener(new ConfirmListener() {});
(1)引入依赖
(2)发送端
public class Send {
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.115.130");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
String message="普通发送者确认模式测试消息!";
//启动发送者确认模式
channel.confirmSelect();
/**
*异步消息确认监听器,需要在发送消息前启动
*/
channel.addConfirmListener(new ConfirmListener() {
//消息确认以后的回调方法
//参数 1 为被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
//参数 2 为当前消息是否同时确认了多个
//注意:如果参数 2 为true 则表示本次确认同时确认了多条消息,消息等于当前参数1 (消息编号)的所有消息
// 全部被确认 如果为false 则表示只确认多了当前编号的消息
public void handleAck(long l, boolean b) throws IOException {
System.out.println("消息被确认了 --- 消息编号:"+l+" 是否确认了多条:"+b);
}
//消息没有确认的回调方法
//如果这个方法被执行表示当前的消息没有被确认 需要进行消息补发
//参数 1 为没有被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
//参数 2 为当前消息是否同时没有确认多个
//注意: 如果参数2 为true 则表示小于当前编号的所有的消息可能都没有发送成功需要进行消息的补发
// 如果参数2 为false则表示当前编号的消息没法发送成功需要进行补发
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息没有被确认-----消息编号:"+l+" 是否没有确认多条:"+b);
}
});
for(int i=0;i<10000;i++){
channel.basicPublish("directConfirmExchange","confirmRoutingKey",null,message.getBytes("utf-8"));
}
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// if(channel!=null){
// try {
// channel.close();
// } catch (IOException e) {
// e.printStackTrace();
// } catch (TimeoutException e) {
// e.printStackTrace();
// }
// }
// if(connection!=null){
// try {
// connection.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
}
}
}
二 消息的消费者确认模式
手动确认主要并使用以下方法:
basicAck(): 用于肯定确认,multiple参数用于多个消息确认。
basicRecover():是路由不成功的消息可以使用recovery重新发送到队列中。
basicReject():是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。
basicNack():可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true
(1)引入依赖
(2)接收端
public class Receive {
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.115.130");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection=null;
Channel channel=null;
try {
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);
channel.exchangeDeclare("directConfirmExchange","direct",true);
channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
//启动事务
channel.txSelect();
/**
* 接收消息
* 参数 2 为消息的确认机制,true表示自动消息确认,确认以后消息会从队列中被移除 ,当读取完消息以后就会自动确认
* 如果为false 表示手动确认消息
* 注意:
* 1、如果我们只是接收的消息但是还没有来得处理,当前应用就崩溃或在进行处理的时候例如像数据库中
* 写数据但是数据库这时不可用,那么由于消息是自动确认的那么这个消息就会在接收完成以后自动从队列中
* 被删除,这就会丢失消息
*/
channel.basicConsume("confirmQueue",false,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取当前当前消息是否被接收过一次如果返回值为false表示消息之前没有被接收过,如果返回值为true
//则表示之前这个消息被接收过,可能也处理完成,因此我们要进行消息的防重复处理
Boolean isRedeliver= envelope.isRedeliver();
//获取当前内部类中的通道
Channel c= this.getChannel();
if(!isRedeliver){
String message=new String(body);
System.out.println("消费者 处理了消息---"+message);
//获取消息的编号,我们需要根据消息的编号来确认消息
long tag= envelope.getDeliveryTag();
//手动确认消息,确认以后表示当前消息已经成功处理了,需要从队列中移除掉
//这个方法应该在当前消息的处理程序全部完成以后执行
//参数 1 为消息的序号
//参数 2 为是否确认多个,如果为true则表示需要确认小等于当前编号的所有消息,false就是单个确认值确认当前消息
// c.basicAck(tag,true);
}else{
//程序到了这里表示这个消息之前已经被接收过需要进行防重复处理
//例如查询数据库中是否已经添加了记录或已经修改过了记录
//如果经过判断这条没有被处理完成则需要重新处理消息然后确认掉这条消息
//如果已经处理过了则直接确认消息即可不需要进行其他处理操作
//c.basicAck(tag,false);
}
//注意:如果启动了事务,而消息消费者确认模式为手动确认那么必须要提交事务否则即使调用了确认方法
//那么消息也不会从队列中被移除掉
// c.txCommit();
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}