ShardingContent的功能有哪些
发表于:2025-02-10 作者:千家信息网编辑
千家信息网最后更新 2025年02月10日,这期内容当中小编将会给大家带来有关ShardingContext的功能有哪些,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。ShardingContext主要做了哪些功能呢?
千家信息网最后更新 2025年02月10日ShardingContent的功能有哪些
这期内容当中小编将会给大家带来有关ShardingContent的功能有哪些,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
ShardingContext主要做了哪些功能呢?主要有两部分:
数据源分片元数据
主要根据数据源连接获取对应的url,通过解析url参数来封装数据源分片元数据;数据源分片元数据主要供后续SQL路由DCL(比如:授权、创建用户等)操作使用
表分片元数据
主要根据数据节点来获取真实表的元数据;而表分片元数据主要供后续SQL解析填充使用
源码分析
1.ShardingContext构造,主要分析ShardingTableMetaData
public ShardingContext(final MapdataSourceMap, final ShardingRule shardingRule, final DatabaseType databaseType, final Properties props) throws SQLException { this.shardingRule = shardingRule; //获取数据源原始元数据信息 this.cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap); //数据源类型 this.databaseType = databaseType; //sharding 配置参数 //比如:sql打印、线程池大小配置等 shardingProperties = new ShardingProperties(null == props ? new Properties() : props); //Statement、PrepareStatement执行线程池大小 //一个分片数据源将使用独立的线程池,它不会在同一个JVM中共享线程池甚至不同的数据源 //默认无限制 int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE); //执行引擎 executeEngine = new ShardingExecuteEngine(executorSize); //数据源分片元数据 //以mysql为例,建立连接获取mysql url,将解析后的url参数信息封装到ShardingDataSourceMetaData ShardingDataSourceMetaData shardingDataSourceMetaData = new ShardingDataSourceMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType); //表分片元数据 //以mysql为例,会建立连接获取表的元信息(字段、字段类型、索引) ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(getTableMetaDataInitializer(dataSourceMap, shardingDataSourceMetaData).load(shardingRule)); //封装数据源分片元数据、表分片元数据 metaData = new ShardingMetaData(shardingDataSourceMetaData, shardingTableMetaData); //解析结果缓存 parsingResultCache = new ParsingResultCache(); }// private TableMetaDataInitializer getTableMetaDataInitializer(final Map dataSourceMap, final ShardingDataSourceMetaData shardingDataSourceMetaData) { return new TableMetaDataInitializer(shardingDataSourceMetaData, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap), shardingProperties. getValue(ShardingPropertiesConstant.MAX_CONNECTIONS_SIZE_PER_QUERY), shardingProperties. getValue(ShardingPropertiesConstant.CHECK_TABLE_METADATA_ENABLED)); }
2.加载TableMetaDataInitializer#load
public TableMetaDataInitializer(final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine executeEngine, final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) { //数据源分片元数据 this.shardingDataSourceMetaData = shardingDataSourceMetaData; //数据源连接管理器 this.connectionManager = connectionManager; //表元数据加载器 tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData); } /** * Load table meta data. * * @param logicTableName logic table name * @param shardingRule sharding rule * @return table meta data */ @SneakyThrows public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) { return tableMetaDataLoader.load(logicTableName, shardingRule); } /** * Load all table meta data. * * @param shardingRule sharding rule * @return all table meta data */ @SneakyThrows public Mapload(final ShardingRule shardingRule) { Map result = new HashMap<>(); //加载分片表 result.putAll(loadShardingTables(shardingRule)); //加载未分片表 result.putAll(loadDefaultTables(shardingRule)); return result; } private Map loadShardingTables(final ShardingRule shardingRule) throws SQLException { Map result = new HashMap<>(shardingRule.getTableRules().size(), 1); for (TableRule each : shardingRule.getTableRules()) { //加载逻辑表对应真实表的元数据 //逻辑表:表元数据 result.put(each.getLogicTable(), tableMetaDataLoader.load(each.getLogicTable(), shardingRule)); } return result; } private Map loadDefaultTables(final ShardingRule shardingRule) throws SQLException { Map result = new HashMap<>(shardingRule.getTableRules().size(), 1); //查询默认数据源,没有则查找主库 Optional actualDefaultDataSourceName = shardingRule.findActualDefaultDataSourceName(); if (actualDefaultDataSourceName.isPresent()) { //获取所有表元数据 //真实表:表元数据 for (String each : getAllTableNames(actualDefaultDataSourceName.get())) { result.put(each, tableMetaDataLoader.load(each, 
shardingRule)); } } return result; } private Collection getAllTableNames(final String dataSourceName) throws SQLException { Collection result = new LinkedHashSet<>(); DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName); String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName(); try (Connection connection = connectionManager.getConnection(dataSourceName); ResultSet resultSet = connection.getMetaData().getTables(catalog, getCurrentSchemaName(connection), null, new String[]{"TABLE"})) { while (resultSet.next()) { String tableName = resultSet.getString("TABLE_NAME"); if (!tableName.contains("$") && !tableName.contains("/")) { result.add(tableName); } } } return result; } private String getCurrentSchemaName(final Connection connection) throws SQLException { try { return connection.getSchema(); } catch (final AbstractMethodError | SQLFeatureNotSupportedException ignore) { return null; } }
3.加载表元数据TableMetaDataLoader#load
/** * Load table meta data. * * @param logicTableName logic table name * @param shardingRule sharding rule * @return table meta data * @throws SQLException SQL exception */ public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) throws SQLException { //获取表元数据 ListactualTableMetaDataList = load(getDataNodeGroups(logicTableName, shardingRule), shardingRule.getShardingDataSourceNames()); //检查actualTableMetaDataList的元数据 checkUniformed(logicTableName, actualTableMetaDataList); return actualTableMetaDataList.iterator().next(); } private List load(final Map > dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException { //将封装好的数据节点组提交给执行引擎执行 return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback () { @Override public Collection execute(final Collection dataNodes, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException { String dataSourceName = dataNodes.iterator().next().getDataSourceName(); DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName); String catalog = null == dataSourceMetaData ? 
null : dataSourceMetaData.getSchemaName(); return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes); } }); } private Collection load(final String dataSourceName, final String catalog, final Collection dataNodes) throws SQLException { Collection result = new LinkedList<>(); try (Connection connection = connectionManager.getConnection(dataSourceName)) { for (DataNode each : dataNodes) { //获取表元数据 result.add(createTableMetaData(connection, catalog, each.getTableName())); } } return result; } private Map > getDataNodeGroups(final String logicTableName, final ShardingRule shardingRule) { //根据逻辑表获取对应的数据源:真实表数据节点 //比如: //ds_0 -> [ds_0:t_order_0, ds_0:t_order_1] //ds_1 -> [ds_1.t_order_0, ds_1.t_order_1] Map > result = shardingRule.getTableRule(logicTableName).getDataNodeGroups(); //默认false,设置为true会处理所有数据节点真实表 if (isCheckingMetaData) { return result; } //返回一个数据节点即可 String firstKey = result.keySet().iterator().next(); return Collections.singletonMap(firstKey, Collections.singletonList(result.get(firstKey).get(0))); } /** * 将数据节点组封装成分片执行组 * * @param dataNodeGroups 数据节点组 * * ds_0 -> [ds_0:t_order_0, ds_0:t_order_1] ** @return */ private Collection> getDataNodeGroups(final Map > dataNodeGroups) { Collection > result = new LinkedList<>(); //遍历对应数据源下的数据节点 for (Entry > entry : dataNodeGroups.entrySet()) { //封装分片执行组ShardingExecuteGroup result.addAll(getDataNodeGroups(entry.getValue())); } return result; } private Collection > getDataNodeGroups(final List dataNodes) { Collection > result = new LinkedList<>(); //maxConnectionsSizePerQuery最大查询连接数默认为1 //将dataNodes换分Math.max份 for (List each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) { result.add(new ShardingExecuteGroup<>(each)); } return result; } private TableMetaData createTableMetaData(final Connection connection, final String catalog, final String actualTableName) throws SQLException { //判断表是否存在 if (isTableExist(connection, catalog, 
actualTableName)) { //封装表元数据 return new TableMetaData(getColumnMetaDataList(connection, catalog, actualTableName), getLogicIndexes(connection, catalog, actualTableName)); } return new TableMetaData(Collections. emptyList(), Collections. emptySet()); } private boolean isTableExist(final Connection connection, final String catalog, final String actualTableName) throws SQLException { try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, actualTableName, null)) { return resultSet.next(); } } /** * 获取表字段元数据 * * @param connection 连接 * @param catalog schema * @param actualTableName 真实表 * @return * @throws SQLException */ private List getColumnMetaDataList(final Connection connection, final String catalog, final String actualTableName) throws SQLException { List result = new LinkedList<>(); Collection primaryKeys = getPrimaryKeys(connection, catalog, actualTableName); try (ResultSet resultSet = connection.getMetaData().getColumns(catalog, null, actualTableName, "%")) { while (resultSet.next()) { String columnName = resultSet.getString("COLUMN_NAME"); String columnType = resultSet.getString("TYPE_NAME"); result.add(new ColumnMetaData(columnName, columnType, primaryKeys.contains(columnName))); } } return result; } /** * 获取表主键 */ private Collection getPrimaryKeys(final Connection connection, final String catalog, final String actualTableName) throws SQLException { Collection result = new HashSet<>(); try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(catalog, null, actualTableName)) { while (resultSet.next()) { result.add(resultSet.getString("COLUMN_NAME")); } } return result; } /** * 获取表索引 */ private Collection getLogicIndexes(final Connection connection, final String catalog, final String actualTableName) throws SQLException { Collection result = new HashSet<>(); try (ResultSet resultSet = connection.getMetaData().getIndexInfo(catalog, catalog, actualTableName, false, false)) { while (resultSet.next()) { Optional logicIndex = 
getLogicIndex(resultSet.getString("INDEX_NAME"), actualTableName); if (logicIndex.isPresent()) { result.add(logicIndex.get()); } } } return result; } private Optional getLogicIndex(final String actualIndexName, final String actualTableName) { //索引要以`_tableName`命名,比如: //idx_t_order String indexNameSuffix = "_" + actualTableName; if (actualIndexName.contains(indexNameSuffix)) { return Optional.of(actualIndexName.replace(indexNameSuffix, "")); } return Optional.absent(); }
4.执行ShardingExecuteEngine#groupExecute
/** * Execute for group. * * @param inputGroups input groups * @param callback sharding execute callback * @param type of input value * @paramtype of return value * @return execute result * @throws SQLException throw if execute failure */ public List groupExecute(final Collection > inputGroups, final ShardingGroupExecuteCallback callback) throws SQLException { return groupExecute(inputGroups, null, callback, false); } /** * Execute for group. * * @param inputGroups input groups * @param firstCallback first sharding execute callback * @param callback sharding execute callback * @param serial whether using multi thread execute or not * @param type of input value * @param type of return value * @return execute result * @throws SQLException throw if execute failure */ public List groupExecute( final Collection > inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback, final boolean serial) throws SQLException { if (inputGroups.isEmpty()) { return Collections.emptyList(); } //serial: 串行 //parallel: 并行 return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback); } private List serialExecute(final Collection > inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback) throws SQLException { Iterator > inputGroupsIterator = inputGroups.iterator(); ShardingExecuteGroup firstInputs = inputGroupsIterator.next(); //单独执行第一个组 //当firstCallback不为空时使用firstCallback,否则使用callback List result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? 
callback : firstCallback)); //遍历执行 for (ShardingExecuteGroup each : Lists.newArrayList(inputGroupsIterator)) { result.addAll(syncGroupExecute(each, callback)); } return result; } private List parallelExecute(final Collection > inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback) throws SQLException { Iterator > inputGroupsIterator = inputGroups.iterator(); //获取第一个组 ShardingExecuteGroup firstInputs = inputGroupsIterator.next(); //将剩余组提交到线程池中执行 Collection >> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback); //执行第一个组,合并同步执行、异步执行结果 return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures); } /** * 异步执行 */ private Collection >> asyncGroupExecute(final List > inputGroups, final ShardingGroupExecuteCallback callback) { Collection >> result = new LinkedList<>(); for (ShardingExecuteGroup each : inputGroups) { result.add(asyncGroupExecute(each, callback)); } return result; } private ListenableFuture > asyncGroupExecute(final ShardingExecuteGroup inputGroup, final ShardingGroupExecuteCallback callback) { final Map dataMap = ShardingExecuteDataMap.getDataMap(); //提交到线程池 return executorService.submit(new Callable >() { @Override public Collection call() throws SQLException { return callback.execute(inputGroup.getInputs(), false, dataMap); } }); } /** * 同步执行 */ private Collection syncGroupExecute(final ShardingExecuteGroup executeGroup, final ShardingGroupExecuteCallback callback) throws SQLException { return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap()); } private List getGroupResults(final Collection firstResults, final Collection >> restFutures) throws SQLException { List result = new LinkedList<>(firstResults); for (ListenableFuture > each : restFutures) { try { result.addAll(each.get()); } catch (final InterruptedException | ExecutionException ex) { return throwException(ex); 
} } return result; }
上述就是小编为大家分享的ShardingContent的功能有哪些了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
数据
数据源
节点
封装
线程
分析
功能
信息
参数
字段
索引
逻辑
内容
大小
引擎
类型
结果
同步
查询
配置
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
主备服务器数据库同步
重庆黔江生鲜信息软件开发
数据库双竖线和concat
软件开发工工资待遇
虹口区常规网络技术服务质量保证
贵州省征兵网络技术
哈利波特服务器不同可以当室友吗
服务器python脚本查看编码
关于软件开发
阿里云服务器外网无法访问
厦门数据库面试
腾讯软件开发需要什么资质
李海威网络安全
java软件开发需要学多久
网络安全发展论文答辩3分钟简述
软件开发需要哪些专业的人
济南云度网络技术有限公司
员工人事变动数据库设计
中信银行软件开发中心工作内容
广东第三方软件开发定做
计算机网络技术简称什么
虹口区常规网络技术服务质量保证
服务器升级改造方案
hp服务器 raid驱动
南京应用软件开发多少钱
公司服务器账户设置权限管理
服务器总被攻击
千峰教育网络安全视频下载
河北软件开发品质售后无忧
计算机三级网络技术重点