Flink处理实时数据,有脏数据怎么办?
发布网友
发布时间:2022-11-18 15:48
我来回答
共1个回答
热心网友
时间:2024-12-02 11:03
场景描述:Flink在处理实时数据时,假如其中一条数据时脏数据,例如格式错误导致Json转换异常,字段缺少等等,这个时候该怎么处理呢?
解决办法:
这种问题在Spark Sql或者Flink Sql中,最常见的办法就是直接过滤掉。
在实际中,遇到的情况会非常多,则我们可以自定义一个UDF,这个UDF的作用就是用来处理null或者空字符串或者其他各种异常情况的。
官方案例:
在实际工作中,在利用env.addSource方法对接Kafka数据源后,会利用map方法将对应json串转成对象,所以会try catch,即
这样在遇到脏数据时,也不会因为json转换出错导致任务失败。