千家信息网

RDS与POLARDB归档到X-Pack Spark计算的方法

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家
千家信息网最后更新 2025年01月24日RDS与POLARDB归档到X-Pack Spark计算的方法

本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。

RDS & POLARDB分表归档到X-Pack Spark步骤

一键关联POLARDB到Spark集群

POLARDB表存储

在database 'test1'中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、...

具体的建表语句如下:

*请左右滑动阅览

 CREATE TABLE `test1` ( `a` int(11) NOT NULL,                        `b` time DEFAULT NULL,                         `c` double DEFAULT NULL,                         PRIMARY KEY (`a`)    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

归档到Spark的调试

x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。

1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。

2、创建交互式查询

以pyspark为例,下面是具体归档demo的代码:

*请左右滑动阅览

spark.sql("drop table sparktest").show()# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "      "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区# CREATE TABLE `test1` (#     `a` int(11) NOT NULL,#                     `b` time DEFAULT NULL,#                                      `c` double DEFAULT NULL,#                                                         PRIMARY KEY (`a`)# ) ENGINE=InnoDB DEFAULT CHARSET=utf8for num in range(1, 4):     #构造polardb的表名    dbtable = "test1." + "test" + str(num)    #spark外表关联polardb对应的表    externalPolarDBTableNow = spark.read \        .format("jdbc") \        .option("driver", "com.mysql.jdbc.Driver") \        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \        .option("dbtable", dbtable) \        .option("user", "name") \        .option("password", "xxx*") \        .load().registerTempTable("polardbTableTemp")    #生成本次polardb表数据要写入的spark表的分区信息    (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))    #执行导数据sql     spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "          "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()    #删除临时的spark映射polardb表的catalog    spark.catalog.dropTempView("polardbTableTemp")    #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除    spark.sql("show partitions sparktest").show(1000, False)    spark.sql("select count(*) from sparktest").show()

归档作业上生产

交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例:

/polardb/polardbArchiving.py 内容如下:

*请左右滑动阅览

# -*- coding: UTF-8 -*-from __future__ import print_functionimport sysfrom operator import addfrom pyspark.sql import SparkSessionif __name__ == "__main__":    spark = SparkSession \        .builder \        .appName("PolardbArchiving") \        .enableHiveSupport() \        .getOrCreate()    spark.sql("drop table sparktest").show()    # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()    #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区    # CREATE TABLE `test1` (    #     `a` int(11) NOT NULL,    #      `b` time DEFAULT NULL,    #      `c` double DEFAULT NULL,    #       PRIMARY KEY (`a`)    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8    for num in range(1, 4):        #构造polardb的表名        dbtable = "test1.">

"RDS与POLARDB归档到X-Pack Spark计算的方法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0