Catalog(目錄)提供了關于數據庫、表格和訪問數據所需的信息的元數據,以及統一的 API 來管理元數據,驗證連接,讓元數據對 Sources(數據源)、Sinks(數據匯)和 Web 可訪問。
Catalog 讓用戶能夠引用其數據系統中的現有元數據,并自動映射到 SeaTunnel 的對應元數據。總之,Catalog 大大簡化了使用用戶現有系統開始使用 SeaTunnel 的步驟,并顯著增強了用戶體驗。
Catalog 功能的重要性
目前,許多現有功能都是基于 Catalog 實現的,例如 CDC(變更數據捕獲)多表同步功能,我們使用 Catalog 獲取表格和字段列表。
【資料圖】
Apache SeaTunnel 目前正在設計一個叫做 SaveMode 的功能,它是由連接器實現的,用于支持目標表中現有表格結構和數據的處理。這些功能也是基于 Catalog 實現的。
Catalog 是如何設計的?如何實現一個新的 Catalog?以下是詳細介紹。
Catalog API
初始化操作
注意:目錄名稱目前沒有被使用,預計會提供給 Web 后端進行保存和查詢。
Javapublic interface CatalogFactory extends Factory { String factoryIdentifier(); OptionRule optionRule(); Catalog createCatalog(String catalogName, ReadonlyConfig options); } public interface Catalog extends AutoCloseable { void open() throws CatalogException; void close() throws CatalogException; }
數據庫操作
javapublic interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 數據庫 // -------------------------------------------------------------------------------------------- String getDefaultDatabase() throws CatalogException; boolean databaseExists(String databaseName) throws CatalogException; List listDatabases() throws CatalogException; void createDatabase(String databaseName, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException; void dropDatabase(String databaseName, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException; }
表格操作
javapublic interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 表格 // -------------------------------------------------------------------------------------------- List listTables(String databaseName) throws CatalogException, DatabaseNotExistException; boolean tableExists(TablePath tablePath) throws CatalogException; CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException; void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException; void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; }
這里是一個已經實現的示例。
MySQL Catalog
MySQL Catalog 的使用方式:
- username [String] 連接到數據庫服務器時要使用的數據庫名稱。
- password [String] 連接到數據庫服務器時要使用的密碼。
- base-url [String] URL 必須包含數據庫,例如 "jdbc:mysql://localhost:5432/db" 或 "jdbc:mysql://localhost:5432/db?useSSL=true"。
- table-names [List] 要捕獲的數據庫表格名稱列表。表格名稱需要包括數據庫名稱,例如:database_name.table_name。
- database-pattern [String] 要捕獲的數據庫名稱的正則表達式。
- table-pattern [String] 要捕獲的數據庫表格名稱的正則表達式。表格名稱需要包括數據庫名稱,例如:database_.\.table_.。
配置文件配置
conf[source/sink] { [connector-factory-id] { catalog { factory = "MySQL" username = "test" password = "123456" base-url = "jdbc:mysql://localhost:5432/db" table-names = [ "db.table" ] } } }
如何使用 Catalog
對于支持 Catalog 的連接器,我們將打開一個 Catalog 參數來配置所使用的 Catalog:
示例
sqlenv { "job.mode"=STREAMING "job.name"="cdc_mysql_to_mysql" "checkpoint.interval"="2000" "custom_parameters"="" } source { MySQL-CDC { parallelism = 1 catalog { factory = "MySQL" # 默認情況下,Catalog 將使用與連接器同名的選項 } username = "mysqluser" password = "mysqlpw" database-names = ["seatunnel-test"] table-pattern = "seatunnel-test\\.orders_\\d+" base-url = "jdbc:mysql://localhost:54508/seatunnel-test" } } sink { jdbc { url = "jdbc:mysql://localhost:4000/test" driver = "com.mysql.cj.jdbc.Driver" catalog { factory = "MySQL" username = "root" password = "" base-url = "jdbc:mysql://localhost:4000/test" table-pattern = "seatunnel-test2\\.orders_\\d+" } user = "root" password = "" query = "insert into sink(age, name) values(?,?)" } }
未來規劃
目前,我們只實現了部分 Catalog。未來,我們計劃擴大 Catalog 的實現范圍,包括更多支持 Catalog 的連接器,這將使更多的連接器支持 SaveMode 和自動表格創建等功能。
Apache SeaTunnel 是一個分布式、高性能、易擴展、用于海量數據(離線&實時)同步和轉化的數據集成平臺
倉庫地址: https://github.com/apache/seatunnel
網址:https://seatunnel.apache.org/
Proposal:https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal
Apache SeaTunnel 下載地址:https://seatunnel.apache.org/download
衷心歡迎更多人加入!
我們相信,在「Community Over Code」(社區大于代碼)、「Open and Cooperation」(開放協作)、「Meritocracy」(精英管理)、以及「多樣性與共識決策」等 The Apache Way 的指引下,我們將迎來更加多元化和包容的社區生態,共建開源精神帶來的技術進步!
我們誠邀各位有志于讓本土開源立足全球的伙伴加入 SeaTunnel 貢獻者大家庭,一起共建開源!
- 提交問題和建議:https://github.com/apache/seatunnel/issues
- 貢獻代碼: https://github.com/apache/seatunnel/pulls
- 訂閱社區開發郵件列表 : dev-subscribe@seatunnel.apache.org
- 開發郵件列表:dev@seatunnel.apache.org
- 加入 Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
- 關注 Twitter: https://twitter.com/ASFSeaTunnel
本文由 白鯨開源 提供發布支持!

- 業之峰618年中大促:送給老爸的父親節禮物!6月18日,又逢一年一度的父親節,業之峰裝飾集團祝全天下的老...
- 全球醫療投資實踐導語:醫療行業專業度強,投資門檻高,全球醫療投資難度更加...
- QDII基金火了,這只基金過去一年業績同類前三不知不覺中,QDII基金火了一把。數據顯示,萬得QDII基金指數...
- 業之峰618年中大促倒計時,自營家裝4.0首次加入!自從今年4月22日發布以來,自營家裝4 0還是第一次參加業之峰...
- 尊享8大裝修特權!業之峰618年中大促即將舉行要想享受業之峰裝飾高品質的家裝服務,什么時候最合適?答案...
- visa信用卡是什么?不出國visa信用卡有什么用?
2023-06-16 16:10:22
- 跌停能賣出嗎?漲停跌停還能交易嗎?
2023-06-14 15:46:11
- 按揭轉抵押有什么優缺點?按揭轉抵押有什么風險?
2023-06-09 16:31:22
- 什么是除權價?股票前復權和不復權哪個準確?
2023-06-02 16:50:36
- 股票XD會持續幾天?xd股票分紅到股票賬戶嗎?
2023-06-01 16:50:07