1 完成两个流关联数据的清洗
package test
.flink
.scala
.scalaproject
import java
.text
.SimpleDateFormat
import java
.util
import java
.util
.{Date
, Properties
}
import org
.apache
.flink
.api
.common
.functions
.RuntimeContext
import org
.apache
.flink
.api
.common
.serialization
.SimpleStringSchema
import org
.apache
.flink
.api
.java
.tuple
.Tuple
import org
.apache
.flink
.api
.scala
.createTypeInformation
import org
.apache
.flink
.streaming
.api
.TimeCharacteristic
import org
.apache
.flink
.streaming
.api
.functions
.AssignerWithPeriodicWatermarks
import org
.apache
.flink
.streaming
.api
.functions
.co
.CoFlatMapFunction
import org
.apache
.flink
.streaming
.api
.scala
.StreamExecutionEnvironment
import org
.apache
.flink
.streaming
.api
.scala
.function
.WindowFunction
import org
.apache
.flink
.streaming
.api
.watermark
.Watermark
import org
.apache
.flink
.streaming
.api
.windowing
.assigners
.TumblingEventTimeWindows
import org
.apache
.flink
.streaming
.api
.windowing
.time
.Time
import org
.apache
.flink
.streaming
.api
.windowing
.windows
.TimeWindow
import org
.apache
.flink
.streaming
.connectors
.elasticsearch
.{ElasticsearchSinkFunction
, RequestIndexer
}
import org
.apache
.flink
.streaming
.connectors
.elasticsearch6
.ElasticsearchSink
import org
.apache
.flink
.streaming
.connectors
.kafka
.FlinkKafkaConsumer011
import org
.apache
.flink
.util
.Collector
import org
.apache
.http
.HttpHost
import org
.elasticsearch
.action
.index
.IndexRequest
import org
.elasticsearch
.client
.Requests
import org
.slf4j
.LoggerFactory
import scala
.collection
.mutable
import scala
.collection
.mutable
.ArrayBuffer
/**
 * Streaming job that cleans tab-separated log lines read from Kafka and enriches
 * each record with a user id looked up by domain in a map delivered by a second
 * stream (`MySQLSource`, defined outside this file section).
 *
 * Pipeline, as wired in [[main]]:
 *   1. read raw lines from the Kafka topic `tzbtest`;
 *   2. parse every line into `(level, time, domain, traffic)`; lines that cannot be
 *      parsed are logged and dropped instead of failing the job;
 *   3. keep only records whose level is `"E"` and drop the level column;
 *   4. connect with the broadcast mapping stream and append the user id;
 *   5. print the resulting tab-separated strings.
 */
object LogAnalysis {

  // Local import so this object stays self-contained with respect to the file's import block.
  import scala.util.control.NonFatal

  val logger = LoggerFactory.getLogger("LogAnalysis")

  /** Minimum number of tab-separated columns a line must have (highest index read is 6). */
  private val MinColumns = 7

  /** Pattern used to parse the timestamp column (index 3). */
  private val TimePattern = "yyyy-MM-dd HH:mm:ss"

  /**
   * Parses the timestamp column into epoch milliseconds.
   *
   * @param timeStr raw timestamp text, expected to match [[TimePattern]]
   * @return the parsed time, or `None` (after logging the failure) when parsing fails
   */
  private def parseTime(timeStr: String): Option[Long] =
    try {
      // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
      Some(new SimpleDateFormat(TimePattern).parse(timeStr).getTime)
    } catch {
      case NonFatal(e) =>
        // Pass the Throwable itself: a plain String second argument would be treated as a
        // format argument and silently discarded because the message has no `{}` placeholder.
        logger.error(s"time parse error: $timeStr", e)
        None
    }

  /**
   * Parses the traffic column into a `Long`.
   *
   * @param raw raw traffic text
   * @return the parsed value, or `None` (after logging the failure) when it is not a valid long
   */
  private def parseTraffic(raw: String): Option[Long] =
    try {
      Some(raw.toLong)
    } catch {
      case e: NumberFormatException =>
        logger.error(s"traffic parse error: $raw", e)
        None
    }

  /**
   * Parses one raw log line.
   *
   * Columns used (0-based, tab-separated): 2 = level, 3 = time, 5 = domain, 6 = traffic.
   *
   * @param line raw line as read from Kafka
   * @return `Some((level, time, domain, traffic))`, or `None` when the line has too few
   *         columns or its time / traffic column cannot be parsed
   */
  private def parseLogLine(line: String): Option[(String, Long, String, Long)] = {
    val splits = line.split("\t")
    if (splits.length < MinColumns) {
      // Guard the indexed accesses below; a short line would otherwise throw and fail the job.
      logger.warn(s"skip malformed line with ${splits.length} column(s): $line")
      None
    } else {
      for {
        time    <- parseTime(splits(3))
        traffic <- parseTraffic(splits(6))
      } yield (splits(2), time, splits(5), traffic)
    }
  }

  /**
   * Builds and runs the job.
   *
   * @param args command-line arguments (not read by this method)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val topic = "tzbtest"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "master:9092")
    prop.setProperty("group.id", "test-tzb-group")

    val consumer = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    val data = env.addSource(consumer)

    // (time, domain, traffic) for every parseable line whose level is "E".
    val logData = data
      .flatMap((line: String) => parseLogLine(line).toList)
      .filter(_._1 == "E")
      .map(record => (record._2, record._3, record._4))

    val mysqlData = env.addSource(new MySQLSource)

    // Broadcast the mapping stream so that every parallel instance of the co-flat-map
    // receives each mapping update; without it an update reaches only one subtask when
    // the operator runs with parallelism > 1.
    val connectData = logData
      .connect(mysqlData.broadcast)
      .flatMap(
        new CoFlatMapFunction[(Long, String, Long), mutable.HashMap[String, String], String] {

          // Latest mapping received on the second input; looked up by domain in flatMap1.
          // Plain field: it is not registered as Flink state.
          private var userDomainMap = mutable.HashMap[String, String]()

          /** Emits `time \t domain \t traffic \t userId`; userId is "" when the domain is unknown. */
          override def flatMap1(value: (Long, String, Long), out: Collector[String]): Unit = {
            val (time, domain, traffic) = value
            val userId = userDomainMap.getOrElse(domain, "")
            if (logger.isDebugEnabled) {
              logger.debug(s"resolved userId=$userId for domain=$domain")
            }
            out.collect(s"$time\t$domain\t$traffic\t$userId")
          }

          /** Replaces the current mapping with the one just received. */
          override def flatMap2(value: mutable.HashMap[String, String], out: Collector[String]): Unit = {
            userDomainMap = value
          }
        }
      )

    connectData.print()

    env.execute("LogAnalysis")
  }
}
启动生产者
处理后的结果
2 生产上并行度的设置
并行度最好通过参数传入
可以在 env 上设置全局并行度，也可以在 addSource 等算子上单独设置并行度。