RDS与POLARDB归档到X-Pack Spark计算的方法
发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家
千家信息网最后更新 2024年09月22日RDS与POLARDB归档到X-Pack Spark计算的方法
本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。
RDS & POLARDB分表归档到X-Pack Spark步骤
一键关联POLARDB到Spark集群
POLARDB表存储
在database 'test1'中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、...
具体的建表语句如下:
*请左右滑动阅览
CREATE TABLE `test1` ( `a` int(11) NOT NULL, `b` time DEFAULT NULL, `c` double DEFAULT NULL, PRIMARY KEY (`a`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
归档到Spark的调试
x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。
1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。
2、创建交互式查询
以pyspark为例,下面是具体归档demo的代码:
*请左右滑动阅览
spark.sql("drop table sparktest").show()# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区# CREATE TABLE `test1` (# `a` int(11) NOT NULL,# `b` time DEFAULT NULL,# `c` double DEFAULT NULL,# PRIMARY KEY (`a`)# ) ENGINE=InnoDB DEFAULT CHARSET=utf8for num in range(1, 4): #构造polardb的表名 dbtable = "test1." + "test" + str(num) #spark外表关联polardb对应的表 externalPolarDBTableNow = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \ .option("dbtable", dbtable) \ .option("user", "name") \ .option("password", "xxx*") \ .load().registerTempTable("polardbTableTemp") #生成本次polardb表数据要写入的spark表的分区信息 (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num)) #执行导数据sql spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) " "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show() #删除临时的spark映射polardb表的catalog spark.catalog.dropTempView("polardbTableTemp") #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除 spark.sql("show partitions sparktest").show(1000, False) spark.sql("select count(*) from sparktest").show()
归档作业上生产
交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例:
/polardb/polardbArchiving.py 内容如下:
*请左右滑动阅览
# -*- coding: UTF-8 -*-from __future__ import print_functionimport sysfrom operator import addfrom pyspark.sql import SparkSessionif __name__ == "__main__": spark = SparkSession \ .builder \ .appName("PolardbArchiving") \ .enableHiveSupport() \ .getOrCreate() spark.sql("drop table sparktest").show() # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致 spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show() #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL, # `b` time DEFAULT NULL, # `c` double DEFAULT NULL, # PRIMARY KEY (`a`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8 for num in range(1, 4): #构造polardb的表名 dbtable = "test1.">
"RDS与POLARDB归档到X-Pack Spark计算的方法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
数据
存储
查询
交互式
作业
内容
方法
一致
例子
字段
实际
小时
方式
更多
知识
类型
过程
建一
关联
处理
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
移动服务器密码是什么意思
电子软件开发成果报告书
ibm服务器的存储
ERP数据库备份需要多久
个人接软件开发项目流程
软件开发行业发展现在
ios换服务器下的游戏换回来
软件开发提测单包含哪些内容
十堰良好软件开发诚信为本
fm2022数据库怎么选
网络安全渗透毕业设计
如何保护web服务器安全
网络安全的态度感知
合肥专业的软件开发培训
社保软件开发
有关图书借阅的数据库设计
高并发服务器发展趋势
我的世界做服务器mod需要买吗
表格锁定两个条件引数据库
网络技术是理工吗
数据库读取数据迟缓原因
香港服务器可以做关键词优化吗
数据库查询含有王字的人
网络安全公司简介
网站数据库图片保存方案
杭州数字多媒体软件开发
厦门走启科技互联网有限公司
如何搭建后台服务器
fm2022数据库怎么选
网络安全教育心得400