# EventListenerProvider初始 keycloak提供的事件處理機制,可以通過實現EventListenerProvider介面來實現自定義的事件處理邏輯。在keycloak啟動時,會通過ServiceLoader機制載入所有的EventListenerProvider實現類,並將 ...
EventListenerProvider初始
keycloak提供的事件處理機制,可以通過實現EventListenerProvider介面來實現自定義的事件處理邏輯。在keycloak啟動時,會通過ServiceLoader機制載入所有的EventListenerProvider實現類,並將其註冊到keycloak的事件處理機制中。
- 構造方法,在每個keycloak後臺操作時,它都會重新構建實例
- OnEvent方法,在事件發生時執行,不會出現類載入問題,因為這樣類已經被載入了
EventListenerProviderFactory
EventListenerProviderFactory是進行事件處理器的生產工廠,用於創建EventListenerProvider實例。在keycloak啟動時,會通過ServiceLoader機制載入所有的EventListenerProviderFactory實現類,並將其註冊到keycloak的事件處理機制中。
- init方法:keycloak啟動時會執行,用於初始化EventListenerProviderFactory實例,可以在此方法中進行一些初始化操作。
- postInit方法:keycloak啟動時會執行,在init方法之後,會執行這個方法
- create方法:在kc後臺開啟這個EventListenerProviderFactory之後,每次請求都會執行這個create方法,對於它生產的provider對象,可能考慮使用單例的方式, 避免每次請求都創建一個新的對象
- close方法:在keycloak程式關閉後或者當前事件被註冊時,這個方法才會執行
問題
- 問題描述:在EventListenerProviderFactory的init方法中,通過kafka發送消息,會出現類載入問題,因為在keycloak啟動時,kafka的類的載入器還沒有被載入,所以會出現類載入問題。
- 解決:需要將類載入器這塊,修改成當前類載入器去載入對應的文件,如下代碼解決了類無法載入的問題
@Override
public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
try {
this.executorService = Executors.newFixedThreadPool(2);
Properties kafkaProperties = new Properties();
kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ConfigFactory.getInstance().getStrPropertyValue("kafka.host"));
kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "kc-ListenerProviderFactory");
kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
kafkaProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
kafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
kafkaProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
kafkaProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
// 需要使用當前類載入器,否則會出現無法載入StringDeserializer的情況
Class<?> stringDeserializerClass =
getClass().getClassLoader().loadClass("org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
this.kafkaConsumerAdd = new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_add");
executorService.submit(kafkaConsumerAdd);
this.kafkaConsumerRemove=new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_remove");
executorService.submit(kafkaConsumerRemove);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
- 對於kafka-clients實現消費者的話,代碼還是比較簡單的
public class KcKafkaConsumer implements Runnable {
private static final Logger logger = Logger.getLogger(ConfigFactory.class);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KeycloakSessionFactory keycloakSessionFactory;
private KafkaConsumer<String, String> kafkaConsumer;
public KcKafkaConsumer(KeycloakSessionFactory keycloakSessionFactory, Properties properties, String topic)
throws ClassNotFoundException {
this.keycloakSessionFactory = keycloakSessionFactory;
this.kafkaConsumer = new KafkaConsumer<>(properties);
this.kafkaConsumer.subscribe(Collections.singleton(topic));
}
@Override
public void run() {
try {
while (!closed.get()) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
// 處理Kafka消息
for (ConsumerRecord<String, String> record : records) {
System.out.println("Topic:" + record.topic() + ",Received message: " + record.value());
//TODO: 處理Kafka消息的具體邏輯
}
}
} finally {
kafkaConsumer.close();
}
}
public void shutdown() {
closed.set(true);
kafkaConsumer.close();
}
}
作者:倉儲大叔,張占嶺,
榮譽:微軟MVP
QQ:853066980
支付寶掃一掃,為大叔打賞!