千家信息网

如何进行spark python编程

发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,本篇文章给大家分享的是有关如何进行spark python编程,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。spark应用程序结构Spa
千家信息网最后更新 2024年12月12日如何进行spark python编程

本篇文章给大家分享的是有关如何进行spark python编程,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

spark应用程序结构

Spark应用程序可分两部分:driver部分和executor部分初始化SparkContext和主体程序

A:driver部分

driver部分主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在executor部分运行完毕后,需要将SparkContext关闭。

B:executor部分

Spark应用程序的executor部分是对数据的处理,数据分三种:

  • 原生数据,包含输入的数据和输出的数据

    • 生成Scala标量数据,如count(返回RDD中元素的个数)、reduce、fold/aggregate;返回几个标量,如take(返回前几个元素)。

    • 生成Scala集合数据集,如collect(把RDD中的所有元素倒入 Scala集合类型)、lookup(查找对应key的所有值)。

    • 生成hadoop数据集,如saveAsTextFile、saveAsSequenceFile

    • scala集合数据集,如Array(1,2,3,4,5),Spark使用parallelize方法转换成RDD。

    • hadoop数据集,Spark支持存储在hadoop上的文件和hadoop支持的其他文件系统,如本地文件、HBase、SequenceFile和Hadoop的输入格式。例如Spark使用txtFile方法可以将本地文件或HDFS文件转换成RDD。

    • 对于输入原生数据,Spark目前提供了两种:

    • 对于输出数据,Spark除了支持以上两种数据,还支持scala标量

  • RDD,Spark进行并行运算的基本单位,其细节参见RDD 细解。RDD提供了四种算子:

    • 窄依赖算子

    • 宽依赖算子,宽依赖会涉及shuffle类,在DAG图解析时以此为边界产生Stage,如图所示。

    • 输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatMap;

    • 输入输出一对一,但结果RDD的分区结构发生了变化,如union、coalesce;

    • 从输入中选择部分元素的算子,如filter、distinct、subtract、sample。

    • 对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;

    • 对两个RDD基于key进行join和重组,如join、cogroup。

    • 输入算子,将原生数据转换成RDD,如parallelize、txtFile等

    • 转换算子,最主要的算子,是Spark生成DAG图的对象,转换算子并不立即执行,在触发行动算子后再提交给driver处理,生成DAG图 --> Stage --> Task --> Worker执行。按转化算子在DAG图中作用,可以分成两种:

    • 缓存算子,对于要多次使用的RDD,可以缓冲加快运行速度,对重要数据可以采用多备份缓存。

    • 行动算子,将运算结果RDD转换成原生数据,如count、reduce、collect、saveAsTextFile等。

  • 共享变量,在Spark运行时,一个函数传递给RDD内的patition操作时,该函数所用到的变量在每个运算节点上都复制并维护了一份,并且各个节点之间不会相互影响。但是在Spark Application中,可能需要共享一些变量,提供Task或驱动程序使用。Spark提供了两种共享变量:

    • 广播变量,可以缓存到各个节点的共享变量,通常为只读,使用方法:

    • >>> from pyspark.context import SparkContext                    >>> sc = SparkContext('local', 'test')                           >>> b = sc.broadcast([1, 2, 3, 4, 5])                                    >>> b.value[1, 2, 3, 4, 5]                                                        >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
    • 累计器,只支持加法操作的变量,可以实现计数器和变量求和。用户可以调用SparkContext.accumulator(v)创建一个初始值为v的累加器,而运行在集群上的Task可以使用"+="操作,但这些任务却不能读取;只有驱动程序才能获取累加器的值。使用方法:

python编程

实验项目

sogou日志数据分析

实验数据来源:sogou精简版数据下载地址

数据格式说明:

访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL

其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID。

以上数据格式是官方说明,实际上该数据集中排名和顺序号之间不是\t分割,而是空格分割。

一个session内查询次数最多的用户的session与相应的查询次数

import sys  from pyspark import SparkContext    if __name__ == "__main__":      if len(sys.argv) != 2:          print >> sys.stderr, "Usage: SogouC "          exit(-1)      sc = SparkContext(appName="SogouC")      sgRDD = sc.textFile(sys.argv[1])      print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10)      sc.stop()

虚拟集群中任意节点运行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt

运行结果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]

以上就是如何进行spark python编程,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

数据 算子 部分 变量 用户 输入 程序 运行 文件 结果 支持 生成 元素 应用程序 方法 节点 应用 查询 输出 编程 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全法网络运营者的解释 网络安全ow 山河棋牌服务器 服务器环境开启了安全模式 域名服务器 域名系统 网络安全维护员是做什么的 有关网络安全图片素材 云服务器保障条款 杭州苹果软件开发有哪些流程 计算机网络技术的职业描述 快鱼云企业级高可用云服务器 马云说的网络安全 保定直销软件开发公司排名 数据库对现实生活的意义 反恐精英csae怎么创建服务器 软件开发团队建设需求 软件开发七大浪费 机房闲置服务器赚钱 湖南国产信创服务器 优秀软件开发人员应具有的 哪一项不是软件开发的模型 广东网络安全学会 高职网络技术通信技术就业 校园网络安全自查工作报告 杭州同欣网络技术有限公司披露 西安亿阳信通网络技术有限公司 海陵区多功能网络技术多少钱 江苏网络安全知识竞赛官网 web服务器安全设置毕业论 我的世界服务器是怎么样的
0