欢迎光临

我们一直在努力
当前位置:首页 > 互联网 >

开源数据血缘和元数据管理框架DataHub的血缘摄取 V0.12.1版本

日期:
后台-插件-广告管理-首页/栏目/内容广告位一(PC)
后台-插件-广告管理-首页/栏目/内容广告位一(手机)

DataHUb的安装很简单:你有绿色上网就soeasy

前置条件,你已经运行好DataHub整个Docker-Compse服务

打开地址:http://host:9002/ 输入账号DataHub 密码DataHub

查看框架运行路线轨迹

第一步 源数据获取

(1.1)点击这里

(1.2)点击这里

(1.3)点击这里{选择数据源的类型}:以MYSQL示例

(1.4)点击这里{填写数据来源的基本信息}:

方式一:图形窗口填写

方式二:yaml配置填写

切换方式

(1.5)点击这里{数据源的配置填充}:不填充下面的filter默认会把整个数据库的表抓取过来;填充了可以按需抓取

(1.6)点击next{继续配置}:勾上开关继续下一步

[!--empirenews.page--]

(1.7)填写名称{跑起来}:带RUN按钮的

(1.8)漫长的等来{数据源的摄取:时间和你数据库的数据表多少有关}:会有两种结果

(1.9)失败了可以点击红蛇的Fial;查看执行日志日志

(2.0)继续完成配置有改动,然后接着跑;

数据摄取完成之后,点击左上角的图片回到首页

你就看得到具体摄取的MYSQL数据源是什么

点击Mysql图片;进入数据源查看

随便找几个数据库:以NIO示例(主要是这里面的表少)

随便进入一张表:查看数据是否对——-没问题,连注释都带来了

注意DataHub只会摄取数据源,可不会自动帮你分析这个Mysql数据库的血缘关系

所以还需要自己去解析数据表之间的血缘关系
血缘关系也许很懵:咱不管是什么 ,当成数据库表与表之间的ER图即可

偷一张图sqlflow:就长这样:

DataHub可不仅仅只是关系数据库血缘管理,万物皆是数据,数据之间皆有血缘关系

分析这个Mysql数据库的血缘关系:推送到DataHUb[!--empirenews.page--]

(1.1)下载官网的GitHub源码{datahub-0.12.1}|找到血缘分析推送示例代码文件

(1.2)表级别的没什么可说的:UI界面都可以配置:代码也可以操作:但是列级只有代码操作

表级别示例:

列级别示例

(1.3)表级别的血缘示例解读:示例代码是硬设置关系的,自己回写代码可以灵活使用


import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Construct a lineage object.
lineage_mce = builder.make_lineage_mce(
    [
        builder.make_dataset_urn("hive", "fct_users_deleted"),  # Upstream 表的上游关系
    ],
    builder.make_dataset_urn("hive", "logging_events"),  # Downstream 表的下游关系
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mce(lineage_mce)

执行:就 命令窗口 Python 文件名

(1.4)表级别的血缘示例最终效果 fct_users_deleted 两张表 logging_events有个线连起来,箭头表示下游

(1.5)列级的血缘关系:示例代码是硬设置某个表的某个字段和 另外表的某个字段关联

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)


def datasetUrn(dataType,tbl):
    return builder.make_dataset_urn(dataType, tbl,"PROD")


def fldUrn(dataType,tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(dataType,tbl), fld)


 fineGrainedLineages = [
     FineGrainedLineage(
         upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
         upstreams=[
             fldUrn("mysql","datahub.task_info", "mid")
         ],
         downstreamType=FineGrainedLineageDownstreamType.FIELD,
         downstreams=[fldUrn("mysql","datahub.task_info_log", "task_id"),fldUrn("mysql","datahub.task_info_file", "task_info_id")]
     ),
]


# # this is just to check if any conflicts with existing Upstream, particularly the DownstreamOf relationship
 upstream = Upstream(
     dataset=datasetUrn("mysql","datahub.task_info"), type=DatasetLineageType.TRANSFORMED
 )

 fieldLineages = UpstreamLineage(
     upstreams=[upstream], fineGrainedLineages=fineGrainedLineages
 )

 lineageMcp = MetadataChangeProposalWrapper(
     # 这里必须刷新的是下游节点|刷新一个展示一个
    entityUrn=datasetUrn("mysql","datahub.task_info_file"),
    aspect=fieldLineages,
 )

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://10.130.1.44:8080")

 # Emit metadata!
emitter.emit_mcp(lineageMcp)

print('Success')

[!--empirenews.page--]

(1.5)列级的血缘关系:示例代码是硬设置某个表的某个字段和 另外表的某个字段关系 效果图

他这个上下游都是List类型,你可以自己写死很多个,或者代码从某些地方获取很多歌塞进去
很坑的是DataHUb需要每一个上下游节点都需要设置刷新一次,才会有完整的链路

例如:上面的例子有三个表:task_info_log 、 task_info 、task_info_file ;
task_info 上游节点:有两个下游节点:task_info_log 、 task_info_file
但是只能设置一个下游节点去刷新:这里代码设置的task_info_file刷新,那么只有task_info_file的血缘关系有,但是task_info_log的就没有展示出来:你需要 entityUrn=datasetUrn("mysql","datahub.task_info_file"),再重新写一次entityUrn=datasetUrn("mysql","datahub.task_info_log"),的刷新,才会两个都出来!

注意:总结 也就是你有多少个下游,就需要刷新多少个下游节点表:才会数据完整血缘关系

(1.6)都是直接先设置好上下游关系,字段即可:示例写死的,你可以动态获取让里面填充数据: Over

有的时候很不友好直接去声明血缘关系:这个硬梳理太痛苦:

解决方案

借助第三方开源框架sqllineage去解析SQL;从SQL自动提炼出上游表和下游表关系;然后自动执行脚本创建

你需要先安装:sqllineage :他是Python的框架,可以借助 pip install sqllineage 去安装

pip install sqllineage -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com

使用:sqllineage + DataHUb的API

此代码来源于网络大神:
核心是先sqllineage分析血缘上下游;然后构建列级血缘,最有还有个优化就是筛选下游所有的表
但是我这里执行最后优化刷新下游所有的表 反而无法生成列级别血缘关系;;只有注释掉才能生成
我的DataHUb版本是 v0.12.1 最新的,不知道是不是版本问题

from sqllineage.runner import LineageRunner
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)


def datasetUrn(dataType,tbl):
    return builder.make_dataset_urn(dataType, tbl,"PROD")


def fldUrn(dataType,tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(dataType,tbl), fld)

 # lineage_emitter_dataset_finegrained_sample.py

# 语法:insert into demo  原始查询语句
sql = """insert
    into
    nio.fee_info (creator,
    tenant_id,
    updator) 
select
    A.creator,
    B.tenant_id,
    B.office_name
from
    nio.archive_ledger_relationship A
left join nio.task_archive_borrowing B>sqllineage 分析再推送DataHUb的效果图

测试用表


-- nio.archive_ledger_relationship definition

CREATE TABLE `archive_ledger_relationship` (
  `task_id` decimal(11,0) DEFAULT NULL COMMENT '委托单ID',
  `archive_ledger_id` decimal(11,0) DEFAULT NULL COMMENT '归档台账ID',
  `archive_type` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '归档项类型',
  `that_table_id` decimal(11,0) DEFAULT NULL COMMENT '归档记录文件ID',
  `that_table_name` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '归档关联表名',
  `mid` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `state_id` decimal(5,0) DEFAULT NULL COMMENT '状态',
  `creator_id` decimal(11,0) DEFAULT NULL COMMENT '录入者Id',
  `creator` varchar(31) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '录入者',
  `create_time` datetime DEFAULT NULL COMMENT '录入时间',
  `updator_id` decimal(11,0) DEFAULT NULL COMMENT '更新者Id',
  `updator` varchar(31) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '更新者',
  `update_time` datetime DEFAULT NULL COMMENT '修改时间',
  `tenant_id` decimal(11,0) DEFAULT NULL COMMENT '租户ID',
  PRIMARY KEY (`mid`)
) ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='归档关联关系记录';

-- nio.fee_info definition
数据库NIO
CREATE TABLE `fee_info` (
  `state_id` decimal(5,0) DEFAULT NULL COMMENT '状态',
  `creator_id` decimal(11,0) DEFAULT NULL COMMENT '录入者Id',
  `creator` varchar(31) DEFAULT NULL COMMENT '录入者',
  `create_time` datetime DEFAULT NULL COMMENT '录入时间',
  `updator_id` decimal(11,0) DEFAULT NULL COMMENT '更新者Id',
  `updator` varchar(31) DEFAULT NULL COMMENT '更新者',
  `update_time` datetime DEFAULT NULL COMMENT '修改时间',
  `tenant_id` decimal(11,0) DEFAULT NULL COMMENT '租户ID',
  `mid` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `fee_item_id` decimal(11,0) DEFAULT NULL COMMENT '收费项ID',
  `fee_item_name` varchar(256) DEFAULT NULL COMMENT '收费项名称',
  `fee_type` decimal(2,0) DEFAULT NULL COMMENT '收费依据',
  `assets_info_id` decimal(11,0) DEFAULT NULL COMMENT '设备ID',
  `assets_info_no` varchar(256) DEFAULT NULL COMMENT '设备编号/试验条目',
  `price` decimal(15,2) DEFAULT NULL COMMENT '单价(元)',
  `unit` decimal(2,0) DEFAULT NULL COMMENT '单位',
  `coefficient` decimal(11,2) DEFAULT NULL COMMENT '系数',
  `start_time` datetime DEFAULT NULL COMMENT '实际开始日期',
  `end_time` datetime DEFAULT NULL COMMENT '实际结束日期',
  `fee_time` decimal(11,2) DEFAULT NULL COMMENT '费用时间',
  `fee` decimal(15,2) DEFAULT NULL COMMENT '费用(元)',
  `status` decimal(2,0) DEFAULT NULL COMMENT '状态',
  `remark` varchar(512) DEFAULT NULL COMMENT '备注',
  `task_info_id` decimal(11,0) DEFAULT NULL COMMENT '任务单ID',
  PRIMARY KEY (`mid`)
) ENGINE=InnoDB AUTO_INCREMENT=156 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='费用信息';
-- nio.task_archive_borrowing definition

CREATE TABLE `task_archive_borrowing` (
  `borrowers` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '借阅人',
  `borrowers_user_id` decimal(11,0) DEFAULT NULL COMMENT '借阅人Id',
  `office_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '实验室',
  `borrowing_days` decimal(11,0) DEFAULT NULL COMMENT '借阅天数',
  `borrowing_desc` varchar(1024) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '借阅原因',
  `borrowing_status` varchar(36) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '借阅状态',
  `borrowing_date` datetime DEFAULT NULL COMMENT '借阅时间',
  `borrowing_return_time` datetime DEFAULT NULL COMMENT '归还时间',
  `borrowing_request_no` varchar(36) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '借阅申请单号',
  `test_category` varchar(36) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '试验类型',
  `approver` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '审批人',
  `approver_id` decimal(11,0) DEFAULT NULL COMMENT '审批人Id',
  `msg` varchar(1024) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '审批结果',
  `mid` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `state_id` decimal(5,0) DEFAULT NULL COMMENT '状态',
  `creator_id` decimal(11,0) DEFAULT NULL COMMENT '录入者Id',
  `creator` varchar(31) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '录入者',
  `create_time` datetime DEFAULT NULL COMMENT '录入时间',
  `updator_id` decimal(11,0) DEFAULT NULL COMMENT '更新者Id',
  `updator` varchar(31) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '更新者',
  `update_time` datetime DEFAULT NULL COMMENT '修改时间',
  `tenant_id` decimal(11,0) DEFAULT NULL COMMENT '租户ID',
  `process_status` decimal(2,0) DEFAULT NULL COMMENT '流程状态',
  `process_definition_id` varchar(256) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '流程定义ID',
  `process_definition_key` varchar(256) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '流程定义KEY',
  `process_instance_id` varchar(256) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '流程实例ID',
  `process_name` varchar(256) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '流程名称',
  `title` varchar(1024) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '任务标题',
  `apply_time` datetime DEFAULT NULL COMMENT '申请时间',
  `applier_id` decimal(11,0) DEFAULT NULL COMMENT '申请者Id',
  `applier_name` varchar(15) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '申请者',
  `applier_org_id` decimal(11,0) DEFAULT NULL COMMENT '申请组织Id',
  `applier_org_name` varchar(256) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '申请者组织',
  `applier_org_level_code` varchar(256) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '申请者组织层级码',
  `url` varchar(1024) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '表单地址',
  PRIMARY KEY (`mid`)
) ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='归档借阅';
[!--empirenews.page--]

测试捯饬结束!!!!

后台-插件-广告管理-首页/栏目/内容广告位二(PC)
后台-插件-广告管理-首页/栏目/内容广告位二(手机)
后台-插件-广告管理-内容广告位三(PC)
后台-插件-广告管理-内容广告位三(手机)

相关阅读

后台-插件-广告管理-内容广告位四(PC)
后台-插件-广告管理-内容广告位四(手机)

聚合标签

热门文章

后台-插件-广告管理-侧边广告位一(PC)
后台-插件-广告管理-侧边广告位一(手机)
  • Windows主机中localhost与127.0.0.1的区别是什么?

  • localhost与127.0.0.1的区别是什么? 相信有人会说是本地IP,曾有人说,用127.0.0.1比localhost好,可以减少一次解析。 这个理解是错误的,其实这两者是有区别的。 localhost也叫l
  • c盘满了怎么清理垃圾而不误删

  • 今天分享的主题是:c盘爆满发出警告如何清理又不误删系统文件。如果你也不会的话就看看下面的经验吧。 c盘满了怎么清理垃圾而不误删 1、很多人在清理c盘垃圾的时候会误删
  • steam怎么退款?

  • 有的时候我们在steam上买了游戏,但是却发现自己的电脑无法加载这款游戏,这时候我们就会想到退款,那么steam如何退款呢?下面小编就来给大家介绍一下。 steam怎么退款? 1、在ste
  • 电脑怎么录屏?如何录制电脑屏幕操作?

  • 如何录制电脑屏幕操作,相信很多朋友们遇到过这种类似的问题,你们对于这类问题如何解决呢?下面就给大家分享一下个人经验,希望可以帮助到大家。 电脑怎么录屏? 方法一:手机录制。
  • 手机如何投屏到电脑?(手机投屏电脑方法)

  • 每次都有新手机发布会,总会提到采用多少英寸的屏幕,但是手机在大的屏幕,也没有手机投屏到电脑、电视的体验爽,下面就一起来看看手机如何投屏到电脑? 手机投屏电脑方法 1、打开
后台-插件-广告管理-侧边广告位二(PC)
后台-插件-广告管理-侧边广告位二(手机)

最新文章

  • 【Redis】一文掌握Redis原理及常见问题

  • Redis是基于内存数据库,操作效率高,提供丰富的数据结构(Redis底层对数据结构还做了优化),可用作数据库,缓存,消息中间件等。如今广泛用于互联网大厂,面试必考点之一,本文从数据结构,到
  • 【经典问题】mysql和redis数据一致性问题

  • 前言MySQL和Redis数据一致性算是个很经典的问题,在之前也看到过很多相关的文章,最近心血来潮,想把一致性问题的解决方案和存在问题都总结一下。不推荐方案1 先更新MySQL,再更新R
  • Mysql事务实现原理

  • 在日常工作中,数据库是我们必须使用的,其中使用最多的也是大部分中小公司的选择是Mysql,跳槽面试中也是必问的,今天我们就说一下Mysql事务MySQL中的事务实现原理主要涉及以下几
后台-插件-广告管理-侧边广告位三(PC)
后台-插件-广告管理-侧边广告位三(手机)