Kafka如何保证消息不被重复消费?

Kafka如何保证消息不被重复消费?-mikechen

如何保证消息不被重复消费,这是生产环境经常面临的问题,也是大厂重点考察内容,下面我就全面来详解Kafka如何保证消息不被重复消费@mikechen

消息重复消费

消息重复消费是指:同一个消息被同一个消费者多次处理,这个现象在分布式系统、和消息队列中很常见。

很可能会导致数据不一致、重复执行操作等问题,比如:下订单本只改下一次,很可能会造成多次……

如下图所示:

Kafka如何保证消息不被重复消费?-mikechen

所以,确保结果一致是非常重要的,这是处理重复消费问题的关键。

 

重复消费的原因

导致重复消费的原因可能出现在生产者,也可能出现在 MQ本身、 或 消费者这里。

比如:消费者在消费消息时,可能因为处理失败、程序崩溃、或其他….原因,都有可能导致同一条消息被重复处理。

举一个例子,比如:在消费者消费完一条数据,响应“ ack” 信号消费成功时,MQ 突然挂了;

导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。

或者,消费者处理完消息,但确认消息在网络传输中丢失,导致消息队列未收到确认,这些都会造成重复消费。

 

重复消费方案

确保消费者业务逻辑具有幂等性,是解决消息重复消费的关键点。

如下图所示:

Kafka如何保证消息不被重复消费?-mikechen

也就是说,无论同一个请求执行多少次,结果都是一致的。

比如:可以使用唯一索引解决幂等性

在业务系统中,为了确保幂等性,可以通过添加唯一索引、来防止重复数据的写入。

CREATE TABLE `order_log` (
  `id` int AUTO_INCREMENT,
  `orderid` varchar(32) NOT NULL DEFAULT '' COMMENT '唯一id',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uq_orderid` (`orderid`) COMMENT '唯一约束'
) ENGINE=InnoDB;

在这种设计下,每当我们尝试插入一个新的 orderid 时,如果该 orderid 已经存在,数据库将抛出一个唯一索引冲突的错误。

这种机制可以用来确保幂等性,即使多次收到相同的订单消息,也只会有一条记录插入到 order_log 表中。

 

2.数据库去重解决幂等性

为了确保幂等性,可以使用数据库去重表的设计方案,通常涉及创建一个辅助表(去重表)来记录已经处理过的操作。

如下所示:

public static void processOrder(String orderId) {
        String checkSQL = "SELECT COUNT(*) FROM processed_orders WHERE order_id = ?";
        String insertOrderSQL = "INSERT INTO orders (order_id, status) VALUES (?, 'processed')";
        String insertProcessedSQL = "INSERT INTO processed_orders (order_id) VALUES (?)";

        try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
             PreparedStatement checkStmt = connection.prepareStatement(checkSQL);
             PreparedStatement orderStmt = connection.prepareStatement(insertOrderSQL);
             PreparedStatement processedStmt = connection.prepareStatement(insertProcessedSQL)) {

            // 检查订单是否已处理
            checkStmt.setString(1, orderId);
            ResultSet rs = checkStmt.executeQuery();
            if (rs.next() && rs.getInt(1) > 0) {
                System.out.println("Order already processed: " + orderId);
                return;
            }

            // 开始事务
            connection.setAutoCommit(false);

            // 插入订单
            orderStmt.setString(1, orderId);
            orderStmt.executeUpdate();

            // 插入已处理订单记录
            processedStmt.setString(1, orderId);
            processedStmt.executeUpdate();

            // 提交事务
            connection.commit();

            System.out.println("Order processed successfully: " + orderId);

        } catch (SQLException e) {
            e.printStackTrace();
        }
}

通过在插入主表记录之前检查去重表,可以确定操作是否已经处理过,如果没有处理过再执行插入操作并记录到去重表中。

以及,还可以通过设计幂等的更新操作,来确保同样的请求多次执行不会影响结果。

方案有很多,本质都是解决:数据校验的唯一性即可!

mikechen

mikechen睿哥,10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。

关注「mikechen」公众号,获取更多技术干货!

后台回复架构即可获取《阿里架构师进阶专题全部合集》,后台回复面试即可获取《史上最全阿里Java面试题总结

评论交流
    说说你的看法