千家信息网

ShardingContext的功能有哪些

发表于:2024-10-23 作者:千家信息网编辑
千家信息网最后更新 2024年10月23日,这期内容当中小编将会给大家带来有关ShardingContent的功能有哪些,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。ShardingContent主要做了那些
千家信息网最后更新 2024年10月23日ShardingContent的功能有哪些

这期内容当中,小编将会给大家带来有关ShardingContext有哪些功能的介绍,文章内容丰富,并以专业的角度为大家分析和叙述,希望大家阅读完这篇文章后可以有所收获。

ShardingContext主要提供了哪些功能呢?主要有两部分:

  • 数据源分片元数据

主要根据数据源连接获取对应的 URL,通过解析 URL 参数来封装数据源分片元数据;数据源分片元数据主要供后续 SQL 路由中的 DCL(比如:授权、创建用户等)操作使用。

  • 表分片元数据

主要根据数据节点来获取真实表的元数据;表分片元数据主要供后续 SQL 解析填充时使用。

源码分析

1.ShardingContext构造,主要分析ShardingTableMetaData

public  ShardingContext(final Map dataSourceMap, final ShardingRule shardingRule, final DatabaseType databaseType, final Properties props) throws SQLException {        this.shardingRule = shardingRule;        //获取数据源原始元数据信息        this.cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);        //数据源类型        this.databaseType = databaseType;        //sharding 配置参数        //比如:sql打印、线程池大小配置等        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);        //Statement、PrepareStatement执行线程池大小        //一个分片数据源将使用独立的线程池,它不会在同一个JVM中共享线程池甚至不同的数据源        //默认无限制        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);        //执行引擎        executeEngine = new ShardingExecuteEngine(executorSize);        //数据源分片元数据        //以mysql为例,建立连接获取mysql url,将解析后的url参数信息封装到ShardingDataSourceMetaData        ShardingDataSourceMetaData shardingDataSourceMetaData = new ShardingDataSourceMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType);        //表分片元数据        //以mysql为例,会建立连接获取表的元信息(字段、字段类型、索引)        ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(getTableMetaDataInitializer(dataSourceMap, shardingDataSourceMetaData).load(shardingRule));        //封装数据源分片元数据、表分片元数据        metaData = new ShardingMetaData(shardingDataSourceMetaData, shardingTableMetaData);        //解析结果缓存        parsingResultCache = new ParsingResultCache();    }//    private TableMetaDataInitializer getTableMetaDataInitializer(final Map dataSourceMap, final ShardingDataSourceMetaData shardingDataSourceMetaData) {        return new TableMetaDataInitializer(shardingDataSourceMetaData, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap),                shardingProperties.getValue(ShardingPropertiesConstant.MAX_CONNECTIONS_SIZE_PER_QUERY),                shardingProperties.getValue(ShardingPropertiesConstant.CHECK_TABLE_METADATA_ENABLED));    }

2.加载TableMetaDataInitializer#load

    public TableMetaDataInitializer(final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine executeEngine,                                     final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) {        //数据源分片元数据        this.shardingDataSourceMetaData = shardingDataSourceMetaData;        //数据源连接管理器        this.connectionManager = connectionManager;        //表元数据加载器        tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData);    }    /**     * Load table meta data.     *     * @param logicTableName logic table name     * @param shardingRule sharding rule     * @return table meta data     */    @SneakyThrows    public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) {        return tableMetaDataLoader.load(logicTableName, shardingRule);    }        /**     * Load all table meta data.     
*      * @param shardingRule sharding rule     * @return all table meta data     */    @SneakyThrows    public Map load(final ShardingRule shardingRule) {        Map result = new HashMap<>();        //加载分片表        result.putAll(loadShardingTables(shardingRule));        //加载未分片表        result.putAll(loadDefaultTables(shardingRule));        return result;    }        private Map loadShardingTables(final ShardingRule shardingRule) throws SQLException {        Map result = new HashMap<>(shardingRule.getTableRules().size(), 1);        for (TableRule each : shardingRule.getTableRules()) {            //加载逻辑表对应真实表的元数据            //逻辑表:表元数据            result.put(each.getLogicTable(), tableMetaDataLoader.load(each.getLogicTable(), shardingRule));        }        return result;    }        private Map loadDefaultTables(final ShardingRule shardingRule) throws SQLException {        Map result = new HashMap<>(shardingRule.getTableRules().size(), 1);        //查询默认数据源,没有则查找主库        Optional actualDefaultDataSourceName = shardingRule.findActualDefaultDataSourceName();        if (actualDefaultDataSourceName.isPresent()) {            //获取所有表元数据            //真实表:表元数据            for (String each : getAllTableNames(actualDefaultDataSourceName.get())) {                result.put(each, tableMetaDataLoader.load(each, shardingRule));            }        }        return result;    }        private Collection getAllTableNames(final String dataSourceName) throws SQLException {        Collection result = new LinkedHashSet<>();        DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);        String catalog = null == dataSourceMetaData ? 
null : dataSourceMetaData.getSchemaName();        try (Connection connection = connectionManager.getConnection(dataSourceName);             ResultSet resultSet = connection.getMetaData().getTables(catalog, getCurrentSchemaName(connection), null, new String[]{"TABLE"})) {            while (resultSet.next()) {                String tableName = resultSet.getString("TABLE_NAME");                if (!tableName.contains("$") && !tableName.contains("/")) {                    result.add(tableName);                }            }        }        return result;    }        private String getCurrentSchemaName(final Connection connection) throws SQLException {        try {            return connection.getSchema();        } catch (final AbstractMethodError | SQLFeatureNotSupportedException ignore) {            return null;        }    }

3.加载表元数据TableMetaDataLoader#load

    /**     * Load table meta data.     *     * @param logicTableName logic table name     * @param shardingRule sharding rule     * @return table meta data     * @throws SQLException SQL exception     */    public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) throws SQLException {        //获取表元数据        List actualTableMetaDataList = load(getDataNodeGroups(logicTableName, shardingRule), shardingRule.getShardingDataSourceNames());        //检查actualTableMetaDataList的元数据        checkUniformed(logicTableName, actualTableMetaDataList);        return actualTableMetaDataList.iterator().next();    }        private List load(final Map> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {        //将封装好的数据节点组提交给执行引擎执行        return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback() {                        @Override            public Collection execute(final Collection dataNodes, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException {                String dataSourceName = dataNodes.iterator().next().getDataSourceName();                DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);                String catalog = null == dataSourceMetaData ? 
null : dataSourceMetaData.getSchemaName();                return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes);            }        });    }        private Collection load(final String dataSourceName, final String catalog, final Collection dataNodes) throws SQLException {        Collection result = new LinkedList<>();        try (Connection connection = connectionManager.getConnection(dataSourceName)) {            for (DataNode each : dataNodes) {                //获取表元数据                result.add(createTableMetaData(connection, catalog, each.getTableName()));            }        }        return result;    }        private Map> getDataNodeGroups(final String logicTableName, final ShardingRule shardingRule) {        //根据逻辑表获取对应的数据源:真实表数据节点        //比如:        //ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]        //ds_1 -> [ds_1.t_order_0, ds_1.t_order_1]        Map> result = shardingRule.getTableRule(logicTableName).getDataNodeGroups();        //默认false,设置为true会处理所有数据节点真实表        if (isCheckingMetaData) {            return result;        }        //返回一个数据节点即可        String firstKey = result.keySet().iterator().next();        return Collections.singletonMap(firstKey, Collections.singletonList(result.get(firstKey).get(0)));    }    /**     * 将数据节点组封装成分片执行组     *     * @param dataNodeGroups 数据节点组     * 
     *      ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]     * 
* @return */ private Collection> getDataNodeGroups(final Map> dataNodeGroups) { Collection> result = new LinkedList<>(); //遍历对应数据源下的数据节点 for (Entry> entry : dataNodeGroups.entrySet()) { //封装分片执行组ShardingExecuteGroup result.addAll(getDataNodeGroups(entry.getValue())); } return result; } private Collection> getDataNodeGroups(final List dataNodes) { Collection> result = new LinkedList<>(); //maxConnectionsSizePerQuery最大查询连接数默认为1 //将dataNodes换分Math.max份 for (List each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) { result.add(new ShardingExecuteGroup<>(each)); } return result; } private TableMetaData createTableMetaData(final Connection connection, final String catalog, final String actualTableName) throws SQLException { //判断表是否存在 if (isTableExist(connection, catalog, actualTableName)) { //封装表元数据 return new TableMetaData(getColumnMetaDataList(connection, catalog, actualTableName), getLogicIndexes(connection, catalog, actualTableName)); } return new TableMetaData(Collections.emptyList(), Collections.emptySet()); } private boolean isTableExist(final Connection connection, final String catalog, final String actualTableName) throws SQLException { try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, actualTableName, null)) { return resultSet.next(); } } /** * 获取表字段元数据 * * @param connection 连接 * @param catalog schema * @param actualTableName 真实表 * @return * @throws SQLException */ private List getColumnMetaDataList(final Connection connection, final String catalog, final String actualTableName) throws SQLException { List result = new LinkedList<>(); Collection primaryKeys = getPrimaryKeys(connection, catalog, actualTableName); try (ResultSet resultSet = connection.getMetaData().getColumns(catalog, null, actualTableName, "%")) { while (resultSet.next()) { String columnName = resultSet.getString("COLUMN_NAME"); String columnType = resultSet.getString("TYPE_NAME"); result.add(new ColumnMetaData(columnName, 
columnType, primaryKeys.contains(columnName))); } } return result; } /** * 获取表主键 */ private Collection getPrimaryKeys(final Connection connection, final String catalog, final String actualTableName) throws SQLException { Collection result = new HashSet<>(); try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(catalog, null, actualTableName)) { while (resultSet.next()) { result.add(resultSet.getString("COLUMN_NAME")); } } return result; } /** * 获取表索引 */ private Collection getLogicIndexes(final Connection connection, final String catalog, final String actualTableName) throws SQLException { Collection result = new HashSet<>(); try (ResultSet resultSet = connection.getMetaData().getIndexInfo(catalog, catalog, actualTableName, false, false)) { while (resultSet.next()) { Optional logicIndex = getLogicIndex(resultSet.getString("INDEX_NAME"), actualTableName); if (logicIndex.isPresent()) { result.add(logicIndex.get()); } } } return result; } private Optional getLogicIndex(final String actualIndexName, final String actualTableName) { //索引要以`_tableName`命名,比如: //idx_t_order String indexNameSuffix = "_" + actualTableName; if (actualIndexName.contains(indexNameSuffix)) { return Optional.of(actualIndexName.replace(indexNameSuffix, "")); } return Optional.absent(); }

4.执行ShardingExecuteEngine#groupExecute

    /**     * Execute for group.     *     * @param inputGroups input groups     * @param callback sharding execute callback     * @param  type of input value     * @param  type of return value     * @return execute result     * @throws SQLException throw if execute failure     */    public  List groupExecute(final Collection> inputGroups, final ShardingGroupExecuteCallback callback) throws SQLException {        return groupExecute(inputGroups, null, callback, false);    }        /**     * Execute for group.     *     * @param inputGroups input groups     * @param firstCallback first sharding execute callback     * @param callback sharding execute callback     * @param serial whether using multi thread execute or not     * @param  type of input value     * @param  type of return value     * @return execute result     * @throws SQLException throw if execute failure     */    public  List groupExecute(        final Collection> inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback, final boolean serial)        throws SQLException {        if (inputGroups.isEmpty()) {            return Collections.emptyList();        }        //serial: 串行        //parallel: 并行        return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);    }        private  List serialExecute(final Collection> inputGroups, final ShardingGroupExecuteCallback firstCallback,                                         final ShardingGroupExecuteCallback callback) throws SQLException {        Iterator> inputGroupsIterator = inputGroups.iterator();        ShardingExecuteGroup firstInputs = inputGroupsIterator.next();        //单独执行第一个组        //当firstCallback不为空时使用firstCallback,否则使用callback        List result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? 
callback : firstCallback));        //遍历执行        for (ShardingExecuteGroup each : Lists.newArrayList(inputGroupsIterator)) {            result.addAll(syncGroupExecute(each, callback));        }        return result;    }        private  List parallelExecute(final Collection> inputGroups, final ShardingGroupExecuteCallback firstCallback,                                           final ShardingGroupExecuteCallback callback) throws SQLException {        Iterator> inputGroupsIterator = inputGroups.iterator();        //获取第一个组        ShardingExecuteGroup firstInputs = inputGroupsIterator.next();        //将剩余组提交到线程池中执行        Collection>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);        //执行第一个组,合并同步执行、异步执行结果        return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);    }    /**     * 异步执行     */    private  Collection>> asyncGroupExecute(final List> inputGroups, final ShardingGroupExecuteCallback callback) {        Collection>> result = new LinkedList<>();        for (ShardingExecuteGroup each : inputGroups) {            result.add(asyncGroupExecute(each, callback));        }        return result;    }        private  ListenableFuture> asyncGroupExecute(final ShardingExecuteGroup inputGroup, final ShardingGroupExecuteCallback callback) {        final Map dataMap = ShardingExecuteDataMap.getDataMap();        //提交到线程池        return executorService.submit(new Callable>() {                        @Override            public Collection call() throws SQLException {                return callback.execute(inputGroup.getInputs(), false, dataMap);            }        });    }    /**     * 同步执行     */    private  Collection syncGroupExecute(final ShardingExecuteGroup executeGroup, final ShardingGroupExecuteCallback callback) throws SQLException {        return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());    }        
private  List getGroupResults(final Collection firstResults, final Collection>> restFutures) throws SQLException {        List result = new LinkedList<>(firstResults);        for (ListenableFuture> each : restFutures) {            try {                result.addAll(each.get());            } catch (final InterruptedException | ExecutionException ex) {                return throwException(ex);            }        }        return result;    }

以上就是小编为大家分享的关于ShardingContext有哪些功能的内容,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想了解更多相关知识,欢迎关注行业资讯频道。

0