Spring事务详解,看这篇就够了!

Spring事务详解,看这篇就够了!-mikechen的互联网架构

之前谈了Spring的两大核心SpringIOCSpringAOP,今天接着谈另一个重点Spring事务@mikechen

Spring事务

Spring 本身并不实现事务,Spring事务 的本质 还是 底层数据库 对事务的支持,没有 数据库 事务的支持,Spring事务就不会生效。

Spring 事务 提供一套抽象的事务管理,并且结合 Spring IOCSpring AOP,简化了应用程序使用数据库事务,通过声明式事务,可以做到对应用程序无侵入的实现事务功能。例如 使用JDBC 操作数据库,想要使用事务的步骤为:

1、获取连接 Connection con = DriverManager.getConnection()

2、开启事务con.setAutoCommit(true/false);

3、执行CRUD

4、提交事务/回滚事务 con.commit() / con.rollback();

5、关闭连接 conn.close();

采用Spring 事务后,只需要 关注第3步的实现,其他的步骤 都是Spring 完成。

Spring事务的本质其实就是数据库对事务的支持,Spring只提供统一事务管理接口,具体实现都是由各数据库自己实现。

 

事务的ACID

众所周知,事务有四大特性(ACID)

Spring事务详解,看这篇就够了!-mikechen的互联网架构

1.原子性(Atomicity)

事务是一个原子操作,由一系列动作组成。事务的原子性确保动作要么全部完成,要么完全不起作用。

eg:拿转账来说,假设用户A和用户B两者的钱加起来一共是20000,那么不管A和B之间如何转账,转几次账,事务结束后两个用户的钱相加起来应该还得是20000,这就是事务的一致性。

2. 一致性(Consistency)

事务在执行前后,数据库中数据要保持一致性状态。

3.隔离性(Isolation)

隔离性是当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。

即要达到这么一种效果:对于任意两个并发的事务T1和T2,在事务T1看来,T2要么在T1开始之前就已经结束,要么在T1结束之后才开始,这样每个事务都感觉不到有其他事务在并发地执行。

4. 持久性(Durability)

一旦事务完成,数据库的改变必须是持久化的。

 

Spring中的隔离级别

Spring事务详解,看这篇就够了!-mikechen的互联网架构

1、DEFAULT (默认)
这是一个PlatfromTransactionManager默认的隔离级别,使用数据库默认的事务隔离级别。另外四个与JDBC的隔离级别相对应。

2、READ_UNCOMMITTED (读未提交)
这是事务最低的隔离级别,它允许另外一个事务可以看到这个事务未提交的数据。这种隔离级别会产生脏读,不可重复读和幻像读。

3、READ_COMMITTED (读已提交)
保证一个事务修改的数据提交后才能被另外一个事务读取,另外一个事务不能读取该事务未提交的数据。
这种事务隔离级别可以避免脏读出现,但是可能会出现不可重复读和幻像读。

4、REPEATABLE_READ (可重复读)
这种事务隔离级别可以防止脏读、不可重复读,但是可能出现幻像读。它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了不可重复读。

5、 SERIALIZABLE(串行化)
这是花费最高代价但是最可靠的事务隔离级别,事务被处理为顺序执行。除了防止脏读、不可重复读外,还避免了幻像读。

 

Spring事务的传播属性

Spring定义了7个以PROPAGATION_开头的常量表示它的传播属性,如下图所示:

Spring事务详解,看这篇就够了!-mikechen的互联网架构

 

Spring 事务的两种管理方式

Spring事务有两种方式:编程式事务管理、声明式事务管理。

Spring事务详解,看这篇就够了!-mikechen的互联网架构

1.编程式事务

所谓编程式事务指的是通过编码方式实现事务,允许用户在代码中精确定义事务的边界。

Spring框架提供了两种编程式事务方式:

  • 使用TransactionTemplate
  • 使用PlatformTransactionManager

Spring团队通常建议使用TransactionTemplate进行程序化事务管理。

TransactionTemplate采用与其他spring模板相同的方法,它使用一种回调方法,使应用程序代码可以处理获取和释放事务资源,让开发人员更加专注于业务逻辑。

@Autowired
private TransactionTemplate transactionTemplate;
public void testTransaction() {

        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {

                try {

                    // ....  业务代码
                } catch (Exception e){
                    //回滚
                    transactionStatus.setRollbackOnly();
                }

            }
        });
}

 

2. 声明式事务

我们经常使用的 方式,通过AOP 实现,对应用程序 侵入较少,采用注解的方式比较简单方便,省去XML 繁琐的配置。

其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。

声明式事务最大的优点就是不需要通过编程的方式管理事务,这样就不需要在业务逻辑代码中掺杂事务管理的代码,只需在配置文件中做相关的事务规则声明(或通过基于@Transactional注解的方式),便可以将事务规则应用到业务逻辑中。

@Transaction
public void insert(String userName){
    this.jdbcTemplate.update("insert into t_user (name) values (?)", userName);
}

 

Spring事务实现原理

想要了解spring事务的实现原理,一个绕不开的点就是Spring AOP,因为事务就是依靠Spring AOP实现的。

如果对AOP不太了解,建议先看这篇文章:Spring AOP全面详解(超级详细)

Spring是通过AOP实现切面逻辑织入的,如下结合着核心调用源码看下基本实现。

首先看下核心调用接口TransactionInterceptor实现的invoke()方法的源码:

@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    // 获取需要织入事务逻辑的目标类
    Class<?> targetClass = (invocation.getThis() != null ? 
        AopUtils.getTargetClass(invocation.getThis()) : null);

    // 进行事务逻辑的织入
    return invokeWithinTransaction(invocation.getMethod(), targetClass, 
        invocation::proceed);
}

在 invokeWithinTransaction 中会调用 createTransactionIfNecessary 方法:

protected TransactionInfo createTransactionIfNecessary(
    @Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, 
    final String joinpointIdentification) {
    // 如果TransactionAttribute的名称为空,则创建一个代理的TransactionAttribute,
    // 并且将其名称设置为需要织入事务的方法的名称
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 如果事务属性不为空,并且TransactionManager都存在,
            // 则通过TransactionManager获取当前事务状态的对象
            status = tm.getTransaction(txAttr);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" 
                     + joinpointIdentification 
                     + "] because no transaction manager has been configured");
            }
        }
    }
    
    // 将当前事务属性和事务状态封装为一个TransactionInfo,这里主要做的工作是将事务属性绑定到当前线程
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

这里对事务的创建,首先会判断封装事务属性的对象名称是否为空,如果不为空,则以目标方法的标识符作为其名称,然后通过TransactionManager创建事务。如下是TransactionManager.getTransaction()方法的源码:

@Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {

        //如果TransactionDefinition为空则使用默认配置
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();

        if (isExistingTransaction(transaction)) {
            //如果有存在的事务,则处理存在的事物
            return handleExistingTransaction(def, transaction, debugEnabled);
        }

        // 判断事务是否超时
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }

        // 使用PROPAGATION_MANDATORY这个级别,没有可用的事务则抛异常
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
        //根据隔离级别开始事务
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }

startTransaction 会调用 DataSourceTransactionManager 的 doBegin 方法设置当前连接为手动提交事务

@Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
      //获取数据库连接
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());

            //如果连接是自动提交,则设置成手动
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
        //这里会和数据库通信
                con.setAutoCommit(false);
            }

            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

再看回 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 方法,这里显示了 aop 如何处理事务

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        
            //获取事务
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

            Object retVal;
         	try {
                //触发后面的拦截器和方法本身
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 捕获异常处理回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
        //重置threadLocal中事务状态
                cleanupTransactionInfo(txInfo);
            }
            //省略
            ...
            return retVal;
        }

再看一下 completeTransactionAfterThrowing 方法,如果是需要回滚的异常则执行回滚,否则执行提交

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            //需要回滚的异常
            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    //执行回滚
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
            }
            else {
                try {
                    //提交
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
            }
        }
    }

 

Spring事务实现总结

Spring事务的本质 其实就是 AOP 和 数据库事务,Spring 将数据库的事务操作提取为 切面,通过 aop 在方法执行前后增加数据库事务的操。

1、在方法开始时判断是否开启新事务,需要开启事务则设置事务手动提交 set autocommit=0;

2、在方法执行完成后手动提交事务 commit;

3、在方法抛出指定异常后调用 rollback 回滚事务。

以上!

关注「mikechen的互联网架构」公众号,回复【架构】即可获取mikechen原创99万字进阶架构师300期文章合集

包含:Java多线程与并发系列、分布式架构系列、Spring系列、MySQL系列、微服务系列、Redis系列...等文章合集。

评论交流
    说说你的看法