基於FLink實現的實時安全檢測(一段時間內連續登錄失敗20次後,下一次登錄成功場景)

来源:https://www.cnblogs.com/wxm2270/archive/2023/02/23/17147207.html
-Advertisement-
Play Games

研發背景 公司安全部目前針對內部系統的網路訪問日誌的安全審計,大部分都是T+1時效,每日當天,啟動Python編寫的定時任務,完成昨日的日誌審計和檢測,定時任務運行完成後,統一進行企業微信告警推送。這種方案在目前的網路環境和人員規模下,呈現兩個痛點,一是面對日益頻繁的網路攻擊、釣魚鏈接,T+1的定時 ...


研發背景

    公司安全部目前針對內部系統的網路訪問日誌的安全審計,大部分都是T+1時效,每日當天,啟動Python編寫的定時任務,完成昨日的日誌審計和檢測,定時任務運行完成後,統一進行企業微信告警推送。這種方案在目前的網路環境和人員規模下,呈現兩個痛點,一是面對日益頻繁的網路攻擊、釣魚鏈接,T+1的定時任務,難以及時進行告警,因此也難以有效避免如關鍵信息泄露等問題,二是目前以Python為主的單機定時任務,針對不同場景的處理時效,從一小時到十幾小時不等,效率低下。為解決以上問題,本人協助公司安全部同時對告警採集平臺進行改造,由之前的python單機任務處理,切換到基於Flink集群的並行處理,且告警推送時效,由之前的T+1天,提升到秒級實時告警。本次改造涉及網路日誌審計的多個常見場景,如埠掃描、黑名單統計、異常流量、連續惡意登錄等。本次以一段時間內連續登錄失敗20次後,下一次登錄成功場景來進行介紹。

場景描述

    針對一個內部系統,如郵件系統,公司員工的訪問行為日誌,存放於kafka,我們希望對於一個用戶賬號在同一個IP下,任意的3分鐘時間內,連續登錄郵件系統20次失敗,下一次登錄成功,這種場景能夠及時獲取並推送到企業微信某個指定的安全介面人。kafka中的數據,能夠通過某個關鍵字,區分當前網路訪問是否一次登錄事件,且有訪問時間(也就是事件時間)。在解析到符合需求的用戶賬號之後,第一時間進行企業微信告警推送,並將其這段時間內的訪問行為,寫入下游ElasticSearch。

組件版本

  • Flink-1.14.4
  • Java8
  • ElasticSearch-7.3.2
  • Kafka-2.12_2.8.1

日誌結構

IP和賬號皆為測試使用。

{
   "user": "wangxm",
   "client_ip": "110.68.6.182",
   "source": "login",
   "loginname": "[email protected]",
   "IP": "110.8.148.58",
   "timestamp": "17:58:12",
   "@timestamp": "2022-04-20T09:58:13.647Z",
   "ip": "110.7.231.25",
   "clienttype": "POP3",
   "result": "success",
   "@version": "1"
 }

 

技術方案

    上述場景,可考慮使用FlinkCEP及Flink的滑動視窗進行實現。由於本人在採用FlinkCEP的方案進行代碼編寫調試後,發現並不能滿足,因此改用滑動視窗進行實現。

 

關鍵代碼

主入口類

    主入口類,創建了flink環境、設置了基礎參數,創建了kafkaSource,接入消息後,進行了映射、過濾,並設置了水位線,進行了分組,之後設置了滑動視窗,在視窗內進行了事件統計,將複合條件的事件收集返回並寫入ElasticSearch。

    針對map、filter、keyBy、window等運算元,都單獨進行了編寫,後面會一一列出來。

 

package com.data.dev.flink.mailTopic.main;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;


/**
 * Flink處理在3分鐘內連續登錄失敗20次後登錄成功的場景
 * 採用滑動視窗來實現
 * @author wangxiaomin 2022-06-01
 */

@Slf4j
public class MailMsg extends BaseBean {

    /**
     * Flink作業名稱
     */
    public static final  String JobName = "告警採集平臺——連續登錄失敗後登錄成功告警";
    /**
     * Kafka消息名
     */
    public static final  String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic";

    public MailMsg(){
        log.info("初始化滑動視窗場景告警程式");
    }

    /**
     * 執行邏輯統計場景,實現告警推送
     */
    public static void execute(){


        //① 創建Flink執行環境並設置checkpoint等必要的參數
        StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
        KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ;
        DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName);


        //② 篩選登錄消息,創建初始登錄事件流
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map運算元加工");
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter運算元加工");

        //③ 設置水位線
        WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("增加水位線");

        //④ 設置主鍵
        KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector());

        //⑥ 轉化為滑動視窗
        WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L)));

        //⑦ 在視窗內進行邏輯統計
        SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs  = loginWindowDs.process(new WindowProcessFuncImpl()).name("視窗處理邏輯");

        //⑧ 將結果轉化為通用DataStream<String>格式
        SingleOutputStreamOperator<String> resultDs  = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("視窗結果轉化為標準格式");

        //⑨ 將最終結果寫入ES
        resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());

        //⑩ 提交Flink集群進行執行
        FlinkEnv.envExec(env,JobName);

    }
}

 

mapper運算元

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  邏輯統計場景告警推送ES消息體
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

filter運算元

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;


/**
 * ② 消費mail主題的消息,過濾其中login的事件
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> {
    @Override
    public boolean filter(MailMsg mailMsg) {
        if("login".equals(mailMsg.getSource())) {
            log.info("篩選原始的login事件:【" + mailMsg + "】");
        }
        return "login".equals(mailMsg.getSource());
    }
}

keyBy運算元

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * CEP 編程,需要進行key選取
 */
@Slf4j
public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> {
    @Override
    public String getKey(MailMsg mailMsg) {
        return mailMsg.getUser() + "@" + mailMsg.getClient_ip();
    }
}

視窗函數(核心代碼)

    這裡我們主要考慮使用一個事件列表,用來存儲每一個視窗期內得到的連續登錄,當檢測到登陸失敗的事件,即存入事件列表中,之後判斷下一次登錄失敗事件,如果檢測到登錄成功事件,但此時登錄失敗的次數不足20次,則清空loginEventList,等待下一次檢測。一旦符合視窗內連續登錄失敗超過20次且下一次登錄成功這個事件,則清空此時的loginEventList並將當前登錄成功的事件進行告警推送。

 

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.utils.HttpUtils;
import com.data.dev.utils.IPUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 *  滑動視窗內複雜事件解析邏輯實現
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class WindowProcessFuncImpl extends  ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {
    @Override
    public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) {

        List<MailMsg> loginEventList = new ArrayList<>();
        MailMsgAlarm mailMsgAlarm;
        for (MailMsg mailMsg : iterable) {
            log.info("收集到的登錄事件【" + mailMsg + "】");
if (mailMsg.getResult().equals("fail")) { //開始檢測當前視窗內的事件,並將失敗的事件收集到loginEventList log.info("開始檢測當前視窗內的事件,並將失敗的事件收集到loginEventList"); loginEventList.add(mailMsg); } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {//如果檢測到登錄成功事件,但此時登錄失敗的次數不足20次,則清空loginEventList,等待下一次檢測 log.info("檢測到登錄成功事件,但此時登錄失敗的次數為【" + loginEventList.size() + "】不足20次,清空loginEventList,等待下一次檢測"); loginEventList.clear(); } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) { mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg); log.info("檢測到登錄成功的事件,此時視窗內連續登錄失敗的次數為【" + mailMsgAlarm.getFailTimes() + "】"); //一旦符合視窗內連續登錄失敗超過20次且下一次登錄成功這個事件,則清空此時的loginEventList並將當前登錄成功的事件進行告警推送; loginEventList.clear(); doAlarmPush(mailMsgAlarm); collector.collect(mailMsgAlarm);//將當前登錄成功的事件進行收集上報 } else { log.info(mailMsg.getUser() + "當前已連續:【" + loginEventList.size() + "】 次登錄失敗"); } } } /** * 2022年6月17日15:03:06 * @param eventList:當前視窗內的事件列表 * @param eventCurrent:當前登錄成功的事件 * @return mailMsgAlarm:告警消息體 */ public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){ String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip(); String loginFailStartTime = eventList.get(0).getTimestamp_datetime(); String loginSuccessTime = eventCurrent.getTimestamp_datetime(); int loginFailTimes = eventList.size(); MailMsgAlarm mailMsgAlarm = new MailMsgAlarm(); mailMsgAlarm.setMailMsg(eventCurrent); mailMsgAlarm.setAlarmKey(alarmKey); mailMsgAlarm.setStartTime(loginFailStartTime); mailMsgAlarm.setEndTime(loginSuccessTime); mailMsgAlarm.setFailTimes(loginFailTimes); return mailMsgAlarm; } /** * 2022年6月17日14:47:53 * @param mailMsgAlarm :當前構建的需要告警的事件 */ public void doAlarmPush(MailMsgAlarm mailMsgAlarm){ String userKey = mailMsgAlarm.getAlarmKey(); String clientIp = mailMsgAlarm.mailMsg.getClient_ip(); boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp); if(isWhiteListIp){//如果是白名單IP,不告警 log.info("當前登錄用戶【" + userKey + "】屬於白名單IP"); }else { //IP歸屬查詢結果、企業微信推送告警 String user = HttpUtils.getUserByClientIp(clientIp); HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString()); } } }

最後一次map運算元

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  邏輯統計場景告警推送ES消息體
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

ElasticSearch工具類

package com.data.dev.elasticsearch;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.ElasticSearchKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 2022年6月17日15:15:06
 * @author wangxiaoming-ghq
 * Flink流計算結果寫入ES公共方法
 */
@Slf4j
public class SinkToEs extends BaseBean {
    public static final long serialVersionUID = 2L;
    private static final HashMap<String,String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
    private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
    private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
    private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
    private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);

    /**
     * 2022年6月17日15:17:55
     * 獲取ES連接信息
     * @return esInfoMap:ES連接信息持久化
     */
    public static HashMap<String,String > getElasticSearchInfo(){
        log.info("獲取ES連接信息:【 " + "HOST="+HOST + "PORT="+PORT+"USERNAME="+USERNAME+"PASSWORD=********" + " 】");
        HashMap<String,String> esInfoMap = new HashMap<>();
        esInfoMap.put(ElasticSearchKey.HOST,HOST);
        esInfoMap.put(ElasticSearchKey.PASSWORD,PASSWORD);
        esInfoMap.put(ElasticSearchKey.USERNAME,USERNAME);
        esInfoMap.put(ElasticSearchKey.PORT,PORT);

        return esInfoMap;
    }

    /**
     * @param esIndexName:寫入索引名稱
     * @param esType:寫入索引類型
     * @return ElasticsearchSink.Builder<String>:構建器
     */
    public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName,String esType){
        HashMap<String, String> esInfoMap = getElasticSearchInfo();
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(String.valueOf(esInfoMap.get(ElasticSearchKey.HOST)), Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)), "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {

                    public IndexRequest createIndexRequest() {
                        Map<String, String> json = new HashMap<>();
                        //log.info("寫入ES的data:【"+json+"】");
                        IndexRequest index  = Requests.indexRequest()
                                .index(esIndexName)
                                .type(esType)
                                .source(json);
                        return index;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest());
                    }
                }
        );


        //定義es的連接配置  帶用戶名密碼
        RestClientFactory restClientFactory = restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            String.valueOf(esInfoMap.get(ElasticSearchKey.USERNAME)),
                            String.valueOf(esInfoMap.get(ElasticSearchKey.PASSWORD))
                    )
            );
            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.disableAuthCaching();
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        };

        esSinkBuilder.setRestClientFactory(restClientFactory);
        return esSinkBuilder;
    }

}

事件實體類

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;


/**
 * @author wangxiaoming-ghq 2022-05-15
 * 邏輯統計場景告警事件
 */
@Data
public class MailMsgAlarm extends BaseBean {


    /**
     * 當前登錄成功的事件
     */
   public  MailMsg mailMsg;

    /**
     * 當前捕獲的告警主鍵:username@client_ip
     */
   public  String alarmKey;

    /**
     * 第一次登錄失敗的事件時間
     */
   public  String startTime;

    /**
     * 連續登錄失敗後下一次登錄成功的事件時間
     */
   public  String endTime;

    /**
     * 連續登錄失敗的次數
     */
   public  int failTimes;

    @Override
    public String toString() {
        return "{" +
                "  'mailMsg_login_success':'" + mailMsg + "'" +
                ", 'alarmKey':'" + alarmKey + "'" +
                ", 'start_login_time_in3min':'"  +startTime + "'" +
                ", 'end_login_time_in3min':'"  +endTime + "'" +
                ", 'login_fail_times':'"  +failTimes +  "'" +
                "}";
    }

    public MailMsgAlarm() {
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsgAlarm)) return false;
        MailMsgAlarm that = (MailMsgAlarm) o;
        return getFailTimes() == that.getFailTimes() && getMailMsg().equals(that.getMailMsg()) && getAlarmKey().equals(that.getAlarmKey()) && getStartTime().equals(that.getStartTime()) && getEndTime().equals(that.getEndTime());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getMailMsg(), getAlarmKey(), getStartTime(), getEndTime(), getFailTimes());
    }
}

消息實體類

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;

/**
 * {
 *   "user": "wangxm",
 *   "client_ip": "110.68.6.182",
 *   "source": "login",
 *   "loginname": "[email protected]",
 *   "IP": "110.8.148.58",
 *   "timestamp": "17:58:12",
 *   "@timestamp": "2022-04-20T09:58:13.647Z",
 *   "ip": "110.7.231.25",
 *   "clienttype": "POP3",
 *   "result": "success",
 *   "@version": "1"
 * }
 *
 * user登錄用戶
 * client_ip 來源ip
 * source 類型
 * loginname 登錄用戶郵箱地址
 * ip 目標前端ip
 * timestamp 發送時間
 * &#064;timestamp  發送日期時間
 * IP 郵件日誌發送來源IP
 * clienttype 客戶端登錄類型
 * result 登錄狀態
 */

@Data
public class MailMsg extends BaseBean {
    public String user;
    public String client_ip;
    public String source;
    public String loginName;
    public String mailSenderSourceIp;
    public String timestamp_time;
    public String timestamp_datetime;
    public String ip;
    public String clientType;
    public String result;
    public String version;

    public MailMsg() {
    }

    public MailMsg(String user, String client_ip, String source, String loginName, String mailSenderSourceIp, String timestamp_time, String timestamp_datetime, String ip, String clientType, String result, String version) {
        this.user = user;
        this.client_ip = client_ip;
        this.source = source;
        this.loginName = loginName;
        this.mailSenderSourceIp = mailSenderSourceIp;
        this.timestamp_time = timestamp_time;
        this.timestamp_datetime = timestamp_datetime;
        this.ip = ip;
        this.clientType = clientType;
        this.result = result;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsg)) return false;
        MailMsg mailMsg = (MailMsg) o;
        return getUser().equals(mailMsg.getUser()) && getClient_ip().equals(mailMsg.getClient_ip()) && getSource().equals(mailMsg.getSource()) && getLoginName().equals(mailMsg.getLoginName()) && getMailSenderSourceIp().equals(mailMsg.getMailSenderSourceIp()) && getTimestamp_time().equals(mailMsg.getTimestamp_time()) && getTimestamp_datetime().equals(mailMsg.getTimestamp_datetime()) && getIp().equals(mailMsg.getIp()) && getClientType().equals(mailMsg.getClientType()) && getResult().equals(mailMsg.getResult()) && getVersion().equals(mailMsg.getVersion());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getUser(), getClient_ip(), getSource(), getLoginName(), getMailSenderSourceIp(), getTimestamp_time(), getTimestamp_datetime(), getIp(), getClientType(), getResult(), getVersion());
    }

    @Override
    public String toString() {
        return "{" +
                "  'user':'" + user + "'" +
                ", 'client_ip':'" + client_ip  + "'" +
                ", 'source':'" + source  + "'" +
                ", 'loginName':'" + loginName  + "'" +
                ", 'IP':'" + mailSenderSourceIp + "'" +
                ", 'timestamp':'" + timestamp_time + "'" +
                ", '@timestamp':'" + timestamp_datetime + "'" +
                ", 'ip':'"  + "'" +
                ", 'clientType':'" + clientType  + "'" +
                ", 'result':'" + result  + "'" +
                ", 'version':'" + version + "'" +
                "}";
    }

}

 源代碼已去掉敏感信息,地址:https://gitee.com/wangxm-2270/alarmCollectByFlink.git


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 應用場景 在軟體系統中,經常會需要將一些現成的對象放到新的環境中使用,但是新的環境要求的介面,是這些現存對象所不能滿足的。如何能利用現有的對象,又能滿足新的引用環境所需的介面? 適配器優點 更好的復用性。如果功能已經存在,只是介面不相容,通過適配器模式就可以讓這些功能得到更好的復用。 適配器缺點 由 ...
  • 1.說說你知道的ORM框架? 2.請問對EFCore有瞭解嗎? 3.說說EFCore查詢的性能調優小技巧? 4.EFCore 如果通過數據生成實體和DbContext? 5.說說對SaveChanges的理解? 6.說說對EFCore中EntityState的理解。? 7.說說什麼是導航屬性和引用屬 ...
  • 概述 面臨一個複雜對象的創建工作,通常由各個部分的子對象用一定的演算法構成。子部件(對象)比較多,對象不能當作一個完整的對象或者產品使用(郵件:發件人,收件人、抄送人、主題、郵件內容)子部件需要按照一定的順序賦值才有一定的意義,在某個子部件沒有賦值之前,另一個子部件就無法賦值。 類圖 註:該類圖來源網 ...
  • Excel 公式引用當前單元格左側單元格 引用當前單元格左側的第一個單元格:=OFFSET(INDIRECT(ADDRESS(ROW(), COLUMN())),0,-1)。 ROW()返回當前單元格的行號,COLUMN()返回當前單元格的列號。 ADDRESS函數可以根據指定行號和列號獲得工作表中 ...
  • # MySQL調優 ## 資料庫優化常見方案 1. 優化shema,sql語句+索引2. 加緩存,memcached,redis3. 主從複製,讀寫分離4. 垂直拆分5. 水平拆分 為了知道怎麼優化SQL,必須先清楚SQL的生命周期 ## SQL生命周期 1. 應用伺服器連接資料庫伺服器,建立一個T ...
  • 閱識風雲是華為雲信息大咖,擅長將複雜信息多元化呈現,其出品的一張圖(雲圖說)、深入淺出的博文(雲小課)或短視頻(雲視廳)總有一款能讓您快速上手華為雲。更多精彩內容請單擊此處。 摘要:Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式數據的能力。本文介 ...
  • SQL中的排序 使用關鍵字:ORDER BY ORDER BY 欄位名後使用ASC升序表示;使用DESC表示降序。 ORDER BY 後面可以使用列的別名進行排序(列的別名只能在ORDER BY中使用,不能再HWERE後使用) WHERE需要再FROM後,ORDER BY前聲明!! 多級排序,ORD ...
  • 1.1 技術發展 redis是用來解決性能問題的資料庫 技術的分類: 解決功能性問題:Java、Jsp、RDBMS、Tomcat、HTML、Linux、JDBC、SVN 解決擴展性問題:Struts、Spring、SpringMVC、Hibernate、Mybatis 解決性能問題:NoSQL、Ja ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...