近期的flink作業中,需要對上傳的日誌數據進行大量的校驗。 校驗規則大多比較簡單,僅為字元串長度,數組長度,數據的最大值和最小值,非空判斷等。然而不想寫諸多校驗代碼,容易導致代碼又醜又繁瑣。聯想SpringBoot項目中的參數校驗,於是想著在純maven的項目中引入校驗。 引入依賴 SpringB ...
近期的flink作業中,需要對上傳的日誌數據進行大量的校驗。
校驗規則大多比較簡單,僅為字元串長度,數組長度,數據的最大值和最小值,非空判斷等。然而不想寫諸多校驗代碼,容易導致代碼又醜又繁瑣。聯想SpringBoot項目中的參數校驗,於是想著在純maven的項目中引入校驗。
引入依賴
SpringBoot的基本參數校驗是基於Hibernate Validator實現的,因此在pom中引入以下依賴:
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.1.0.Final</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.1-b11</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator-cdi</artifactId>
<version>6.1.0.Final</version>
</dependency>
添加註解
在需要驗證的實體類中引入校驗註解(不得不說,註解真香).
public class LogEvent {
@NotNull
private Instant timestamp;
@NotNull
private String filepath;
@NotNull
@Length(min = 1, max = 64)
private String region;
@NotNull
private Boolean status;
@NotNull
@Min(-1)
@Max(60 * 1000)
private Integer timeDelay;
@NotNull
@Length(min = 1, max = 64)
private String target;
@Length(max = 1024)
private String message;
@Size(max = 5)
private List<String> tags;
}
參數校驗
因為Validator是thread safe
實現,因此多線程中可以放心的使用。
@Slf4j
public class LogEventUtil {
// thread safe
private static final Validator VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator();
public static boolean validate(LogEvent event) {
Set<ConstraintViolation<LogEvent>> constraintViolations = VALIDATOR.validate(event);
if (!constraintViolations.isEmpty()) {
return false;
}
// 此處省略若幹複雜的校驗規則(臟活不可能一點都不接觸的)
}
}
通過VALIDATOR.validate即可實現對LogEvent的基本校驗。
flink作業引用
寥寥幾筆,即完成數據讀取以及校驗。
private static DataStream<LogEvent> configureKafkaConsumer(final StreamExecutionEnvironment env, ParameterTool parameterTool) {
String topic = parameterTool.get("kafka.topic", "");
Properties kafkaProperties = filterPrefix(parameterTool.getProperties(), "kafka");
return env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProperties))
.map((MapFunction<String, LogEvent>) LogEventUtil::parseLogEvent)
.filter((FilterFunction<LogEvent>) LogEventUtil::validate)
.name("LogEventSourceStream")
.returns(LogEvent.class);
}