public class Kafka2Flink {

    /**
     * Builds and runs a Flink streaming job that consumes string records from the
     * Kafka topic "first", keeps only the records containing the marker "507019",
     * turns each of them into a {@code WifiProbe} and prints its string form.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.0.102.91:9092,172.0.102.92:9092,172.0.102.93:9092");
        properties.setProperty("group.id", "cloudera_mirrormaker");

        // Build the FlinkKafkaConsumer for topic "first".
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
        // Start reading from the latest offset.
        myConsumer.setStartFromLatest();

        // Convert matching log lines into WifiProbe records.
        // The source is attached here (the consumer was previously never added to the
        // environment), and non-matching records are filtered out up front so the map
        // step never dereferences a null WifiProbe.
        SingleOutputStreamOperator<String> testStream = env
                .addSource(myConsumer)
                .filter(data -> data != null && data.contains("507019"))
                .map(data -> {
                    LogDecomposition deLog = new LogDecomposition(data);
                    String loginTime = deLog.getDate() + deLog.getTime();
                    System.out.println(loginTime);
                    WifiProbe wifiProbe = new WifiProbe(
                            new Time2Timestamp().change(loginTime),
                            deLog.getAPName(data),
                            deLog.getusrMac(data),
                            deLog.getRssi(data));
                    return wifiProbe.toString();
                });

        // Checkpoint interval passed to Flink: 5000.
        env.enableCheckpointing(5000);

        testStream.print();

        env.execute("GET WIFIPROBE KEYS");
    }