千家信息网

What ShardingTransactionManager does in sharding-jdbc

Published: 2025-01-24 Author: 千家信息网 editor
Last updated: 2025-01-24

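This article looks at what ShardingTransactionManager does in sharding-jdbc by reading the 4.0.0-RC1 source: the interface itself, its XA implementation XAShardingTransactionManager, and the accompanying unit test.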

ShardingTransactionManager

incubator-shardingsphere-4.0.0-RC1/sharding-transaction/sharding-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingTransactionManager.java

public interface ShardingTransactionManager extends AutoCloseable {
    
    /**
     * Initialize sharding transaction manager.
     *
     * @param databaseType database type
     * @param resourceDataSources resource data sources
     */
    void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);
    
    /**
     * Get transaction type.
     *
     * @return transaction type
     */
    TransactionType getTransactionType();
    
    /**
     * Judge is in transaction or not.
     *
     * @return in transaction or not
     */
    boolean isInTransaction();
    
    /**
     * Get transactional connection.
     *
     * @param dataSourceName data source name
     * @return connection
     * @throws SQLException SQL exception
     */
    Connection getConnection(String dataSourceName) throws SQLException;
    
    /**
     * Begin transaction.
     */
    void begin();
    
    /**
     * Commit transaction.
     */
    void commit();
    
    /**
     * Rollback transaction.
     */
    void rollback();
}
  • ShardingTransactionManager extends AutoCloseable and declares the init, getTransactionType, isInTransaction, getConnection, begin, commit, and rollback methods.
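
The method set suggests a simple lifecycle: begin, do the work, then commit or roll back. The following is a minimal sketch of that call order, not code from ShardingSphere. TxManager is a hypothetical, cut-down mirror of the transaction methods of the interface, RecordingTxManager is a hypothetical stand-in that only records calls, and TxTemplate is an assumed caller; the real caller inside sharding-jdbc may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, cut-down mirror of the transaction methods of ShardingTransactionManager.
interface TxManager {
    boolean isInTransaction();
    void begin();
    void commit();
    void rollback();
}

// Hypothetical stand-in that only records which methods were called, in order.
class RecordingTxManager implements TxManager {
    final List<String> calls = new ArrayList<>();
    private boolean active;

    @Override public boolean isInTransaction() { return active; }
    @Override public void begin() { active = true; calls.add("begin"); }
    @Override public void commit() { active = false; calls.add("commit"); }
    @Override public void rollback() { active = false; calls.add("rollback"); }
}

// Assumed caller: commit when the work succeeds, roll back when it throws.
class TxTemplate {
    static <T> T execute(TxManager manager, Supplier<T> work) {
        manager.begin();
        try {
            T result = work.get();
            manager.commit();
            return result;
        } catch (RuntimeException ex) {
            manager.rollback();
            throw ex;
        }
    }
}
```

Calling TxTemplate.execute(manager, () -> "done") on a RecordingTxManager records begin followed by commit; if the supplier throws, it records begin followed by rollback and rethrows the exception.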

XAShardingTransactionManager

incubator-shardingsphere-4.0.0-RC1/sharding-transaction/sharding-transaction-2pc/sharding-transaction-xa/sharding-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingTransactionManager.java

public final class XAShardingTransactionManager implements ShardingTransactionManager {
    
    private final Map<String, SingleXADataSource> singleXADataSourceMap = new HashMap<>();
    
    private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();
    
    private ThreadLocal<List<String>> enlistedXAResource = new ThreadLocal<List<String>>() {
        @Override
        public List<String> initialValue() {
            return new LinkedList<>();
        }
    };
    
    @Override
    public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
        for (ResourceDataSource each : resourceDataSources) {
            DataSource dataSource = each.getDataSource();
            if (dataSource instanceof AtomikosDataSourceBean) {
                continue;
            }
            SingleXADataSource singleXADataSource = new SingleXADataSource(databaseType, each.getUniqueResourceName(), dataSource);
            singleXADataSourceMap.put(each.getOriginalName(), singleXADataSource);
            xaTransactionManager.registerRecoveryResource(each.getUniqueResourceName(), singleXADataSource.getXaDataSource());
        }
        xaTransactionManager.init();
    }
    
    @Override
    public TransactionType getTransactionType() {
        return TransactionType.XA;
    }
    
    @SneakyThrows
    @Override
    public boolean isInTransaction() {
        return Status.STATUS_NO_TRANSACTION != xaTransactionManager.getTransactionManager().getStatus();
    }
    
    @SneakyThrows
    @Override
    public Connection getConnection(final String dataSourceName) {
        SingleXAConnection singleXAConnection = singleXADataSourceMap.get(dataSourceName).getXAConnection();
        if (!enlistedXAResource.get().contains(dataSourceName)) {
            xaTransactionManager.enlistResource(singleXAConnection.getXAResource());
            enlistedXAResource.get().add(dataSourceName);
        }
        return singleXAConnection.getConnection();
    }
    
    @SneakyThrows
    @Override
    public void begin() {
        xaTransactionManager.getTransactionManager().begin();
    }
    
    @SneakyThrows
    @Override
    public void commit() {
        try {
            xaTransactionManager.getTransactionManager().commit();
        } finally {
            enlistedXAResource.remove();
        }
    }
    
    @SneakyThrows
    @Override
    public void rollback() {
        try {
            xaTransactionManager.getTransactionManager().rollback();
        } finally {
            enlistedXAResource.remove();
        }
    }
    
    @Override
    public void close() throws Exception {
        for (SingleXADataSource each : singleXADataSourceMap.values()) {
            xaTransactionManager.removeRecoveryResource(each.getResourceName(), each.getXaDataSource());
        }
        singleXADataSourceMap.clear();
        xaTransactionManager.close();
        enlistedXAResource = null;
    }
}
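  • XAShardingTransactionManager implements ShardingTransactionManager and holds three fields: singleXADataSourceMap, xaTransactionManager, and enlistedXAResource. All three are initialized at declaration. init then skips any AtomikosDataSourceBean, wraps each remaining data source in a SingleXADataSource stored in singleXADataSourceMap under its original name, registers it as a recovery resource, and finally calls xaTransactionManager.init().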

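  • getTransactionType returns TransactionType.XA. isInTransaction compares xaTransactionManager.getTransactionManager().getStatus() against Status.STATUS_NO_TRANSACTION. getConnection looks up the SingleXADataSource in singleXADataSourceMap and obtains a SingleXAConnection from it; if the data source name is not yet in the current thread's enlistedXAResource list, it enlists the connection's XAResource and records the name, then returns singleXAConnection.getConnection().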

  • begin delegates to xaTransactionManager.getTransactionManager().begin(). commit and rollback delegate to the corresponding commit() and rollback() calls, and both run enlistedXAResource.remove() in a finally block, so the per-thread list is cleared whether or not the call succeeds. close removes every recovery resource via xaTransactionManager.removeRecoveryResource, clears singleXADataSourceMap, calls xaTransactionManager.close(), and sets enlistedXAResource to null.
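
The per-thread bookkeeping around enlistedXAResource can be isolated in a small sketch. EnlistOnceTracker below is a hypothetical class written for illustration, not part of ShardingSphere. It replaces the real enlistResource call with a counter and uses ThreadLocal.withInitial instead of the anonymous subclass in the source, on the assumption that the two are equivalent for this purpose.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the "enlist once per thread" bookkeeping.
class EnlistOnceTracker {
    // Each thread keeps its own list of data source names already enlisted.
    private final ThreadLocal<List<String>> enlisted = ThreadLocal.withInitial(LinkedList::new);

    // Counts simulated enlistResource calls; stands in for the real XA transaction manager.
    private final AtomicInteger enlistCalls = new AtomicInteger();

    // Mirrors the check in getConnection: enlist only the first time
    // a data source name is seen on the current thread.
    void getConnection(String dataSourceName) {
        if (!enlisted.get().contains(dataSourceName)) {
            enlistCalls.incrementAndGet();
            enlisted.get().add(dataSourceName);
        }
    }

    // Mirrors the finally block of commit and rollback:
    // drop the current thread's list so the next transaction starts clean.
    void finishTransaction() {
        enlisted.remove();
    }

    int enlistCalls() {
        return enlistCalls.get();
    }
}
```

Requesting "ds1" twice and "ds2" once within one transaction enlists only twice. After finishTransaction(), the next request for "ds1" enlists again, which matches why commit and rollback clear the list in finally.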

XAShardingTransactionManagerTest

incubator-shardingsphere-4.0.0-RC1/sharding-transaction/sharding-transaction-2pc/sharding-transaction-xa/sharding-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingTransactionManagerTest.java

@RunWith(MockitoJUnitRunner.class)
public final class XAShardingTransactionManagerTest {
    
    private XAShardingTransactionManager xaShardingTransactionManager = new XAShardingTransactionManager();
    
    @Mock
    private XATransactionManager xaTransactionManager;
    
    @Mock
    private TransactionManager transactionManager;
    
    @Before
    public void setUp() {
        when(xaTransactionManager.getTransactionManager()).thenReturn(transactionManager);
        ReflectiveUtil.setProperty(xaShardingTransactionManager, "xaTransactionManager", xaTransactionManager);
    }
    
    @Test
    public void assertGetTransactionType() {
        assertThat(xaShardingTransactionManager.getTransactionType(), is(TransactionType.XA));
    }
    
    @Test
    public void assertRegisterXATransactionalDataSources() {
        Collection<ResourceDataSource> resourceDataSources = createResourceDataSources(DruidXADataSource.class, DatabaseType.MySQL);
        xaShardingTransactionManager.init(DatabaseType.MySQL, resourceDataSources);
        for (ResourceDataSource each : resourceDataSources) {
            verify(xaTransactionManager).registerRecoveryResource(each.getUniqueResourceName(), (XADataSource) each.getDataSource());
        }
    }
    
    @Test
    public void assertRegisterAtomikosDataSourceBeans() {
        xaShardingTransactionManager.init(DatabaseType.MySQL, createAtomikosDataSourceBeanResource());
        verify(xaTransactionManager, times(0)).registerRecoveryResource(anyString(), any(XADataSource.class));
    }
    
    @Test
    public void assertRegisterNoneXATransactionalDAtaSources() {
        Collection<ResourceDataSource> resourceDataSources = createResourceDataSources(HikariDataSource.class, DatabaseType.MySQL);
        xaShardingTransactionManager.init(DatabaseType.MySQL, resourceDataSources);
        Map<String, SingleXADataSource> cachedXADatasourceMap = getCachedSingleXADataSourceMap();
        assertThat(cachedXADatasourceMap.size(), is(2));
    }
    
    @Test
    public void assertIsInTransaction() throws SystemException {
        when(transactionManager.getStatus()).thenReturn(Status.STATUS_ACTIVE);
        assertTrue(xaShardingTransactionManager.isInTransaction());
    }
    
    @Test
    public void assertIsNotInTransaction() throws SystemException {
        when(transactionManager.getStatus()).thenReturn(Status.STATUS_NO_TRANSACTION);
        assertFalse(xaShardingTransactionManager.isInTransaction());
    }
    
    @Test
    public void assertGetConnection() {
        setCachedSingleXADataSourceMap("ds1");
        Connection actual = xaShardingTransactionManager.getConnection("ds1");
        assertThat(actual, instanceOf(Connection.class));
        verify(xaTransactionManager).enlistResource(any(SingleXAResource.class));
    }
    
    @Test
    public void assertGetConnectionWithoutEnlist() {
        setCachedSingleXADataSourceMap("ds1");
        Connection actual = xaShardingTransactionManager.getConnection("ds1");
        assertThat(actual, instanceOf(Connection.class));
        xaShardingTransactionManager.getConnection("ds1");
        assertThat(actual, instanceOf(Connection.class));
        verify(xaTransactionManager).enlistResource(any(SingleXAResource.class));
    }
    
    @Test
    public void assertClose() throws Exception {
        setCachedSingleXADataSourceMap("ds1");
        xaShardingTransactionManager.close();
        Map<String, SingleXADataSource> cachedSingleXADataSourceMap = getCachedSingleXADataSourceMap();
        verify(xaTransactionManager).removeRecoveryResource(anyString(), any(XADataSource.class));
        assertThat(cachedSingleXADataSourceMap.size(), is(0));
    }
    
    @SneakyThrows
    @SuppressWarnings("unchecked")
    private Map<String, SingleXADataSource> getCachedSingleXADataSourceMap() {
        Field field = xaShardingTransactionManager.getClass().getDeclaredField("singleXADataSourceMap");
        field.setAccessible(true);
        return (Map<String, SingleXADataSource>) field.get(xaShardingTransactionManager);
    }
    
    @SneakyThrows
    private void setCachedSingleXADataSourceMap(final String datasourceName) {
        Field field = xaShardingTransactionManager.getClass().getDeclaredField("singleXADataSourceMap");
        field.setAccessible(true);
        field.set(xaShardingTransactionManager, createMockSingleXADataSourceMap(datasourceName));
    }
    
    @SneakyThrows
    private Map<String, SingleXADataSource> createMockSingleXADataSourceMap(final String datasourceName) {
        SingleXADataSource singleXADataSource = mock(SingleXADataSource.class);
        SingleXAConnection singleXAConnection = mock(SingleXAConnection.class);
        XADataSource xaDataSource = mock(XADataSource.class);
        SingleXAResource singleXAResource = mock(SingleXAResource.class);
        Connection connection = mock(Connection.class);
        when(singleXAConnection.getConnection()).thenReturn(connection);
        when(singleXAConnection.getXAResource()).thenReturn(singleXAResource);
        when(singleXADataSource.getXAConnection()).thenReturn(singleXAConnection);
        when(singleXADataSource.getResourceName()).thenReturn(datasourceName);
        when(singleXADataSource.getXaDataSource()).thenReturn(xaDataSource);
        Map<String, SingleXADataSource> result = new HashMap<>();
        result.put(datasourceName, singleXADataSource);
        return result;
    }
    
    private Collection<ResourceDataSource> createResourceDataSources(final Class<? extends DataSource> dataSourceClass, final DatabaseType databaseType) {
        List<ResourceDataSource> result = new LinkedList<>();
        result.add(new ResourceDataSource("ds1", DataSourceUtils.build(dataSourceClass, databaseType, "demo_ds_1")));
        result.add(new ResourceDataSource("ds2", DataSourceUtils.build(dataSourceClass, databaseType, "demo_ds_2")));
        return result;
    }
    
    private Collection<ResourceDataSource> createAtomikosDataSourceBeanResource() {
        List<ResourceDataSource> result = new LinkedList<>();
        result.add(new ResourceDataSource("ds1", new AtomikosDataSourceBean()));
        result.add(new ResourceDataSource("ds2", new AtomikosDataSourceBean()));
        return result;
    }
}
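  • The test mocks XATransactionManager and the JTA TransactionManager with Mockito, stubs xaTransactionManager.getTransactionManager() to return the mocked TransactionManager, and injects the mocked XATransactionManager into the instance under test with ReflectiveUtil.setProperty. It also reads and replaces the private singleXADataSourceMap field through reflection.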
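
The test relies on swapping a private collaborator through reflection. A minimal sketch of that technique with plain java.lang.reflect follows. TimeSource, Service, and FieldInjector are hypothetical names invented for this example; FieldInjector.setField only approximates what the setProperty call in setUp appears to achieve and is not the ShardingSphere helper.

```java
import java.lang.reflect.Field;

// Hypothetical collaborator, used only to illustrate the injection technique.
interface TimeSource {
    long now();
}

// Hypothetical class under test: it creates its collaborator internally,
// so a test cannot pass in a replacement through a constructor.
class Service {
    private TimeSource timeSource = System::currentTimeMillis;

    long timestamp() {
        return timeSource.now();
    }
}

// Overwrites a private field declared on the target's own class.
class FieldInjector {
    static void setField(Object target, String fieldName, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(target, value);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot set field " + fieldName, ex);
        }
    }
}
```

After FieldInjector.setField(service, "timeSource", (TimeSource) () -> 42L), service.timestamp() returns the stubbed value instead of the system clock, in the same way the test makes XAShardingTransactionManager talk to a mocked XATransactionManager.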

Summary

  • ShardingTransactionManager is the SPI that sharding-jdbc uses to drive a distributed transaction: init prepares the data sources, getConnection hands out transactional connections by data source name, and begin, commit, and rollback control the transaction.

  • XAShardingTransactionManager is the XA implementation. It keeps a SingleXADataSource per data source in singleXADataSourceMap, delegates transaction control to the TransactionManager exposed by xaTransactionManager, and reports TransactionType.XA.

  • getConnection enlists each data source's XAResource at most once per thread, tracked in the enlistedXAResource ThreadLocal. commit and rollback clear that list in finally, and close removes the recovery resources, clears the map, and closes xaTransactionManager.

