FlinkSQL源碼閱讀-schema管理

来源:https://www.cnblogs.com/0x12345678/archive/2020/06/22/13174976.html
-Advertisement-
Play Games

在Flink SQL中, 元數據的管理分為三層: catalog-> database-> table, 我們知道Flink SQL是依托calcite框架來進行SQL執行樹生產,校驗,優化等等, 所以本文講介紹FlinkSQL是如何來結合Calcite來進行元數據管理的. calcite開放的介面 ...


在Flink SQL中, 元數據的管理分為三層: catalog-> database-> table,
我們知道Flink SQL是依托calcite框架來進行SQL執行樹生產,校驗,優化等等, 所以本文講介紹FlinkSQL是如何來結合Calcite來進行元數據管理的.

calcite開放的介面

public interface Schema {
    Table getTable(String name);

    Schema getSubSchema(String name);

    ....
}

如介面所示, Schema介面,可以通過table名來獲得一張表, 可以通過schema名來獲得一個子schema.

public interface Table {
    RelDataType getRowType(RelDataTypeFactory typeFactory);
    ....
}

看Table的介面, 主要就是返回table的RelDataType.

Flink的相關實現

接下來,我們來看下Flink是如何實現這些介面的:

public class CatalogManagerCalciteSchema extends FlinkSchema {
	@Override
	public Schema getSubSchema(String schemaName) {
		if (catalogManager.schemaExists(name)) {
			return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
		} else {
			return null;
		}
	}
}

public class CatalogCalciteSchema extends FlinkSchema {
    @Override
    public Schema getSubSchema(String schemaName) {
        if (catalogManager.schemaExists(catalogName, schemaName)) {
            return new DatabasecalciteSchema(schemaName, catalogNmae, catalogManager, isStreamingMode);
        }
    }
}
public class DatabaseCalciteSchema extends FlinkSchema {
    private final String databaseName;
    private final String catalogName;
    private final CatalogManager catalogManager;

    @Override
    public Table getTable(String tableName) {
		ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
		return catalogManager.getTable(identifier)
			.map(result -> {
				CatalogBaseTable table = result.getTable();
				FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
				return new CatalogSchemaTable(identifier,
					table,
					statistic,
					catalogManager.getCatalog(catalogName)
						.flatMap(Catalog::getTableFactory)
						.orElse(null),
					isStreamingMode,
					result.isTemporary());
			})
			.orElse(null);
    }

    @Override
    public Schema getSubSchema(String name) {
        return null;
    }
}

很容易發現,CatalogSchema返回DatabaseSchema, DatabaseSchema返回Table,
這樣就容易理解,Flink的三層結構是怎樣的了. 同時, 具體的元數據實際上都是在catalogManager中。

DatabaseSchema中返回的Table類型為CatalogSchemaTable,我們來看下具體的結結構是怎樣的,
上文中也提到了,Table介面主為getRowType函數, 用於返回某個table的type信息。
TableSchema是Flink內部用於保存各個欄位的類型信息的類, 通過相關的轉化函數,轉換為calcite的type類型.

public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
    
	private final ObjectIdentifier tableIdentifier;
	private final CatalogBaseTable catalogBaseTable;
	private final FlinkStatistic statistic;
	private final boolean isStreamingMode;
	private final boolean isTemporary;
    ...
	private static RelDataType getRowType(RelDataTypeFactory typeFactory,
			CatalogBaseTable catalogBaseTable,
			boolean isStreamingMode) {
		final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
		TableSchema tableSchema = catalogBaseTable.getSchema();
		final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
		if (!isStreamingMode
			&& catalogBaseTable instanceof ConnectorCatalogTable
			&& ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
			// If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
			// Now for ConnectorCatalogTable, there is no way to
			// deduce if it is bounded in the table environment, so the data types in TableSchema
			// always patched with TimeAttribute.
			// See ConnectorCatalogTable#calculateSourceSchema
			// for details.

			// Remove the patched time attributes type to let the TableSourceTable handle it.
			// We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
			// TODO: Fix FLINK-14844.
			for (int i = 0; i < fieldDataTypes.length; i++) {
				LogicalType lt = fieldDataTypes[i].getLogicalType();
				if (lt instanceof TimestampType
					&& (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
					|| ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
					int precision = ((TimestampType) lt).getPrecision();
					fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
				}
			}
		}
		return TableSourceUtil.getSourceRowType(flinkTypeFactory,
			tableSchema,
			scala.Option.empty(),
			isStreamingMode);
	}
}

CatalogBaseTable介面定義如下, Flink的Table的參數(schema參數,connector參數)都可以最終表示為一個map.

public interface CatalogBaseTable {
	/**
	 * Get the properties of the table.
	 *
	 * @return property map of the table/view
	 */
	Map<String, String> getProperties();

	/**
	 * Get the schema of the table.
	 *
	 * @return schema of the table/view.
	 */
	TableSchema getSchema();

	/**
	 * Get comment of the table or view.
	 *
	 * @return comment of the table/view.
	 */
	String getComment();

	/**
	 * Get a deep copy of the CatalogBaseTable instance.
	 *
	 * @return a copy of the CatalogBaseTable instance
	 */
	CatalogBaseTable copy();

	/**
	 * Get a brief description of the table or view.
	 *
	 * @return an optional short description of the table/view
	 */
	Optional<String> getDescription();

	/**
	 * Get a detailed description of the table or view.
	 *
	 * @return an optional long description of the table/view
	 */
	Optional<String> getDetailedDescription();
}

FlinkSchema的使用

上面都是的相關介面都是Flink用於適配calcite框架元數據的相關實現。
那麼這些類具體是在哪裡調用的? 已經什麼時候會被調用到?
calcite中的schema,主要是在validate過程中, 獲得對應table的欄位信息, 對應的function的返回值信息,
確保SQL的欄位名, 欄位類型是正確的.
類的依賴關係為:
validator ---> schemaReader ---> schema

FlinkPlannerImpl.scala中

  private def createSqlValidator(catalogReader: CatalogReader) = {
    val validator = new FlinkCalciteSqlValidator(
      operatorTable,
      catalogReader,
      typeFactory)
    validator.setIdentifierExpansion(true)
    // Disable implicit type coercion for now.
    validator.setEnableTypeCoercion(false)
    validator
  }

PlanningConfigurationBuilder.java

	private CatalogReader createCatalogReader(
			boolean lenientCaseSensitivity,
			String currentCatalog,
			String currentDatabase) {
		SqlParser.Config sqlParserConfig = getSqlParserConfig();
		final boolean caseSensitive;
		if (lenientCaseSensitivity) {
			caseSensitive = false;
		} else {
			caseSensitive = sqlParserConfig.caseSensitive();
		}

		SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
			.setCaseSensitive(caseSensitive)
			.build();

		return new CatalogReader(
			rootSchema,
			asList(
				asList(currentCatalog, currentDatabase),
				singletonList(currentCatalog)
			),
			typeFactory,
			CalciteConfig.connectionConfig(parserConfig));
	}

綜上所訴, 我們就知道了Flink是如何來利用calcite的schema來管理Flink的table信息的.


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言:銳浪報表是一種中國式報表的報表開發工具。博主使用銳浪報表有一段時間了,積累了一些經驗希望能幫助你快速掌握並使用 第一章:集成項目 首先我們先去銳浪報表官網下載並安裝銳浪報表。 創建WPF應用程式。(C/S端使用銳浪報表基本都一樣) 添加銳浪報表的引用,在資源管理器目錄中找到引用並右鍵,點擊添加 ...
  • 這裡分享嵌入式領域有用有趣的項目/工具以及一些熱點新聞,農曆年分二十四節氣,希望在每個交節之日準時發佈一期。 ...
  • 一 REPLICATION CONTROLLERS 1.1 RC概述 RC確保pod指定數量的副本一直運行。如果pod被殺死或被管理員顯式刪除,複製控制器將自動部署相應的pod。類似地,如果運行的pod數量超過所需的數量,它會根據需要刪除pod,以匹配指定的副本計數。 RC的定義主要包括: 所需的副 ...
  • 安裝zsh 和 oh-my-zsh安裝zshyum install zsh安裝gityum install git切換預設shellchsh -s /bin/zshclone from GitHubgit clone git://github.com/robbyrussell/oh-my-zsh.g... ...
  • 我們都知道,Vim 是 Linux 下一種非常重要的文本編輯器,我們可以用它來看代碼、改代碼,很多高手直接將 Vim 打造成一款強大的 IDE 用來寫代碼。 但是,對於新手而言,Vim 相對於其它編輯器而言,更難入門。Vim 是無圖形界面的編輯器,一切操作全靠指令,而且指令又非常多非常雜,學習起來非 ...
  • Dockerfile介紹及常用指令,包括FROM,RUN,還提及了 COPY,ADD,EXPOSE,WORKDIR等,其實 Dockerfile 功能很強大,它提供了十多個指令。 ...
  • 一 持久存儲 1.1 持久存儲概述 預設情況下,運行容器使用容器內的臨時存儲。Pods由一個或多個容器組成,這些容器一起部署,共用相同的存儲和其他資源,可以在任何時候創建、啟動、停止或銷毀。使用臨時存儲意味著,當容器停止時,寫入容器內的文件系統的數據將丟失。 當容器在停止時也需要持久的保存數據時,O ...
  • 之前的兩篇文章 Nginx 變數介紹以及利用 Nginx 變數做防盜鏈 講的是 Nginx 有哪些變數以及一個常見的應用。那麼如此靈活的 Nginx 怎麼能不支持自定義變數呢,今天的文章就來說一下自定義變數的幾個模塊以及 Nginx 的 keepalive 特性。 通過映射新變數提供更多的可能性:m ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...