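Background
The front-end page needs to display all upstream and downstream lineage of the columns of the current table, to support further data diagnosis and governance. The result looks roughly like this:
First, an explanation of what column-level lineage is, using a SQL example: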
CREATE TABLE IF NOT EXISTS table_b
AS SELECT order_id, order_status FROM table_a;
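In the DDL statement above, the order_id and order_status columns of the newly created table_b come from table_a. table_a is therefore the source table of table_b, also called its upstream table, and table_b is a downstream table of table_a. Likewise, table_a.order_id is the upstream column of table_b.order_id, and the two are linked by a lineage relationship.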
INSERT INTO table_c
SELECT a.order_id, b.order_status
FROM table_a a JOIN table_b b ON a.order_id = b.order_id;
In the DML statement above, the order_id column of table_c comes from table_a while order_status comes from table_b, so table_c also has lineage relationships with both table_a and table_b.
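To make the two examples concrete, the column-level edges they imply can be written down as plain (upstream, downstream) pairs. This is only a sketch: LineageEdge and LineageExample are hypothetical names that do not appear in the project, and it assumes that only projected columns count as lineage (the join key in the ON clause is ignored).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value type: data flows from the upstream column to the downstream column. */
final class LineageEdge {
  final String upstream;
  final String downstream;

  LineageEdge(String upstream, String downstream) {
    this.upstream = upstream;
    this.downstream = downstream;
  }

  @Override
  public String toString() {
    return upstream + " -> " + downstream;
  }
}

final class LineageExample {
  /** Edges implied by the two SQL statements above (projected columns only). */
  static List<LineageEdge> edges() {
    List<LineageEdge> edges = new ArrayList<>();
    // CREATE TABLE table_b AS SELECT order_id, order_status FROM table_a
    edges.add(new LineageEdge("table_a.order_id", "table_b.order_id"));
    edges.add(new LineageEdge("table_a.order_status", "table_b.order_status"));
    // INSERT INTO table_c SELECT a.order_id, b.order_status FROM table_a a JOIN table_b b ...
    edges.add(new LineageEdge("table_a.order_id", "table_c.order_id"));
    edges.add(new LineageEdge("table_b.order_status", "table_c.order_status"));
    return edges;
  }
}
```

Each pair corresponds to one relationship between two column nodes in the graph.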
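As the above shows, storing lineage first requires parsing the SQL. That part mainly relies on the parser of the open-source project Calcite and is not covered here; this post focuses on how to store the lineage and how to display it.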
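Environment setup
See the separate post: configuring embedded neo4j in springboot
Node data structure definition
Because the goal is to show lineage between table columns, each table column is stored directly as a graph node, and the lineage between columns is represented as relationships between graph nodes. The node is defined as follows: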
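public class ColumnVertex {
  // Unique key, in the form catalog.database.table.column
  private String name;

  // No-arg constructor, used with setName when a vertex is rebuilt from a query result
  public ColumnVertex() {}

  public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {
    this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;
  }

  public String getName() {
    return name;
  }

  // Returns this so that the call can be chained after the constructor
  public ColumnVertex setName(String name) {
    this.name = name;
    return this;
  }

  public String getCatalogName() {
    return name.split("\\.")[0];
  }

  public String getDatabaseName() {
    return name.split("\\.")[1];
  }

  public String getTableName() {
    return name.split("\\.")[2];
  }

  public String getColumnName() {
    return name.split("\\.")[3];
  }
}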
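Since the node key is a dot-joined string, a part that is empty or itself contains a dot would silently shift the positions used by the getters. A minimal sketch of a stricter parser follows; QualifiedColumnName is a hypothetical helper, not part of the original code, and it assumes that none of the four parts may contain a dot.

```java
/** Hypothetical helper: splits a "catalog.database.table.column" key into its four parts. */
final class QualifiedColumnName {
  final String catalog;
  final String database;
  final String table;
  final String column;

  private QualifiedColumnName(String[] parts) {
    this.catalog = parts[0];
    this.database = parts[1];
    this.table = parts[2];
    this.column = parts[3];
  }

  static QualifiedColumnName parse(String name) {
    // A limit of -1 keeps trailing empty strings, so "a.b.c." still yields four parts
    // and the empty last part is rejected by the check below.
    String[] parts = name.split("\\.", -1);
    if (parts.length != 4) {
      throw new IllegalArgumentException("expected 4 dot-separated parts: " + name);
    }
    for (String part : parts) {
      if (part.isEmpty()) {
        throw new IllegalArgumentException("empty part in: " + name);
      }
    }
    return new QualifiedColumnName(parts);
  }
}
```

For example, QualifiedColumnName.parse("1.db.table_a.order_id") yields the column order_id, while a three-part name is rejected with an IllegalArgumentException.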
Generic Service definition
public interface EmbeddedGraphService {
  // Add a graph node together with its relationship to an upstream node
  void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);

  // Find the direct (one-hop) upstream nodes
  List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);

  // Find the direct (one-hop) downstream nodes
  List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
}
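The contract of these three methods can be illustrated without a graph database. The sketch below is a hypothetical in-memory stand-in that could be useful in unit tests. To stay self-contained it works on vertex names (plain strings) instead of ColumnVertex objects, and InMemoryLineageStore, findUpstream and findDownstream are names made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory stand-in for the graph store, keyed by vertex name. */
final class InMemoryLineageStore {
  private final Map<String, Set<String>> upstreamOf = new HashMap<>();
  private final Map<String, Set<String>> downstreamOf = new HashMap<>();

  /** Records that data flows from upstreamName into currentName; repeated calls are idempotent. */
  void addColumnVertex(String currentName, String upstreamName) {
    upstreamOf.computeIfAbsent(currentName, k -> new LinkedHashSet<>()).add(upstreamName);
    downstreamOf.computeIfAbsent(upstreamName, k -> new LinkedHashSet<>()).add(currentName);
  }

  /** Direct (one-hop) upstream vertex names, in insertion order. */
  List<String> findUpstream(String name) {
    return new ArrayList<>(upstreamOf.getOrDefault(name, Collections.emptySet()));
  }

  /** Direct (one-hop) downstream vertex names, in insertion order. */
  List<String> findDownstream(String name) {
    return new ArrayList<>(downstreamOf.getOrDefault(name, Collections.emptySet()));
  }
}
```

Using sets per vertex means that adding the same relationship twice leaves a single entry, so repeated parsing of the same SQL does not duplicate edges.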
Service implementation
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.springframework.stereotype.Service;
@Service
public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {
  @Resource private GraphDatabaseService graphDb;

  @Override
  public void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {
    try (Transaction tx = graphDb.beginTx()) {
      tx.execute(
          "MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"
              + " MERGE (u)-[:UPSTREAM]->(c)",
          Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));
      tx.commit();
    }
  }

  @Override
  public List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"
                  + " u.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }

  @Override
  public List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (c:ColumnVertex)-[:UPSTREAM]->(d:ColumnVertex) WHERE c.name = $name RETURN"
                  + " d.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }
}
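Traversing graph nodes
Implementation logic:
- RESTful API input: the current table (catalogName, databaseName, tableName)
- Define the data structure returned to the front end. It is returned as nodes and edges, and the front end then renders the complete lineage graph from the node and edge relationships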
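// Getters, setters and builder() are assumed to be generated for these classes,
// for example by Lombok's @Data and @Builder.
public class ColumnLineageVO {
  List<ColumnLineageNode> nodes;
  List<ColumnLineageEdge> edges;
}

public class ColumnLineageNode {
  private String databaseName;
  private String tableName;
  // Table type description, set by the traversal code through builder().type(...)
  private String type;
  private List<String> columnNames;
}

public class ColumnLineageEdge {
  private ColumnLineageEdgePoint source;
  private ColumnLineageEdgePoint target;
}

public class ColumnLineageEdgePoint {
  private String databaseName;
  private String tableName;
  private String columnName;
}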
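The traversal code further down calls a createEdge helper that is not shown in this post. A minimal sketch of what such a helper might look like follows, assuming that source is the upstream column and target the downstream one. EdgePoint, Edge and EdgeFactory are reduced, hypothetical stand-ins for the classes above, with plain constructors instead of generated accessors, and the vertices are passed as their "catalog.database.table.column" names.

```java
/** Reduced stand-in for ColumnLineageEdgePoint. */
final class EdgePoint {
  final String databaseName;
  final String tableName;
  final String columnName;

  EdgePoint(String databaseName, String tableName, String columnName) {
    this.databaseName = databaseName;
    this.tableName = tableName;
    this.columnName = columnName;
  }
}

/** Reduced stand-in for ColumnLineageEdge. */
final class Edge {
  final EdgePoint source;
  final EdgePoint target;

  Edge(EdgePoint source, EdgePoint target) {
    this.source = source;
    this.target = target;
  }
}

final class EdgeFactory {
  /** Builds an edge pointing from the upstream column to the downstream column. */
  static Edge createEdge(String upstreamName, String downstreamName) {
    return new Edge(toPoint(upstreamName), toPoint(downstreamName));
  }

  private static EdgePoint toPoint(String vertexName) {
    String[] parts = vertexName.split("\\.");
    // parts[0] is the catalog, which the edge point does not carry
    return new EdgePoint(parts[1], parts[2], parts[3]);
  }
}
```

Keeping the direction fixed (upstream first) lets both the upstream and the downstream traversal share the same helper by swapping the argument order.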
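- Query the columns of the table
- Recursively traverse, starting from each column of the current table, all upstream and downstream graph nodes related to it
- Wrap all nodes and edges into a ColumnLineageVO and return it to the front end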
// Method excerpt: embeddedGraphService is an EmbeddedGraphService field of the enclosing class;
// createEdge, Table and TableType are defined elsewhere and are not shown in this post.
public ColumnLineageVO getColumnLineage(Table table) {
  ColumnLineageVO columnLineageVO = new ColumnLineageVO();
  List<ColumnLineageNode> nodes = new ArrayList<>();
  List<ColumnLineageEdge> edges = new ArrayList<>();
  // Keys of the table nodes and column edges already emitted, used for deduplication
  Set<String> visitedNodes = new HashSet<>();
  Set<String> visitedEdges = new HashSet<>();
  // Per-request caches of graph query results, keyed by vertex name
  Map<String, List<ColumnVertex>> upstreamCache = new HashMap<>();
  Map<String, List<ColumnVertex>> downstreamCache = new HashMap<>();
  ColumnLineageNode currentNode =
      ColumnLineageNode.builder()
          .databaseName(table.getDatabaseName())
          .tableName(table.getTableName())
          .type(TableType.EXTERNAL_TABLE.getDesc())
          .build();
  nodes.add(currentNode);
  visitedNodes.add(currentNode.getDatabaseName() + "." + currentNode.getTableName());
  for (String columnName : table.getColumnNames()) {
    // The script id is used as the catalog part of the vertex name
    ColumnVertex currentVertex =
        new ColumnVertex(
            table.getScriptId(), table.getDatabaseName(), table.getTableName(), columnName);
    traverseUpstreamColumnVertex(
        currentVertex, nodes, edges, visitedNodes, visitedEdges, upstreamCache);
    traverseDownstreamColumnVertex(
        currentVertex, nodes, edges, visitedNodes, visitedEdges, downstreamCache);
  }
  columnLineageVO.setNodes(nodes);
  columnLineageVO.setEdges(edges);
  return columnLineageVO;
}
private void traverseUpstreamColumnVertex(
    ColumnVertex currentVertex,
    List<ColumnLineageNode> nodes,
    List<ColumnLineageEdge> edges,
    Set<String> visitedNodes,
    Set<String> visitedEdges,
    Map<String, List<ColumnVertex>> cache) {
  // Query the graph only once per vertex; later visits reuse the cached result
  List<ColumnVertex> upstreamVertices;
  if (cache.containsKey(currentVertex.getName())) {
    upstreamVertices = cache.get(currentVertex.getName());
  } else {
    upstreamVertices = embeddedGraphService.findUpstreamColumnVertex(currentVertex);
    cache.put(currentVertex.getName(), upstreamVertices);
  }
  for (ColumnVertex upstreamVertex : upstreamVertices) {
    // One node per table: columns of the same table share a node
    String nodeKey = upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName();
    if (!visitedNodes.contains(nodeKey)) {
      ColumnLineageNode upstreamNode =
          ColumnLineageNode.builder()
              .databaseName(upstreamVertex.getDatabaseName())
              .tableName(upstreamVertex.getTableName())
              .type(TableType.EXTERNAL_TABLE.getDesc())
              .build();
      nodes.add(upstreamNode);
      visitedNodes.add(nodeKey);
    }
    // Join the parts with a delimiter so that adjacent names cannot run together and collide
    String edgeKey =
        String.join(
            ".",
            upstreamVertex.getDatabaseName(),
            upstreamVertex.getTableName(),
            upstreamVertex.getColumnName(),
            currentVertex.getDatabaseName(),
            currentVertex.getTableName(),
            currentVertex.getColumnName());
    if (!visitedEdges.contains(edgeKey)) {
      ColumnLineageEdge edge = createEdge(upstreamVertex, currentVertex);
      edges.add(edge);
      visitedEdges.add(edgeKey);
    }
    traverseUpstreamColumnVertex(upstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
  }
}
private void traverseDownstreamColumnVertex(
    ColumnVertex currentVertex,
    List<ColumnLineageNode> nodes,
    List<ColumnLineageEdge> edges,
    Set<String> visitedNodes,
    Set<String> visitedEdges,
    Map<String, List<ColumnVertex>> cache) {
  // Query the graph only once per vertex; later visits reuse the cached result
  List<ColumnVertex> downstreamVertices;
  if (cache.containsKey(currentVertex.getName())) {
    downstreamVertices = cache.get(currentVertex.getName());
  } else {
    downstreamVertices = embeddedGraphService.findDownstreamColumnVertex(currentVertex);
    cache.put(currentVertex.getName(), downstreamVertices);
  }
  for (ColumnVertex downstreamVertex : downstreamVertices) {
    // One node per table: columns of the same table share a node
    String nodeKey = downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName();
    if (!visitedNodes.contains(nodeKey)) {
      ColumnLineageNode downstreamNode =
          ColumnLineageNode.builder()
              .databaseName(downstreamVertex.getDatabaseName())
              .tableName(downstreamVertex.getTableName())
              .type(TableType.EXTERNAL_TABLE.getDesc())
              .build();
      nodes.add(downstreamNode);
      visitedNodes.add(nodeKey);
    }
    // Join the parts with a delimiter so that adjacent names cannot run together and collide
    String edgeKey =
        String.join(
            ".",
            currentVertex.getDatabaseName(),
            currentVertex.getTableName(),
            currentVertex.getColumnName(),
            downstreamVertex.getDatabaseName(),
            downstreamVertex.getTableName(),
            downstreamVertex.getColumnName());
    if (!visitedEdges.contains(edgeKey)) {
      ColumnLineageEdge edge = createEdge(currentVertex, downstreamVertex);
      edges.add(edge);
      visitedEdges.add(edgeKey);
    }
    traverseDownstreamColumnVertex(
        downstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
  }
}
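One thing to watch with the recursive version: the cache saves repeated graph queries, but it does not stop the recursion from re-entering a vertex. If the stored lineage ever contains a cycle, the recursion would not terminate. A possible alternative, sketched here on a plain in-memory adjacency map rather than on Neo4j, is an iterative breadth-first walk that expands each vertex at most once. LineageTraversal and reachable are hypothetical names, and the neighbours map stands in for the findUpstreamColumnVertex or findDownstreamColumnVertex lookups.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LineageTraversal {
  /**
   * Collects every vertex reachable from start by repeatedly following neighbours,
   * excluding start itself. Each vertex is expanded at most once, so a cycle in the
   * graph cannot cause endless looping.
   */
  static Set<String> reachable(String start, Map<String, List<String>> neighbours) {
    Set<String> visited = new LinkedHashSet<>();
    Deque<String> queue = new ArrayDeque<>();
    visited.add(start);
    queue.add(start);
    while (!queue.isEmpty()) {
      String current = queue.poll();
      for (String next : neighbours.getOrDefault(current, Collections.emptyList())) {
        // add() returns false for a vertex that was already seen, which breaks cycles
        if (visited.add(next)) {
          queue.add(next);
        }
      }
    }
    visited.remove(start);
    return visited;
  }
}
```

Passing an upstream adjacency map gives all transitive upstream columns, and passing a downstream map gives the downstream ones; node and edge collection could be added inside the loop in the same way as in the recursive methods.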
This article comes from cnblogs (博客園), author: 這個殺手冷死了. When reposting, please credit the original link: https://www.cnblogs.com/pandacode/p/17648359.html