(6) Flink Table API & SQL

    Tech  2022-08-06

    Introduction

    DataSet & DataStream API

    With these, you have to learn two separate APIs: DataSet for batch processing and DataStream for stream processing.

    MapReduce ==> Hive SQL

    Spark ==> Spark SQL

    Flink ==> SQL

    Flink supports both stream processing and batch processing. How can the two be unified at the API level?

    ==> Table & SQL API, a relational API
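    To make the unification idea concrete, here is a conceptual sketch in plain Java. It uses no Flink classes, and the class name, the `Sale` record and the sample values are all hypothetical. The same GROUP BY / SUM is evaluated once over a complete, bounded input (as a batch job would) and incrementally over records arriving one at a time (as a streaming job would). The last value emitted for each key matches the batch result, which is why one relational query can describe both modes. This only illustrates the semantics; it is not how Flink implements them internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Conceptual sketch only (not Flink API): the query
 * "SELECT customerId, SUM(amountPaid) FROM sales GROUP BY customerId"
 * evaluated over a bounded input (batch) and over arriving records (streaming).
 */
public class UnifiedQuerySketch {

    // Hypothetical record type for illustration
    record Sale(String customerId, double amountPaid) {}

    // Batch: the input is complete, so the result is computed once at the end
    static Map<String, Double> batch(List<Sale> sales) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (Sale s : sales) {
            totals.merge(s.customerId(), s.amountPaid(), Double::sum);
        }
        return totals;
    }

    // Streaming: each arriving record updates the running total for its key,
    // and the updated "customerId=total" row is emitted immediately
    static List<String> streaming(List<Sale> arrivals) {
        Map<String, Double> totals = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Sale s : arrivals) {
            double updated = totals.merge(s.customerId(), s.amountPaid(), Double::sum);
            emitted.add(s.customerId() + "=" + updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Sale> sales = List.of(
                new Sale("c1", 10.0), new Sale("c2", 5.0), new Sale("c1", 2.5));
        System.out.println(batch(sales));     // {c1=12.5, c2=5.0}
        System.out.println(streaming(sales)); // [c1=10.0, c2=5.0, c1=12.5]
    }
}
```

In the streaming output, `c1` appears twice because its result row is updated when the second `c1` record arrives.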

    Everybody knows SQL.

    Hands-on

    Add the relevant dependencies (see the Flink 1.11 Table API & SQL documentation):

    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/

    Write the main class JavaTableSQLAPI:

    package org.myorg.quickstart.course06;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;

    /**
     * @author BluthLeee
     */
    public class JavaTableSQLAPI {

        public static void main(String[] args) throws Exception {
            // Batch execution environment and the Table environment bridged to it
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

            // Read the CSV, skip the header line, and map the columns (in order) to the Sales fields
            String filePath = "file:///E:/Flink/input/sales.csv";
            DataSource<Sales> csv = env.readCsvFile(filePath)
                    .ignoreFirstLine()
                    .pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");

            // DataSet -> Table, then register it under the name used in the SQL query
            Table sales = tableEnv.fromDataSet(csv);
            tableEnv.createTemporaryView("sales", sales);

            // Total amount paid per customer
            Table resultTable = tableEnv.sqlQuery(
                    "select customerId, sum(amountPaid) money from sales group by customerId");

            // Table -> DataSet<Row>; print() triggers execution of the batch job
            DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
            result.print();
        }

        /** One CSV line: public class, public fields, implicit public no-arg constructor. */
        public static class Sales {
            public String transactionId;
            public String customerId;
            public String itemId;
            public Double amountPaid;

            @Override
            public String toString() {
                return "Sales{" +
                        "transactionId='" + transactionId + '\'' +
                        ", customerId='" + customerId + '\'' +
                        ", itemId='" + itemId + '\'' +
                        ", amountPaid=" + amountPaid +
                        '}';
            }
        }
    }
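    The call readCsvFile(...).ignoreFirstLine().pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid") skips the header line and assigns the CSV columns, in the order of the listed names, to the POJO fields. The following plain-Java sketch (not Flink code) mimics that mapping so the column-order rule is visible. It assumes a simple comma-separated file without quoted fields. The class name, the parse helper and the sample lines are hypothetical, since the contents of sales.csv are not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (not Flink) of "skip header, map columns by position to named POJO fields". */
public class CsvToPojoSketch {

    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;
    }

    static List<Sales> parse(List<String> lines) {
        List<Sales> result = new ArrayList<>();
        // Like ignoreFirstLine(): line 0 is the header, so start at index 1
        for (int i = 1; i < lines.size(); i++) {
            String[] cols = lines.get(i).split(",");
            Sales s = new Sales();
            // Column i goes to the i-th name passed to pojoType(...)
            s.transactionId = cols[0];
            s.customerId = cols[1];
            s.itemId = cols[2];
            s.amountPaid = Double.valueOf(cols[3]);
            result.add(s);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "transactionId,customerId,itemId,amountPaid", // hypothetical header
                "111,1,1,100.0",
                "112,2,2,505.0");
        for (Sales s : parse(lines)) {
            System.out.println(s.customerId + " paid " + s.amountPaid);
        }
    }
}
```

If the names passed to pojoType were listed in a different order, the same column would land in a different field, so the list has to follow the file's column order.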

    To summarize, the program consists of the following steps:

    Create a TableEnvironment

    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

    Convert a DataStream or DataSet into a Table

    Table sales = tableEnv.fromDataSet(csv);
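    The SQL query can refer to customerId and amountPaid by name because fromDataSet derives the table's field names from the type of the DataSet. For a POJO these are its fields. Flink treats a public class with a public no-argument constructor and public (or getter/setter-accessible) fields as a POJO. The reflection sketch that follows is plain Java, not a Flink API, and the names PojoSchemaSketch and columns are hypothetical. It lists the fields such a schema would come from.

```java
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch (not Flink API): list the public instance fields of a POJO class. */
public class PojoSchemaSketch {

    // Same shape as the Sales POJO in the JavaTableSQLAPI example
    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;
    }

    // Returns "name Type" for every public, non-static field
    static List<String> columns(Class<?> pojo) {
        return Arrays.stream(pojo.getFields())
                .filter(f -> !Modifier.isStatic(f.getModifiers()))
                .map(f -> f.getName() + " " + f.getType().getSimpleName())
                .sorted() // getFields() has no guaranteed order; sort for stable output
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(columns(Sales.class));
        // [amountPaid Double, customerId String, itemId String, transactionId String]
    }
}
```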

    Register the Table as a temporary view

    tableEnv.createTemporaryView("sales", sales);

    Run a SQL query through the TableEnvironment

    Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");
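    To see what this query returns, here is a plain-Java equivalent using java.util.stream instead of Flink. The record, the helper name and the input rows are hypothetical. GROUP BY becomes Collectors.groupingBy and SUM(amountPaid) AS money becomes Collectors.summingDouble. A TreeMap is used only to get a deterministic printing order, because a SQL GROUP BY does not guarantee any output order.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java sketch of: select customerId, sum(amountPaid) money from sales group by customerId */
public class GroupBySumSketch {

    // Hypothetical row type mirroring the Sales columns
    record Sale(String transactionId, String customerId, String itemId, double amountPaid) {}

    static Map<String, Double> moneyPerCustomer(List<Sale> sales) {
        return sales.stream().collect(Collectors.groupingBy(
                Sale::customerId,                             // GROUP BY customerId
                TreeMap::new,                                 // sorted keys, for stable output only
                Collectors.summingDouble(Sale::amountPaid))); // SUM(amountPaid) AS money
    }

    public static void main(String[] args) {
        List<Sale> sales = List.of(
                new Sale("111", "1", "1", 100.0),
                new Sale("112", "2", "2", 505.0),
                new Sale("113", "1", "3", 50.0));
        System.out.println(moneyPerCustomer(sales)); // {1=150.0, 2=505.0}
    }
}
```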

    Convert the result Table into a DataSet, specifying Row as the target class

    DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
