如何保证消息不被重复消费,这是生产环境经常面临的问题,也是大厂重点考察内容,下面我就全面来详解Kafka如何保证消息不被重复消费@mikechen
消息重复消费
消息重复消费是指:同一个消息被同一个消费者多次处理,这个现象在分布式系统、和消息队列中很常见。
很可能会导致数据不一致、重复执行操作等问题,比如:下订单本只改下一次,很可能会造成多次……
如下图所示:
所以,确保结果一致是非常重要的,这是处理重复消费问题的关键。
重复消费的原因
导致重复消费的原因可能出现在生产者,也可能出现在 MQ本身、 或 消费者这里。
比如:消费者在消费消息时,可能因为处理失败、程序崩溃、或其他….原因,都有可能导致同一条消息被重复处理。
举一个例子,比如:在消费者消费完一条数据,响应“ ack” 信号消费成功时,MQ 突然挂了;
导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。
或者,消费者处理完消息,但确认消息在网络传输中丢失,导致消息队列未收到确认,这些都会造成重复消费。
重复消费方案
确保消费者业务逻辑具有幂等性,是解决消息重复消费的关键点。
如下图所示:
也就是说,无论同一个请求执行多少次,结果都是一致的。
比如:可以使用唯一索引解决幂等性
在业务系统中,为了确保幂等性,可以通过添加唯一索引、来防止重复数据的写入。
CREATE TABLE `order_log` ( `id` int AUTO_INCREMENT, `orderid` varchar(32) NOT NULL DEFAULT '' COMMENT '唯一id', PRIMARY KEY (`id`), UNIQUE KEY `uq_orderid` (`orderid`) COMMENT '唯一约束' ) ENGINE=InnoDB;
在这种设计下,每当我们尝试插入一个新的 orderid
时,如果该 orderid
已经存在,数据库将抛出一个唯一索引冲突的错误。
这种机制可以用来确保幂等性,即使多次收到相同的订单消息,也只会有一条记录插入到 order_log
表中。
2.数据库去重解决幂等性
为了确保幂等性,可以使用数据库去重表的设计方案,通常涉及创建一个辅助表(去重表)来记录已经处理过的操作。
如下所示:
public static void processOrder(String orderId) { String checkSQL = "SELECT COUNT(*) FROM processed_orders WHERE order_id = ?"; String insertOrderSQL = "INSERT INTO orders (order_id, status) VALUES (?, 'processed')"; String insertProcessedSQL = "INSERT INTO processed_orders (order_id) VALUES (?)"; try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD); PreparedStatement checkStmt = connection.prepareStatement(checkSQL); PreparedStatement orderStmt = connection.prepareStatement(insertOrderSQL); PreparedStatement processedStmt = connection.prepareStatement(insertProcessedSQL)) { // 检查订单是否已处理 checkStmt.setString(1, orderId); ResultSet rs = checkStmt.executeQuery(); if (rs.next() && rs.getInt(1) > 0) { System.out.println("Order already processed: " + orderId); return; } // 开始事务 connection.setAutoCommit(false); // 插入订单 orderStmt.setString(1, orderId); orderStmt.executeUpdate(); // 插入已处理订单记录 processedStmt.setString(1, orderId); processedStmt.executeUpdate(); // 提交事务 connection.commit(); System.out.println("Order processed successfully: " + orderId); } catch (SQLException e) { e.printStackTrace(); } }
通过在插入主表记录之前检查去重表,可以确定操作是否已经处理过,如果没有处理过再执行插入操作并记录到去重表中。
以及,还可以通过设计幂等的更新操作,来确保同样的请求多次执行不会影响结果。
方案有很多,本质都是解决:数据校验的唯一性即可!
mikechen
mikechen睿哥,10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!

后台回复【架构】即可获取《阿里架构师进阶专题全部合集》,后台回复【面试】即可获取《史上最全阿里Java面试题总结》