新一代大数据计算引擎 Flink从入门到实战 (20) - 项目实战(6)- 功能2

    科技2022-08-20  114

    1 用户 id 和域名映射

    create table user_domain_config( id int unsigned auto_increment, user_id varchar(50) not null, domain varchar(50) not null, primary key (id) ) insert into user_domain_config(user_id,domain) values('8000000','v1.go2yd.com'); insert into user_domain_config(user_id,domain) values('8000000','v2.go2yd.com'); insert into user_domain_config(user_id,domain) values('8000000','v3.go2yd.com'); insert into user_domain_config(user_id,domain) values('8000000','v4.go2yd.com'); insert into user_domain_config(user_id,domain) values('8000000','vmi.go2yd.com');

    2 功能二

    在做数据清洗的时候,不仅需要处理 raw 日志(原始日志),还需要关联 MySQL 表里的数据。自定义一个 Flink 去读 MySQL 数据的数据源,然后把2个 Stream 关联起来。

    2.1 自定义数据源

    package test.flink.scala.scalaproject import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction} import scala.collection.mutable class MySQLSource extends RichParallelSourceFunction[mutable.HashMap[String,String]]{ var connection:Connection = null var ps:PreparedStatement = null val sql = "select user_id,domain from user_domain_config" // 建立连接 override def open(parameters: Configuration): Unit = { val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://localhost:3306/flink_mooc" val user = "root" val password = "root" Class.forName(driver) connection = DriverManager.getConnection(url,user,password) ps = connection.prepareStatement(sql) } override def close(): Unit = { if(ps != null){ ps.close() } if(connection != null){ connection.close() } } override def run(sourceContext: SourceFunction.SourceContext[mutable.HashMap[String, String]]): Unit = { val res = ps.executeQuery() val resMap = new mutable.HashMap[String,String]() while (res.next()){ resMap.put(res.getString("domain"),res.getString("user_id")) } sourceContext.collect(resMap) } override def cancel(): Unit ={ } }
    测试 package test.flink.scala.scalaproject import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object MySQLSourceTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.addSource(new MySQLSource).setParallelism(1) data.print() env.execute("MySQLSourceTest") } }

    Processed: 0.016, SQL: 9