RDS与POLARDB归档到X-Pack Spark计算的方法
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家
千家信息网最后更新 2025年01月24日RDS与POLARDB归档到X-Pack Spark计算的方法
本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。
RDS & POLARDB分表归档到X-Pack Spark步骤
一键关联POLARDB到Spark集群
POLARDB表存储
在database 'test1'中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、...
具体的建表语句如下:
*请左右滑动阅览
CREATE TABLE `test1` ( `a` int(11) NOT NULL, `b` time DEFAULT NULL, `c` double DEFAULT NULL, PRIMARY KEY (`a`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
归档到Spark的调试
x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。
1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。
2、创建交互式查询
以pyspark为例,下面是具体归档demo的代码:
*请左右滑动阅览
spark.sql("drop table sparktest").show()# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区# CREATE TABLE `test1` (# `a` int(11) NOT NULL,# `b` time DEFAULT NULL,# `c` double DEFAULT NULL,# PRIMARY KEY (`a`)# ) ENGINE=InnoDB DEFAULT CHARSET=utf8for num in range(1, 4): #构造polardb的表名 dbtable = "test1." + "test" + str(num) #spark外表关联polardb对应的表 externalPolarDBTableNow = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \ .option("dbtable", dbtable) \ .option("user", "name") \ .option("password", "xxx*") \ .load().registerTempTable("polardbTableTemp") #生成本次polardb表数据要写入的spark表的分区信息 (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num)) #执行导数据sql spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) " "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show() #删除临时的spark映射polardb表的catalog spark.catalog.dropTempView("polardbTableTemp") #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除 spark.sql("show partitions sparktest").show(1000, False) spark.sql("select count(*) from sparktest").show()
归档作业上生产
交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例:
/polardb/polardbArchiving.py 内容如下:
*请左右滑动阅览
# -*- coding: UTF-8 -*-from __future__ import print_functionimport sysfrom operator import addfrom pyspark.sql import SparkSessionif __name__ == "__main__": spark = SparkSession \ .builder \ .appName("PolardbArchiving") \ .enableHiveSupport() \ .getOrCreate() spark.sql("drop table sparktest").show() # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致 spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show() #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL, # `b` time DEFAULT NULL, # `c` double DEFAULT NULL, # PRIMARY KEY (`a`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8 for num in range(1, 4): #构造polardb的表名 dbtable = "test1.">
"RDS与POLARDB归档到X-Pack Spark计算的方法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
数据
存储
查询
交互式
作业
内容
方法
一致
例子
字段
实际
小时
方式
更多
知识
类型
过程
建一
关联
处理
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器的10s是什么意思
色盲能学计算机网络技术专业吗
互联网公司核心科技
mysql数据库表主键
二级域名指向服务器
北京品牌软件开发成本价
汽车电控软件开发
数据库实现分布式锁的方式
卸载旧的MYSQL数据库服务
查看当前mysql数据库
应用服务器 安全配置
草料二维码连接自有数据库
oracle空间数据库系统
怎么找奥奇传说的服务器
兵团监狱网络安全
掌握的数据库语言
win10网络安全查找密码
河南达双影网络技术服务有限公司
数字媒体和网络技术
网络安全知识线上竞答活动简报
网吧服务器做家用机
化学品理化性质数据库
电商数据库使用什么系统
网络安全博览会体验
c语言是什么软件开发的
数据库考勤表更新
e5 双路服务器
河北君坤互联网科技有限公司电话
宁波市网络安全大赛具体内容
上饶网络技术有限公司