DataSet&DataStream API
You need to be familiar with two separate APIs: DataSet (batch) and DataStream (streaming).
MapReduce ==> Hive SQL
Spark ==> Spark SQL
Flink ==> SQL
Flink supports both stream processing and batch processing. How can the two be unified at the API level?
==> Table & SQL API, a relational API
Everybody knows SQL.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/
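To make the stream/batch unification more concrete, here is a conceptual sketch in plain Java. It is not Flink code and has no Flink dependency. It evaluates one relational query, `select customerId, sum(amountPaid) ... group by customerId`, in two ways: once over a bounded input (batch) and once over records that arrive one at a time (stream). The `Sale` class, the `batch`/`stream` method names, and the sample rows are hypothetical. The streaming side emits a retraction-style changelog: it first retracts the old per-key sum, then emits the new one. This resembles what Flink's retract streams carry for a non-windowed GROUP BY, but it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch only (not Flink code): one GROUP BY/SUM query evaluated
// over a bounded input (batch) and over records arriving one at a time (stream).
final class UnifiedQuerySketch {

    // Hypothetical, simplified input row: (customerId, amountPaid).
    static final class Sale {
        final String customerId;
        final double amountPaid;

        Sale(String customerId, double amountPaid) {
            this.customerId = customerId;
            this.amountPaid = amountPaid;
        }
    }

    // Hypothetical sample rows.
    static final List<Sale> SAMPLE = Arrays.asList(
            new Sale("c1", 10.0), new Sale("c2", 5.5),
            new Sale("c1", 20.0), new Sale("c2", 4.5));

    // Batch: the whole input is known, so each key gets exactly one final row.
    static Map<String, Double> batch(List<Sale> input) {
        Map<String, Double> result = new TreeMap<>(); // sorted by customerId
        for (Sale s : input) {
            result.merge(s.customerId, s.amountPaid, Double::sum);
        }
        return result;
    }

    // Stream: the result is updated after every record. A changed sum is emitted
    // as a retraction of the old row ("-") followed by the new row ("+").
    static List<String> stream(List<Sale> arrivals) {
        Map<String, Double> state = new HashMap<>(); // running sum per key
        List<String> changelog = new ArrayList<>();
        for (Sale s : arrivals) {
            Double previous = state.get(s.customerId);
            double updated = (previous == null ? 0.0 : previous) + s.amountPaid;
            if (previous != null) {
                changelog.add("-" + s.customerId + "=" + previous);
            }
            state.put(s.customerId, updated);
            changelog.add("+" + s.customerId + "=" + updated);
        }
        return changelog;
    }

    public static void main(String[] args) {
        System.out.println("batch:  " + batch(SAMPLE));
        System.out.println("stream: " + stream(SAMPLE));
    }
}
```

Once the input has been fully consumed, the last `+` row for each key matches the batch result. This is why a single SQL query can describe both the bounded and the unbounded case.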
Write the JavaTableSQLAPI main class:

package org.myorg.quickstart.course06;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author BluthLeee
 */
public class JavaTableSQLAPI {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Batch table environment built on top of the DataSet API
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // Read the CSV, skip the header line, and map the columns (in order) onto the Sales POJO fields
        String filePath = "file:///E:/Flink/input/sales.csv";
        DataSource<Sales> csv = env.readCsvFile(filePath)
                .ignoreFirstLine()
                .pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");

        // DataSet -> Table, then register it as the view "sales" so that SQL can refer to it
        Table sales = tableEnv.fromDataSet(csv);
        tableEnv.createTemporaryView("sales", sales);

        // Total amount paid per customer
        Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");

        // Table -> DataSet<Row>; print() also triggers execution of the batch job
        DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
        result.print();
    }

    // Flink POJO: public static class, public fields, implicit public no-arg constructor
    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;

        @Override
        public String toString() {
            return "Sales{" +
                    "transactionId='" + transactionId + '\'' +
                    ", customerId='" + customerId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", amountPaid=" + amountPaid +
                    '}';
        }
    }
}

To summarize, the program follows these steps:
Create a TableEnvironment:
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

Convert a DataSet (or, in a streaming job, a DataStream) into a Table:
Table sales = tableEnv.fromDataSet(csv);

Register the Table as a temporary view so that SQL can refer to it by name:
tableEnv.createTemporaryView("sales", sales);

Run a SQL query through the TableEnvironment:
Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");

Convert the resulting Table into a DataSet of Row by specifying the target class:
DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
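A plain-Java reference implementation of the same pipeline can help you check what the job should print. It is a sketch that rests on several assumptions. The CSV is assumed to have a header row and the column order `transactionId,customerId,itemId,amountPaid`. Values are assumed to contain no quoted commas. `SAMPLE_CSV` is made-up data, not the real `sales.csv`. The helper names `parseCsv` and `sumByCustomer` are hypothetical: `parseCsv` roughly mirrors `readCsvFile(...).ignoreFirstLine().pojoType(...)`, and `sumByCustomer` mirrors the GROUP BY/SUM query.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java reference for JavaTableSQLAPI (no Flink dependency).
// Assumes a header line and columns: transactionId,customerId,itemId,amountPaid.
final class SalesQuerySketch {

    // Hypothetical sample data, not the real contents of sales.csv.
    static final String SAMPLE_CSV =
            "transactionId,customerId,itemId,amountPaid\n"
            + "t1,c1,i1,10.0\n"
            + "t2,c2,i2,5.5\n"
            + "t3,c1,i3,20.0\n"
            + "t4,c2,i1,4.5\n";

    // Same fields as the Sales POJO in the Flink example.
    static final class Sales {
        final String transactionId;
        final String customerId;
        final String itemId;
        final double amountPaid;

        Sales(String transactionId, String customerId, String itemId, double amountPaid) {
            this.transactionId = transactionId;
            this.customerId = customerId;
            this.itemId = itemId;
            this.amountPaid = amountPaid;
        }
    }

    // Roughly what readCsvFile(...).ignoreFirstLine().pojoType(...) does:
    // skip the header and map columns to fields by position.
    // Simplification: no support for quoted fields or embedded commas.
    static List<Sales> parseCsv(String csv) {
        List<Sales> rows = new ArrayList<>();
        String[] lines = csv.split("\n");
        for (int i = 1; i < lines.length; i++) { // start at 1 to skip the header
            String line = lines[i].trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] f = line.split(",");
            rows.add(new Sales(f[0].trim(), f[1].trim(), f[2].trim(),
                    Double.parseDouble(f[3].trim())));
        }
        return rows;
    }

    // select customerId, sum(amountPaid) money from sales group by customerId
    static Map<String, Double> sumByCustomer(List<Sales> sales) {
        Map<String, Double> money = new LinkedHashMap<>(); // keeps first-seen key order
        for (Sales s : sales) {
            money.merge(s.customerId, s.amountPaid, Double::sum);
        }
        return money;
    }

    public static void main(String[] args) {
        System.out.println(sumByCustomer(parseCsv(SAMPLE_CSV)));
    }
}
```

Running `main` prints `{c1=30.0, c2=10.0}`. Given the same input, the Flink job should report the same per-customer totals. The order of the rows Flink prints is not guaranteed, however, because the GROUP BY may run in parallel.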