(5) DataStream API Programming

    Technology  2022-08-06

    Source

    The official documentation explains it as follows:

    Flink can read data from a number of predefined source functions, or you can write your own custom source.

    A source is added to a program with StreamExecutionEnvironment.addSource(sourceFunction). The source code of addSource(sourceFunction) is:

    /**
     * Adds a Data Source to the streaming topology.
     *
     * <p>By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
     * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
     * org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}. In these cases the resulting source
     * will have the parallelism of the environment. To change this afterwards call {@link
     * org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
     *
     * @param function
     *        the user defined function
     * @param <OUT>
     *        type of the returned stream
     * @return the data stream constructed
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
        return addSource(function, "Custom Source");
    }

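    The Javadoc shows that a custom source can implement SourceFunction or ParallelSourceFunction, or extend RichParallelSourceFunction. The main difference between them is parallelism: a source that only implements SourceFunction runs with parallelism 1, while a ParallelSourceFunction or RichParallelSourceFunction runs with the parallelism of the environment (adjustable afterwards with setParallelism(int)). The Rich variant additionally provides the open()/close() lifecycle methods and access to the runtime context.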
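    Flink's SourceFunction has two methods, run(SourceContext) and cancel(). A custom source usually loops inside run() while a volatile flag is set, emitting records through the context, and cancel() clears the flag so the loop ends. The sketch below shows that pattern in plain Java so it runs without Flink on the classpath; Emitter and CounterSource are hypothetical stand-ins for SourceContext and a user-defined source, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's SourceContext: receives emitted records.
interface Emitter<T> {
    void collect(T record);
}

// Hypothetical stand-in for a user-defined SourceFunction<Long>:
// run() keeps emitting until cancel() clears the flag (or a limit is reached).
class CounterSource {
    private volatile boolean running = true;  // volatile: cancel() is called from another thread in Flink
    private final long limit;

    CounterSource(long limit) {
        this.limit = limit;
    }

    void run(Emitter<Long> ctx) {
        long count = 0;
        while (running && count < limit) {
            ctx.collect(count);
            count++;
        }
    }

    void cancel() {
        running = false;
    }
}

class CounterSourceDemo {
    // Runs the source to completion and gathers everything it emitted.
    static List<Long> collectAll(CounterSource source) {
        List<Long> out = new ArrayList<>();
        source.run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectAll(new CounterSource(3))); // prints [0, 1, 2]
    }
}
```

    In real Flink code the class would implement SourceFunction<Long> (parallelism 1) or ParallelSourceFunction<Long> / RichParallelSourceFunction<Long> (one instance per parallel subtask) and emit through ctx.collect(...).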

    Practice

    Requirement

    Data arrives over a socket as text. Convert each String into a Java object, then save that object to a MySQL database.
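    The SinkToMySQL and JavaCustomSinkToMySQL classes in this article call setId/setName/setAge and getId/getName/getAge on a Student class that is never shown. A minimal POJO consistent with those calls might look like the sketch below; the field types are inferred from the student table, and the fromCsv helper is hypothetical (it repeats the comma-splitting done in the article's MapFunction).

```java
// Minimal Student POJO matching the getters/setters used in the article.
// In a real Flink project make it a public top-level class (public no-arg constructor,
// public getters/setters) so Flink can treat it as a POJO; it is package-private here
// only so the snippet compiles in any file.
class Student {
    private int id;
    private String name;
    private int age;

    public Student() { }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    // Hypothetical helper: same "id,name,age" parsing as the article's MapFunction.
    static Student fromCsv(String line) {
        String[] splits = line.split(",");
        Student stu = new Student();
        stu.setId(Integer.parseInt(splits[0]));
        stu.setName(splits[1]);
        stu.setAge(Integer.parseInt(splits[2]));
        return stu;
    }

    @Override
    public String toString() {
        return "Student{id=" + id + ", name='" + name + "', age=" + age + "}";
    }
}
```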

    Steps

    Create the Java class Student

    Create the database in MySQL

    You can enter the SQL commands in the MySQL 5.7 Command Line Client:

    create database imooc_flink;
    use imooc_flink;
    create table student (
        id int(11) NOT NULL AUTO_INCREMENT,
        name varchar(25),
        age int(11),
        primary key (id)
    );

    Write the SinkToMySQL class

    package org.myorg.quickstart.course05;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    /**
     * @author BluthLeee
     */
    public class SinkToMySQL extends RichSinkFunction<Student> {
        Connection connection;
        PreparedStatement pstmt;

        private Connection getConnection() {
            Connection conn = null;
            try {
                Class.forName("com.mysql.jdbc.Driver");
                String url = "jdbc:mysql://localhost:3306/imooc_flink?useUnicode=true&"
                        + "characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false"
                        + "&serverTimezone=UTC";
                conn = DriverManager.getConnection(url, "root", "lishun666");
                System.out.println("Connected successfully");
            } catch (Exception e) {
                // On failure this only prints the stack trace and returns null
                e.printStackTrace();
            }
            return conn;
        }

        // Called once per parallel sink instance, before any record arrives
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = getConnection();
            String sql = "insert into student(id,name,age) values (?,?,?)";
            pstmt = connection.prepareStatement(sql);
        }

        // Called once for every Student record: bind the fields and insert one row
        @Override
        public void invoke(Student value, Context context) throws Exception {
            pstmt.setInt(1, value.getId());
            pstmt.setString(2, value.getName());
            pstmt.setInt(3, value.getAge());
            pstmt.executeUpdate();
        }

        // Release the JDBC resources when the sink shuts down
        @Override
        public void close() throws Exception {
            super.close();
            if (pstmt != null) {
                pstmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
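    SinkToMySQL opens the connection and prepares the statement once in open(), reuses them for every record in invoke(), and releases them in close(). That ordering is why a RichSinkFunction is used instead of opening a connection per record. The plain-Java sketch below simulates this lifecycle with counters instead of a real database; LifecycleSink and its fields are hypothetical and only model the call order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a RichSinkFunction: open() once, invoke() per record, close() once.
class LifecycleSink {
    int connectionsOpened = 0;                       // how many (simulated) connections were created
    boolean closed = false;
    final List<String> written = new ArrayList<>();  // records "inserted" so far

    void open() {
        connectionsOpened++;   // in SinkToMySQL: getConnection() + prepareStatement(sql)
    }

    void invoke(String record) {
        if (connectionsOpened == 0 || closed) {
            throw new IllegalStateException("invoke() called outside open()/close()");
        }
        written.add(record);   // in SinkToMySQL: bind parameters + executeUpdate()
    }

    void close() {
        closed = true;         // in SinkToMySQL: pstmt.close() + connection.close()
    }

    // Drives one sink instance in the same order the runtime does.
    static LifecycleSink runAll(List<String> records) {
        LifecycleSink sink = new LifecycleSink();
        sink.open();
        try {
            for (String r : records) {
                sink.invoke(r);
            }
        } finally {
            sink.close();
        }
        return sink;
    }
}
```

    However many records arrive, only one connection is opened per parallel instance.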

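    The error I hit here was a NullPointerException. Setting breakpoints showed the cause: when getConnection() fails, it only prints the stack trace and returns null, so connection.prepareStatement(sql) in open() throws the NPE. I fixed the underlying connection failure in two places.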

    First, add the mysql-connector-java-8.0.13 jar to the project, for example through IDEA; see https://blog.csdn.net/XiufengWu/article/details/78252639.

    Second, set the connection URL to String url = "jdbc:mysql://localhost:3306/imooc_flink?useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC"; for details see https://stackoverflow.com/questions/50036361/i-want-to-connect-to-db-on-java-but-there-is-some-errors
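    The NPE is only a symptom: the real error is swallowed inside getConnection(). One alternative, sketched below under the assumption that you prefer fail-fast behaviour, is to let the SQLException propagate. Since open() already declares throws Exception, Flink would then report the actual cause (missing driver, wrong URL, bad password). FailFastConnection and the "your-password" placeholder are hypothetical; the URL is the one from the article.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class FailFastConnection {
    // URL from the article
    static final String URL = "jdbc:mysql://localhost:3306/imooc_flink?useUnicode=true"
            + "&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true"
            + "&useLegacyDatetimeCode=false&serverTimezone=UTC";

    // Unlike the article's getConnection(), this never returns null: a missing driver jar
    // or an unreachable server surfaces as an SQLException right here, not as an NPE later.
    static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn = DriverManager.getConnection(url, user, password);
        System.out.println("Connected to " + url);
        return conn;
    }

    public static void main(String[] args) {
        // "your-password" is a placeholder, not the article's credential
        try (Connection conn = getConnection(URL, "root", "your-password")) {
            System.out.println("Connection valid: " + conn.isValid(2));
        } catch (SQLException e) {
            // Without the Connector/J jar on the classpath the message starts with "No suitable driver found"
            System.err.println("Cannot connect: " + e.getMessage());
        }
    }
}
```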

    Note that after a successful run the console shows: Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.

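    To remove the warning, replace Class.forName("com.mysql.jdbc.Driver"); in getConnection() with Class.forName("com.mysql.cj.jdbc.Driver");. As the message says, manually loading the driver class is generally unnecessary, because Connector/J 8 registers itself automatically.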
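    If you keep a Class.forName call, it mainly serves as an early check that the Connector/J jar is really on the classpath, which was the first cause of the NPE. The sketch below separates that check from connecting and also lists the drivers DriverManager currently knows about; DriverCheck and its method names are hypothetical.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DriverCheck {
    // Returns true if the named class can be loaded from the current classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Lists the JDBC drivers currently registered with DriverManager (via SPI or Class.forName).
    static List<String> registeredDrivers() {
        List<String> names = new ArrayList<>();
        for (Driver d : Collections.list(DriverManager.getDrivers())) {
            names.add(d.getClass().getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Connector/J 8 present: " + isOnClasspath("com.mysql.cj.jdbc.Driver"));
        System.out.println("Registered drivers: " + registeredDrivers());
    }
}
```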

    Write the main class JavaCustomSinkToMySQL

    package org.myorg.quickstart.course05;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /**
     * @author BluthLeee
     */
    public class JavaCustomSinkToMySQL {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Read text lines such as "1,ali,13" from the socket on port 7777
            DataStreamSource<String> source = env.socketTextStream("localhost", 7777);

            // Convert each "id,name,age" line into a Student object
            SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {
                @Override
                public Student map(String value) throws Exception {
                    String[] splits = value.split(",");
                    Student stu = new Student();
                    stu.setId(Integer.parseInt(splits[0]));
                    stu.setName(splits[1]);
                    stu.setAge(Integer.parseInt(splits[2]));
                    return stu;
                }
            });

            // Write every Student to MySQL through the custom sink
            studentStream.addSink(new SinkToMySQL());

            env.execute("JavaCustomSinkToMySQL");
        }
    }
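    The map function assumes every line is well-formed. A blank line or "abc" throws NumberFormatException, and "1,ali" throws ArrayIndexOutOfBoundsException, which fails the whole job. One option, not used in the article, is to switch to a flatMap that drops malformed lines. The parsing half of that idea is sketched below in plain Java; StudentLineParser, Parsed and tryParse are hypothetical names, and Parsed stands in for Student so the snippet compiles on its own.

```java
import java.util.Optional;

class StudentLineParser {
    // Minimal holder so the snippet is self-contained; in the article this would be Student.
    static final class Parsed {
        final int id;
        final String name;
        final int age;

        Parsed(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // Returns empty instead of throwing when the line is not "id,name,age".
    static Optional<Parsed> tryParse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] splits = line.trim().split(",");
        if (splits.length != 3) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(splits[0].trim());
            int age = Integer.parseInt(splits[2].trim());
            return Optional.of(new Parsed(id, splits[1].trim(), age));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

    Inside a Flink FlatMapFunction<String, Student>, flatMap(value, out) would call out.collect(...) only when the Optional is present, so bad input is skipped instead of crashing the job.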

    Run the program

    Open a CMD window and listen on port 7777:

    nc -l -p 7777
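    Windows does not ship with nc. If it is not available, a small Java program can play the same role: listen on the port, accept the connection from socketTextStream, and forward every line typed on stdin. The sketch below is a hypothetical helper, not part of the article; serve() takes any BufferedReader so it can also feed fixed lines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class LineServer {
    // Accepts one client on the server socket and forwards every line from `in` to it.
    static void serve(ServerSocket server, BufferedReader in) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = in.readLine()) != null) {
                out.println(line);   // autoflush: each line is sent as soon as it is read
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(7777)) {
            System.out.println("Listening on port 7777; type records such as 1,ali,13");
            serve(server, new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8)));
        }
    }
}
```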

    Run the main class JavaCustomSinkToMySQL

    In the CMD window, type a sample record:

    1,ali,13, then press Enter

    In the MySQL command line, run select * from student; and check the result.

    The student table starts out empty. As you keep typing records into CMD, re-running the query in the MySQL console shows them stored in the database in real time.
