neo4j實現表欄位級血緣關係

来源:https://www.cnblogs.com/pandacode/archive/2023/08/22/17648359.html
-Advertisement-
Play Games

需求背景 需要在前端頁面展示當前表欄位的所有上下游血緣關係,以進一步做數據診斷治理。大致效果圖如下: 首先這裡解釋什麼是表欄位血緣關係,SQL 示例: CREATE TABLE IF NOT EXISTS table_b AS SELECT order_id, order_status FROM t ...


需求背景

需要在前端頁面展示當前表欄位的所有上下游血緣關係,以進一步做數據診斷治理。大致效果圖如下:

首先這裡解釋什麼是表欄位血緣關係,SQL 示例:

CREATE TABLE IF NOT EXISTS table_b
AS SELECT order_id, order_status FROM table_a;

如上 DDL 語句中,創建的 table_b 的 order_id 和 order_status 欄位來源於 table_a,代表table_a 就是 table_b 的來源表,也叫上游表,table_b 就是 table_a 下游表,另外 table_a.order_id 就是 table_b.order_id 的上游欄位,它們之間就存在血緣關係。

INSERT INTO table_c
SELECT a.order_id, b.order_status
FROM table_a a JOIN table_b b ON a.order_id = b.order_id;

如上 DML 語句中,table_c 的 order_id 欄位來源於 table_a,而 order_status 來源於 table_b,表示 table_c 和 table_a、table_b 之間也存在血緣關係。

由上也可看出想要存儲血緣關係,還需要先解析 sql,這塊兒主要使用了開源項目 calcite 的解析器,這篇文章不再展開,本篇主要講如何存儲和如何展示

環境配置

參考另一篇:springboot 配置內嵌式 neo4j

Node 數據結構定義

因為要展示表的欄位之間的血緣關係,所以直接將表欄位作為圖節點存儲,表欄位之間的血緣關係就用圖節點之間的關係表示,具體 node 定義如下:

public class ColumnVertex {
  // 唯一鍵
  private String name;

  public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {
    this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;
  }

  public String getCatalogName() {
    return Long.parseLong(name.split("\\.")[0]);
  }

  public String getDatabaseName() {
    return name.split("\\.")[1];
  }

  public String getTableName() {
    return name.split("\\.")[2];
  }

  public String getColumnName() {
    return name.split("\\.")[3];
  }
}

通用 Service 定義

public interface EmbeddedGraphService {
    // 添加圖節點以及與上游節點之間的關係
    void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);
    // 尋找上游節點
    List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);
    // 尋找下游節點
    List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
}

Service 實現

import javax.annotation.Resource;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.springframework.stereotype.Service;

@Service
public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {

  @Resource private GraphDatabaseService graphDb;

  @Override
  public void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {
    try (Transaction tx = graphDb.beginTx()) {
      tx.execute(
          "MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"
              + " MERGE (u)-[:UPSTREAM]->(c)",
          Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));
      tx.commit();
    }
  }

  @Override
  public List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"
                  + " u.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }

  @Override
  public List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (c:ColumnVertex)-[:UPSTREAM]->(d:ColumnVertex) WHERE c.name = $name RETURN"
                  + " d.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }
}

遍歷圖節點

實現邏輯:

  1. restful 介面入參:當前表(catalogName, databaseName, tableName)
  2. 定義返回給前端的數據結構,採用 nodes 和 edges 方式返回,然後前端再根據節點與邊關係渲染出完整的血緣關係圖
public class ColumnLineageVO {
  List<ColumnLineageNode> nodes;
  List<ColumnLineageEdge> edges;
}

public class ColumnLineageNode {
  private String databaseName;
  private String tableName;
  private List<String> columnNames;
}

public class ColumnLineageEdge {
  private ColumnLineageEdgePoint source;
  private ColumnLineageEdgePoint target;
}

public class ColumnLineageEdgePoint {
  private String databaseName;
  private String tableName;
  private String columnName;
}
  1. 查詢表欄位
  2. 採用遞歸的方式,利用當前表欄位遍歷與當前表欄位關聯的所有上下游圖節點
  3. 將所有節點封裝成 List ColumnLineageVO 返回給前端
public ColumnLineageVO getColumnLineage(Table table) {
    ColumnLineageVO columnLineageVO = new ColumnLineageVO();
    List<ColumnLineageNode> nodes = new ArrayList<>();
    List<ColumnLineageEdge> edges = new ArrayList<>();
    // Deduplication
    Set<String> visitedNodes = new HashSet<>();
    Set<String> visitedEdges = new HashSet<>();
    Map<String, List<ColumnVertex>> upstreamCache = new HashMap<>();
    Map<String, List<ColumnVertex>> downstreamCache = new HashMap<>();

    ColumnLineageNode currentNode =
        ColumnLineageNode.builder()
            .databaseName(table.getDatabaseName())
            .tableName(table.getTableName())
            .type(TableType.EXTERNAL_TABLE.getDesc())
            .build();
    nodes.add(currentNode);
    visitedNodes.add(currentNode.getDatabaseName() + "." + currentNode.getTableName());

    for (String columnName : table.getColumnNames()) {
      ColumnVertex currentVertex =
          new ColumnVertex(
              table.getScriptId(), table.getDatabaseName(), table.getTableName(), columnName);
      traverseUpstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, upstreamCache);
      traverseDownstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, downstreamCache);
    }

    columnLineageVO.setNodes(nodes);
    columnLineageVO.setEdges(edges);
    return columnLineageVO;
  }

private void traverseUpstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    List<ColumnVertex> upstreamVertices;
    if (cache.containsKey(currentVertex.getName())) {
      upstreamVertices = cache.get(currentVertex.getName());
    } else {
      upstreamVertices = embeddedGraphService.findUpstreamColumnVertex(currentVertex);
      cache.put(currentVertex.getName(), upstreamVertices);
    }
    for (ColumnVertex upstreamVertex : upstreamVertices) {
      String nodeKey = upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode upstreamNode =
            ColumnLineageNode.builder()
                .databaseName(upstreamVertex.getDatabaseName())
                .tableName(upstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(upstreamNode);
        visitedNodes.add(nodeKey);
      }
      String edgeKey =
          upstreamVertex.getDatabaseName()
              + upstreamVertex.getTableName()
              + upstreamVertex.getColumnName()
              + currentVertex.getDatabaseName()
              + currentVertex.getTableName()
              + currentVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(upstreamVertex, currentVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseUpstreamColumnVertex(upstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }
  
private void traverseDownstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    List<ColumnVertex> downstreamVertices;
    if (cache.containsKey(currentVertex.getName())) {
      downstreamVertices = cache.get(currentVertex.getName());
    } else {
      downstreamVertices = embeddedGraphService.findDownstreamColumnVertex(currentVertex);
      cache.put(currentVertex.getName(), downstreamVertices);
    }
    for (ColumnVertex downstreamVertex : downstreamVertices) {
      String nodeKey = downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode downstreamNode =
            ColumnLineageNode.builder()
                .databaseName(downstreamVertex.getDatabaseName())
                .tableName(downstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(downstreamNode);
        visitedNodes.add(nodeKey);
      }
      String edgeKey =
          currentVertex.getDatabaseName()
              + currentVertex.getTableName()
              + currentVertex.getColumnName()
              + downstreamVertex.getDatabaseName()
              + downstreamVertex.getTableName()
              + downstreamVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(currentVertex, downstreamVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseDownstreamColumnVertex(
          downstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }

本文來自博客園,作者:這個殺手冷死了,轉載請註明原文鏈接:https://www.cnblogs.com/pandacode/p/17648359.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • # 應用場景 * 用戶下單5分鐘後,給他發簡訊 * 用戶下單30分鐘後,如果用戶不付款就自動取消訂單 # kafka無死信隊列 kafka本身沒有這種延時隊列的機制,像rabbitmq有自己的死信隊列,當一些消息在一定時間不消費時會發到死信隊列,由死信隊列來處理它們,上面的兩個需求如果是rabbit ...
  • 本文翻譯自國外論壇 medium,原文地址:https://levelup.gitconnected.com/how-i-deleted-more-than-1000-lines-of-code-using-spring-retry-9118de29060 > 使用 Spring Retry 重構代 ...
  • 通過一張圖描述清楚TuGraph Analytics的整體架構和關鍵設計,幫助大家快速瞭解TuGraph Analytics項目輪廓。 ...
  • ![](https://img2023.cnblogs.com/other/1218593/202308/1218593-20230822164212978-1679813836.png) ### 背景 有時候我們需要進行遠程的debug,本文研究如何進行遠程debug,以及使用 IDEA 遠程de ...
  • ## 1 概要 通過引入結構化併發編程的API,簡化併發編程。結構化併發將在不同線程中運行的相關任務組視為單個工作單元,從而簡化錯誤處理和取消操作,提高可靠性,並增強可觀察性。這是一個預覽版的API。 ## 2 歷史 結構化併發是由JEP 428提出的,併在JDK 19中作為孵化API發佈。它在JD ...
  • [TOC] ## 一、mall開源項目 ### 1.1 來源 **mall學習教程**,架構、業務、技術要點全方位解析。mall項目(**50k+star**)是一套電商系統,使用現階段主流技術實現。涵蓋了SpringBoot 2.3.0、MyBatis 3.4.6、Elasticsearch 7. ...
  • 函數是任何一門高級語言中必須要存在的,使用函數式編程可以讓程式可讀性更高,充分發揮了模塊化設計思想的精髓,今天我將帶大家一起來探索函數的實現機理,探索編譯器到底是如何對函數這個關鍵字進行實現的,並使用彙編語言模擬實現函數編程中的參數傳遞調用規範等。說到函數我們必須要提起調用約定這個名詞,而調用約定離... ...
  • 本文是區塊鏈瀏覽器系列的第四篇。 在[上一篇文章](https://mengbin.top/2023-08-13-blockBrowser/)介紹如何解析區塊數據時,使用`session`對客戶端上傳的pb文件進行區分,到期後自動刪除。 在這片文章中,會著重介紹下認證系統的實現,主要分為三部分: - ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...