千家信息网

如何通过AWS EMR降低集群计算成本

发表于:2025-01-27 作者:千家信息网编辑
千家信息网最后更新 2025年01月27日,如何通过AWS EMR降低集群计算成本,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。AWS EMR是一个计算集群。可以通过ta创建自定义
千家信息网最后更新 2025年01月27日如何通过AWS EMR降低集群计算成本

如何通过AWS EMR降低集群计算成本,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

AWS EMR是一个计算集群。可以通过ta创建自定义配置的虚拟机,并自动安装所需计算框架(Spark,Hadoop,Hive等),以便用来进行大数据计算。

1. 项目背景

公司目前有一个项目,通过爬虫收集数据,离线计算得到用户画像,并将最终结果写入rds,通过api向外展示数据。

2. 架构演进

2.1 技术栈
  • 计算框架 Spark

  • 调度框架 Airflow

  • 数据存储 Hadoop,Mysql

  • 数仓工具 Hive,Presto

  • 辅助工具 Zepplin

  • 脚本语言 Java,Scala,Python

2.2 第一版

环境

我们在某云厂商开了6台虚拟器(4核8G),spark on yarn模式运行,其中1台作为主节点,运行hadoop主节点和airflow调度程序,其余作为数据节点。

计算过程

  • 通过Spark Streaming将数据落地到Hadoop

  • Airflow定时向主节点通过Spark-submit方式提交命令

  • Spark计算后将最终结果写入Mysql

  • 平时开发人员可以在Zepplin进行查询

效果

计算流程可以正常进行

思考

通过一段时间的观察分析,我们发现

  • 大部分计算任务都能在较短时间内完成

  • 机器每天闲置时间很长

  • 业务没有很高的实时性要求

  • 高配置虚拟器成本很高

结论

基于现状,我们希望能有个即开即用的系统,就像电脑一样,要用就打开,用完就关闭。经过调研,最终选择了AWS的EMR。

2.3 第二版

环境

在将系统迁移到AWS EMR之后,在AWS上开了一台虚拟器(1核2G)运行Airflow和Kinesis

这台虚拟器需要一直运行,但Airflow本身不需要高配置

计算过程

  • 通过Kinesis将数据落到S3

  • Airflow定时发起任务

    • 发起创建EMR请求

      可自定义机器配置,要安装的计算框架,也可覆盖框架配置。可通过Python脚本检测集群是否创建成功

    • 提交计算任务

  • 关闭集群

效果

计算流程可以正常进行,但不需要长开机器了,只需要一台低配来触发定时任务即可

思考

通过一段时间的观察

  • EMR费用比起虚拟器,确实便宜很多

  • 可以通过console台查看集群状态,控制集群开关

  • 不方便的地方,平时要查看Hadoop的数据,需要自己写脚本拉取,不能使用辅助工具了

Talk is cheap, show me the code

准备工作

  • 注册AWS账号,登录

  • 开通EMR,S3

    开通S3的目的是为了持久化数据,因为EMR集群本身不带额外硬盘,需要外部介质储存

  • 开通AWS内网可访问的Mysql

    如果不用Hive,可跳过这一步,同理,需要外部介质储存Hive的数据结构

  • 准备创建EMR集群的脚本

    这里有个坑,开始我们使用的AWS SDK来做这件事,但无法自定义计算框架配置(应该是BUG),最初我们通过修改SDK源码解决了这个问题,但后来发现基本没用到SDK其他功能时,我们将这部分代码提成了单独的文件,由于使用了Airflow进行调度,所以决定用了Python

  • 编写Spark任务,打包上传至S3

EMR LIB

# coding: UTF-8import boto3, json, requests, requestsfrom datetime import datetimedef get_region():    # 这个地址不用改    r = requests.get("http://169.254.169.254/latest/dynamic/instance-identity/document")    response_json = r.json()    return response_json.get('region')def client(region_name):    global emr    emr = boto3.client('emr', region_name=region_name)# 创建EMRdef create_cluster(name):    param = {        # 修改需要的框架        "Applications":[{            "Name":"Hadoop"        },{            "Name":"Hive"        },{            "Name":"Spark"        }],        # 这里的名字会显示到控制台        "Name":name,        "ServiceRole":"EMR_DefaultRole",        "Tags":[],        "ReleaseLabel":"emr-5.26.0",        "Instances":{            "TerminationProtected":False,            "EmrManagedMasterSecurityGroup":"sg-0085fba9c3a6818f5",            "InstanceGroups":[{                "InstanceCount":1,                "Name":"主实例组 - 1",                "InstanceRole":"MASTER",                "EbsConfiguration":{                    "EbsBlockDeviceConfigs":[{                        "VolumeSpecification":{                            "SizeInGB":32,                            "VolumeType":"gp2"                        },                        "VolumesPerInstance":1                    }]                },                # 修改需要的硬件配置                "InstanceType":"m4.large",                "Market":"ON_DEMAND",                "Configurations":[{                    # 修改Hive的meta源                    "Classification":"hive-site",                    "Properties":{                        "javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",                        "javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",                        "javax.jdo.option.ConnectionUserName":"user",                        "javax.jdo.option.ConnectionPassword":"pwd"                    }                },{                    "Classification":"yarn-env",                    "Properties":{},                    "Configurations":[{                        "Classification":"export",                        "Properties":{                            "AWS_REGION":"cn-northwest-1",                            "S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",                            "S3_USE_HTTPS":"0",                            "S3_VERIFY_SSL":"0"                        }                    }]                }]            },{                "InstanceRole":"CORE",                "InstanceCount":1,                "Name":"核心实例组 - 2",                "Market":"ON_DEMAND",                # 修改需要的硬件配置                "InstanceType":"r5d.2xlarge",                "Configurations":[{                    "Classification":"hive-site",                    "Properties":{                        "javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",                        "javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",                        "javax.jdo.option.ConnectionUserName":"user",                        "javax.jdo.option.ConnectionPassword":"pwd"                    }                },{                    "Classification":"yarn-env",                    "Properties":{},                    "Configurations":[{                        "Classification":"export",                        "Properties":{                            "AWS_REGION":"cn-northwest-1",                            "S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",                            "S3_USE_HTTPS":"0",                            "S3_VERIFY_SSL":"0"                        }                    }]                }]            },{                # 修改需要的工作节点数                "InstanceCount":4,                "Name":"任务实例组 - 4",                "InstanceRole":"TASK",                "EbsConfiguration":{                    "EbsBlockDeviceConfigs":[{                        "VolumeSpecification":{                            "SizeInGB":32,                            "VolumeType":"gp2"                        },                        "VolumesPerInstance":4                    }]                },                # 修改需要的硬件配置                "InstanceType":"r5d.2xlarge",                "Market":"ON_DEMAND",                "Configurations":[{                    "Classification":"hive-site",                    "Properties":{                        "javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",                        "javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",                        "javax.jdo.option.ConnectionUserName":"user",                        "javax.jdo.option.ConnectionPassword":"pwd"                    }                },{                    "Classification":"yarn-env",                    "Properties":{},                    "Configurations":[{                        "Classification":"export",                        "Properties":{                            "AWS_REGION":"cn-northwest-1",                            "S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",                            "S3_USE_HTTPS":"0",                            "S3_VERIFY_SSL":"0"                        }                    }]                }]            }],            "KeepJobFlowAliveWhenNoSteps":True,            "Ec2SubnetId":"subnet-027bff297ea95039b",            "Ec2KeyName":"hifive.airflow",            "EmrManagedSlaveSecurityGroup":"sg-05a0e076ee7babb9e"        },        "JobFlowRole":"EMR_EC2_DefaultRole",        "Steps":[{            "HadoopJarStep":{                "Args":["state-pusher-script"],                "Jar":"command-runner.jar"            },            "Name":"Setup Hadoop Debugging"        }],        "ScaleDownBehavior":"TERMINATE_AT_TASK_COMPLETION",        "VisibleToAllUsers":True,        "EbsRootVolumeSize":10,        "LogUri":"s3n://aws-logs-550775287661-cn-northwest-1/elasticmapreduce/",        "AutoScalingRole":"EMR_AutoScaling_DefaultRole"    }    cluster_response = emr.run_job_flow(**param)    return cluster_response['JobFlowId']# 获取EMR访问入口def get_cluster_dns(cluster_id):    response = emr.describe_cluster(ClusterId=cluster_id)    return response['Cluster']['MasterPublicDnsName']# 等待集群创建完成def wait_for_cluster_creation(cluster_id):    emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)# 关闭EMRdef terminate_cluster(cluster_id):    emr.terminate_job_flows(JobFlowIds=[cluster_id])

调用测试

# 创建6台机器的集群(1 master,1 core,4 worker)cluster_id = create_cluster("biz_daily_2020_10_09")# 阻塞直到创建成功wait_for_cluster_creation(cluster_id)# dns相当于虚拟机的ssh地址,每次都不同# ssh登录这个地址可以提交spark命令了,这里使用Airflow的SSHOperator模拟登录并提交命令cluster_dns = get_cluster_dns(cluster_id)# 关闭集群terminate_cluster(cluster_id)

3. 其他坑

  • Airflow 1.9.0的时间模板{{ ds }}生成的是格林尼治时间,要改为我国时间,需手动加8小时,不知道新版本是否支持本地时间。

  • ssh登录dns用户名hadoop,这个用户是AWS生成的,似乎无法修改。

看完上述内容,你们掌握如何通过AWS EMR降低集群计算成本的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0