千家信息网

怎样使用Apache Flink中的Table SQL APIx

发表于:2025-02-12 作者:千家信息网编辑
千家信息网最后更新 2025年02月12日,本篇文章为大家展示了怎样使用Apache Flink中的Table SQL APIx,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。什么是Flink关系型API
千家信息网最后更新 2025年02月12日怎样使用Apache Flink中的Table SQL APIx

本篇文章为大家展示了怎样使用Apache Flink中的Table SQL APIx,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

什么是Flink关系型API?

虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体实现。

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

Flink提供了三层API,每一层API提供了一个在简洁性和表达力之间的权衡 。

最低层是一个有状态的事件驱动。在这一层进行开发是非常麻烦的。

虽然很多功能基于DataSet和DataStreamAPI是可以完成的,需要熟悉这两套API,而且必须要熟悉Java和Scala,这是有一定的难度的。一个框架如果在使用的过程中没法使用SQL来处理,那么这个框架就有很大的限制。虽然对于开发人员无所谓,但是对于用户来说却不显示。因此SQL是非常面向大众语言。

好比MapReduce使用Hive SQL,Spark使用Spark SQL,Flink使用Flink SQL。

虽然Flink支持批处理/流处理,那么如何做到API层面的统一?

这样Table和SQL应运而生。

这其实就是一个关系型API,操作起来如同操作Mysql一样简单。

Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.

Apache Flink通过使用Table API和SQL 两大特性,来统一批处理和流处理。 Table API是一个查询API,集成了Scala和Java语言,并且允许使用select filter join等操作。

使用Table SQL API需要额外依赖

java:

                    org.apache.flink            flink-streaming-scala_2.11            ${flink.version}                            org.apache.flink            flink-table-planner_2.11            ${flink.version}                            org.apache.flink            flink-table-api-java-bridge_2.11            ${flink.version}        

scala:

                    org.apache.flink            flink-table-planner_2.11            ${flink.version}                            org.apache.flink            flink-table-api-scala-bridge_2.11            ${flink.version}        

使用Table SQL API编程

首先导入上面的依赖,然后读取sales.csv文件,文件内容如下:

transactionId,customerId,itemId,amountPaid111,1,1,100.0112,2,2,505.0113,1,3,510.0114,2,4,600.0115,3,2,500.0116,4,2,500.0117,1,2,500.0118,1,2,500.0119,1,3,500.0120,1,2,500.0121,2,4,500.0122,1,2,500.0123,1,4,500.0124,1,2,500.0

Scala

object TableSQLAPI {  def main(args: Array[String]): Unit = {    val bEnv = ExecutionEnvironment.getExecutionEnvironment    val bTableEnv = BatchTableEnvironment.create(bEnv)    val filePath="E:/test/sales.csv"    // 已经拿到DataSet    val csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true)    // DataSet => Table  }  case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double                     )}

首先拿到DataSet,接下来将DataSet转为Table,然后就可以执行SQL了

    // DataSet => Tableval salesTable = bTableEnv.fromDataSet(csv)    // 注册成Table  Table => table    bTableEnv.registerTable("sales", salesTable)    // sql    val resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")    bTableEnv.toDataSet[Row](resultTable).print()

输出结果如下:

4,500.03,500.01,4110.02,1605.0

这种方式只需要使用SQL就可以实现之前写mapreduce的功能。大大方便了开发过程。

Java

package com.vincent.course06;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.BatchTableEnvironment;import org.apache.flink.types.Row;public class JavaTableSQLAPI {    public static void main(String[] args) throws Exception {        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);        DataSource salesDataSource = bEnv.readCsvFile("E:/test/sales.csv").ignoreFirstLine().                pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");        Table sales = bTableEnv.fromDataSet(salesDataSource);        bTableEnv.registerTable("sales", sales);        Table resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");        DataSet rowDataSet = bTableEnv.toDataSet(resultTable, Row.class);        rowDataSet.print();    }    public static class Sales {        public String transactionId;        public String customerId;        public String itemId;        public Double amountPaid;        @Override        public String toString() {            return "Sales{" +                    "transactionId='" + transactionId + '\'' +                    ", customerId='" + customerId + '\'' +                    ", itemId='" + itemId + '\'' +                    ", amountPaid=" + amountPaid +                    '}';        }    }}

上述内容就是怎样使用Apache Flink中的Table SQL APIx,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0