新一代大数据计算引擎 Flink从入门到实战 (21) - 项目实战(7)- 完成2个流关联数据的清洗

    科技2022-08-19  119

    1 完成两个流关联数据的清洗

    package test.flink.scala.scalaproject

    import java.text.SimpleDateFormat
    import java.util
    import java.util.{Date, Properties}

    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.util.Collector
    import org.apache.http.HttpHost
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.Requests
    import org.slf4j.LoggerFactory

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer
    import scala.util.control.NonFatal

    /**
     * Flink streaming job that cleans a Kafka log stream and joins it with a
     * domain -> userId mapping delivered by a second (MySQL-backed) stream.
     *
     * Each surviving log record is emitted as the tab-separated string
     * `time \t domain \t traffic \t userId`; `userId` is the empty string when
     * the domain is not present in the most recently received mapping.
     */
    object LogAnalysis {

      // Recommended way to obtain a logger in production code.
      val logger = LoggerFactory.getLogger("LogAnalysis")

      /** Minimum number of tab-separated fields a log line must have (indices 2, 3, 5 and 6 are read). */
      private val MinFieldCount = 7

      /**
       * Parses one raw, tab-separated log line.
       *
       * Fields read: index 2 = level, 3 = timestamp (`yyyy-MM-dd HH:mm:ss`),
       * 5 = domain, 6 = traffic.
       *
       * @param line raw record as read from Kafka
       * @return `Some((time, domain, traffic))` for a well-formed record whose
       *         level is `"E"` and whose timestamp parses to a non-zero epoch
       *         value; `None` otherwise. Malformed records are logged and
       *         dropped instead of throwing, so a single bad record cannot
       *         fail the job.
       */
      private def parseLogLine(line: String): Option[(Long, String, Long)] = {
        val splits = line.split("\t")
        if (splits.length < MinFieldCount) {
          // String.split drops trailing empty fields, so an empty traffic column also lands here.
          logger.warn(s"malformed log line, expected at least $MinFieldCount fields: $line")
          None
        } else if (splits(2) != "E") {
          None
        } else {
          val timeStr = splits(3)
          // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
          val time =
            try {
              new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
            } catch {
              case NonFatal(e) =>
                // Pass the throwable itself so SLF4J logs the stack trace.
                logger.error(s"time parse error: $timeStr", e)
                0L
            }

          if (time == 0L) {
            // 0 marks an unparsable timestamp; such records are discarded.
            None
          } else {
            try {
              Some((time, splits(5), splits(6).toLong))
            } catch {
              case e: NumberFormatException =>
                logger.error(s"traffic parse error: ${splits(6)}", e)
                None
            }
          }
        }
      }

      /**
       * Job entry point: wires the Kafka source, the cleaning step, the MySQL
       * mapping source and the connected flat-map, then starts execution.
       *
       * @param args command-line arguments (currently unused)
       */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val topic = "tzbtest"
        val prop = new Properties()
        prop.setProperty("bootstrap.servers", "master:9092")
        prop.setProperty("group.id", "test-tzb-group")

        // Consume the raw log lines from Kafka.
        val consumer = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
        val data = env.addSource(consumer)

        // Clean the stream: keep (time, domain, traffic) of well-formed level "E" records; level is discarded.
        val logData = data.flatMap((line: String) => parseLogLine(line).toList)
        // logData.print().setParallelism(1)

        val mysqlData = env.addSource(new MySQLSource)
        // mysqlData.print()

        // NOTE(review): the mapping stream is connected without `.broadcast`; if this operator
        // runs with parallelism > 1, presumably not every subtask sees every mapping update — confirm.
        val connectData = logData.connect(mysqlData)
          .flatMap(new CoFlatMapFunction[(Long, String, Long), mutable.HashMap[String, String], String] {

            // Latest domain -> userId mapping received from the MySQL stream.
            var userDomainMap = mutable.HashMap[String, String]()

            // Handles log records: enrich with the userId looked up by domain.
            override def flatMap1(value: (Long, String, Long), out: Collector[String]): Unit = {
              val domain = value._2
              val userId = userDomainMap.getOrElse(domain, "")
              // Debug trace of the resolved user id.
              println("~~~~~" + userId)
              out.collect(value._1 + "\t" + value._2 + "\t" + value._3 + "\t" + userId)
            }

            // Handles MySQL records: replace the whole mapping with the newest snapshot.
            override def flatMap2(value: mutable.HashMap[String, String], out: Collector[String]): Unit = {
              userDomainMap = value
            }
          })

        connectData.print()

        env.execute("LogAnalysis")
      }
    }

    启动生产者

    处理后的结果

    2 生产上并行度的设置

    并行度最好通过参数传入
    env 设置并行度；addSource 设置并行度
    Processed: 0.009, SQL: 9