Flinkx local模式启动过程

    科技2025-10-06  8

    1. flinkx脚本中的最后一行调用启动类,并传参

    nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &

    上述命令会调用com.dtstack.flinkx.Launcher类中的main方法,在main函数中,根据传入的 mode参数值为local,从而调用com.dtstack.flinkx.Main类中的main方法,启动程序

     

    2. main方法中解析传递的args参数

    2.1 解析jobPath指定的任务配置文件

    解析参数 -job 执行的任务json文件中的内容,返回 DataTransferConfig 对象

    // 解析jobPath指定的任务配置文件 DataTransferConfig config = DataTransferConfig.parse(job);

    参数job是json配置文件的json字符串内容,如下:

    注意:在本地IDEA中直接调用main方法时,需要手动对json文件内容进行修复(直接通过调用flinkx启动脚本,则不需要手动修复执行下面的两行),如下通过Launcher类启动中的两行代码(Flinkx 启动类 Launcher 运行流程解析),就是对json内容进行修复。

    否则会在parse方法中出现异常

    Exception in thread "main" com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Unexpected value at line 1 column 332 path $..setting.restore.restoreColumnName at com.google.gson.Gson.fromJson(Gson.java:902) at com.google.gson.Gson.fromJson(Gson.java:852) at com.google.gson.Gson.fromJson(Gson.java:801) at com.google.gson.Gson.fromJson(Gson.java:773) at com.dtstack.flinkx.config.DataTransferConfig.parse(DataTransferConfig.java:104) at com.dtstack.flinkx.Main.main(Main.java:90) Caused by: com.google.gson.stream.MalformedJsonException: Unexpected value at line 1 column 332 path $..setting.restore.restoreColumnName at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1559) at com.google.gson.stream.JsonReader.doPeek(JsonReader.java:564) at com.google.gson.stream.JsonReader.peek(JsonReader.java:425) at com.google.gson.internal.bind.ObjectTypeAdapter.read(ObjectTypeAdapter.java:55) at com.google.gson.internal.bind.ObjectTypeAdapter.read(ObjectTypeAdapter.java:70) at com.google.gson.internal.bind.ObjectTypeAdapter.read(ObjectTypeAdapter.java:70) at com.google.gson.internal.bind.ObjectTypeAdapter.read(ObjectTypeAdapter.java:70) at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.read(TypeAdapterRuntimeTypeWrapper.java:41) at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:187) at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:145) at com.google.gson.Gson.fromJson(Gson.java:887) ... 5 more

     

    在DataTransferConfig类的parse方法中,将json文件解析为Map<String,Object>对象,并校验配置文件

     

    2.2 执行DataTransferConfig类的构造方法

    在构造方法中,完成两件事情

    (1)调用父类构造方法

    (2)创建JobConfig对象

    public DataTransferConfig(Map<String, Object> map) { super(map); job = new JobConfig((Map<String, Object>) map.get("job")); }

    DataTransferConfig集成抽象父类 AbstractConfig,父类主要有一个internalMap,类型为 Map<String,Object>,同时实现了一些 setXXX、getXXX,便于根据key设置或者获取参数值。

    Processed: 0.009, SQL: 9