1 用户 id 和域名映射
-- Maps a user id to the domains it owns (looked up during stream cleansing).
create table user_domain_config
(
    id      int unsigned auto_increment,
    user_id varchar(50) not null,
    domain  varchar(50) not null,
    primary key (id)
); -- fix: the terminating semicolon was missing, so the script failed before the INSERTs
-- Seed data: five domains all owned by user 8000000.
insert into user_domain_config (user_id, domain) values ('8000000', 'v1.go2yd.com');
insert into user_domain_config (user_id, domain) values ('8000000', 'v2.go2yd.com');
insert into user_domain_config (user_id, domain) values ('8000000', 'v3.go2yd.com');
insert into user_domain_config (user_id, domain) values ('8000000', 'v4.go2yd.com');
insert into user_domain_config (user_id, domain) values ('8000000', 'vmi.go2yd.com');
2 功能二
在做数据清洗的时候,不仅需要处理 raw 日志,还需要关联 MySQL 表里的数据。自定义一个从 MySQL 读取数据的 Flink 数据源,然后把两个 Stream 关联起来。
2.1 自定义数据源
package test
.flink
.scala
.scalaproject
import java
.sql
.{Connection
, DriverManager
, PreparedStatement
}
import org
.apache
.flink
.configuration
.Configuration
import org
.apache
.flink
.streaming
.api
.functions
.source
.{RichParallelSourceFunction
, SourceFunction
}
import scala
.collection
.mutable
/**
 * Flink source that reads the whole user_domain_config table once and emits
 * it as a single domain -> user_id map, for joining against the log stream.
 *
 * NOTE(review): run as setParallelism(1); with higher parallelism every
 * subtask would emit its own copy of the map — confirm against the caller.
 */
class MySQLSource extends RichParallelSourceFunction[mutable.HashMap[String, String]] {

  // JDBC handles; acquired in open(), released in close().
  var connection: Connection = null
  var ps: PreparedStatement = null

  val sql = "select user_id,domain from user_domain_config"

  /** Opens the JDBC connection and prepares the lookup statement. */
  override def open(parameters: Configuration): Unit = {
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/flink_mooc"
    val user = "root"
    val password = "root"
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, password)
    ps = connection.prepareStatement(sql)
  }

  /** Releases JDBC resources in reverse order of acquisition. */
  override def close(): Unit = {
    if (ps != null) {
      ps.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  /**
   * Executes the query, collects every row into one map keyed by domain
   * (value: user_id), and emits the map once.
   */
  override def run(sourceContext: SourceFunction.SourceContext[mutable.HashMap[String, String]]): Unit = {
    val res = ps.executeQuery()
    val resMap = new mutable.HashMap[String, String]()
    try {
      while (res.next()) {
        // Keyed by domain so the join side can look up the owning user id.
        resMap.put(res.getString("domain"), res.getString("user_id"))
      }
    } finally {
      // fix: the ResultSet was never closed, leaking a cursor on every run
      res.close()
    }
    sourceContext.collect(resMap)
  }

  // Bounded, one-shot source: run() returns on its own, nothing to interrupt.
  override def cancel(): Unit = {
  }
}
测试
package test
.flink
.scala
.scalaproject
import org
.apache
.flink
.api
.scala
.createTypeInformation
import org
.apache
.flink
.streaming
.api
.scala
.StreamExecutionEnvironment
/** Smoke test: wires MySQLSource into a local job and prints what it emits. */
object MySQLSourceTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Single parallel instance: one JDBC connection, one emitted map.
    val configStream = env
      .addSource(new MySQLSource)
      .setParallelism(1)

    configStream.print()

    env.execute("MySQLSourceTest")
  }
}