
Collecting Apache Doris Metadata with LinkedIn DataHub

Author: 燈惉

Implementing a Doris source

The Doris source is modeled on the MySQL source, because using the MySQL source directly has a few small problems:

  • Some Doris column types are not supported, so the UI shows them as NullType
  • Doris views cannot be collected properly, leaving the view pages without any column information

Supporting Doris-specific column types

Registering them as follows is enough:

LARGEINT = make_sqlalchemy_type("LARGEINT")
BITMAP = make_sqlalchemy_type("BITMAP")
HLL = make_sqlalchemy_type("HLL")
HYPERLOGLOG = make_sqlalchemy_type("HYPERLOGLOG")

register_custom_type(LARGEINT, NumberTypeClass)
register_custom_type(BITMAP)
register_custom_type(HLL)
register_custom_type(HYPERLOGLOG)           

Supporting view collection

The root cause: SQLAlchemy's MySQLDialect._setup_parser decides whether a CREATE statement defines a view using the regex "^CREATE (?:ALGORITHM)?.* VIEW". Doris represents views differently from MySQL, so for Doris, matching with "^CREATE VIEW" is enough.
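A quick illustration of the mismatch (the DDL strings below are simplified examples, not verbatim SHOW CREATE output):

import re

mysql_ddl = "CREATE ALGORITHM=UNDEFINED DEFINER=`root`@`%` SQL SECURITY DEFINER VIEW `v1` AS SELECT 1"
doris_ddl = "CREATE VIEW `v1` AS SELECT 1"

pattern = r"^CREATE (?:ALGORITHM)?.* VIEW"
print(bool(re.match(pattern, mysql_ddl)))          # True
print(bool(re.match(pattern, doris_ddl)))          # False: nothing precedes " VIEW"
print(bool(re.match(r"^CREATE VIEW", doris_ddl)))  # True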

The _setup_parser method in MySQLDialect:

    @reflection.cache
    def _setup_parser(self, connection, table_name, schema=None, **kw):
        charset = self._connection_charset
        parser = self._tabledef_parser
        full_name = ".".join(
            self.identifier_preparer._quote_free_identifiers(
                schema, table_name
            )
        )
        sql = self._show_create_table(
            connection, None, charset, full_name=full_name
        )
        if re.match(r"^CREATE (?:ALGORITHM)?.* VIEW", sql):
            # Adapt views to something table-like.
            columns = self._describe_table(
                connection, None, charset, full_name=full_name
            )
            sql = parser._describe_to_create(table_name, columns)
        return parser.parse(sql, charset)           

The DorisSource class extends SQLAlchemySource and overrides the _process_view method; inside _process_view, MySQLDialect's _setup_parser is swapped out for our own doris_setup_parser:

setup_parser = python_types.MethodType(doris_setup_parser, inspector.bind.dialect) 
inspector.bind.dialect._setup_parser = setup_parser           
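For reference, a minimal standalone sketch of what types.MethodType does here (the class and function names are made up):

import types

class Dialect:
    def _setup_parser(self):
        return "original"

def patched_setup_parser(self):
    return "patched"

dialect = Dialect()
# Binding the plain function to this one instance replaces the method
# only for this object, not for the whole class.
dialect._setup_parser = types.MethodType(patched_setup_parser, dialect)
print(dialect._setup_parser())  # prints "patched"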

The implementation of doris_setup_parser appears in the complete code below.

The complete Doris source implementation

metadata-ingestion/src/datahub/ingestion/source/sql/doris.py

# This import verifies that the dependencies are available.

import pymysql  # noqa: F401
from pydantic.fields import Field
from sqlalchemy.dialects.mysql import base
from sqlalchemy.dialects.mysql.base import MySQLDialect
from sqlalchemy import create_engine, dialects, inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.sql import sqltypes as types
import types as python_types
from typing import (
    Dict,
    Iterable,
    Optional,
    Union,
    cast
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
import logging
import re
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DatasetPropertiesClass,
    SubTypesClass,
    ViewPropertiesClass,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper

from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
    capability,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.source.sql.sql_common import (
    BasicSQLAlchemyConfig,
    SQLAlchemySource,
    make_sqlalchemy_type,
    register_custom_type,
    SqlWorkUnit,
    get_schema_metadata,
    make_dataset_urn_with_platform_instance
)

from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    NumberTypeClass
)

GEOMETRY = make_sqlalchemy_type("GEOMETRY")
POINT = make_sqlalchemy_type("POINT")
LINESTRING = make_sqlalchemy_type("LINESTRING")
POLYGON = make_sqlalchemy_type("POLYGON")
DECIMAL128 = make_sqlalchemy_type("DECIMAL128")
LARGEINT = make_sqlalchemy_type("LARGEINT")
BITMAP = make_sqlalchemy_type("BITMAP")
HLL = make_sqlalchemy_type("HLL")
HYPERLOGLOG = make_sqlalchemy_type("HYPERLOGLOG")

register_custom_type(GEOMETRY)
register_custom_type(POINT)
register_custom_type(LINESTRING)
register_custom_type(POLYGON)
register_custom_type(DECIMAL128)
register_custom_type(LARGEINT, NumberTypeClass)
register_custom_type(BITMAP)
register_custom_type(HLL)
register_custom_type(HYPERLOGLOG)

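# Map the lower-cased Doris type names into the MySQL dialect's ischema_names
# so column reflection can resolve them instead of falling back to NullType.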
base.ischema_names["geometry"] = GEOMETRY
base.ischema_names["point"] = POINT
base.ischema_names["linestring"] = LINESTRING
base.ischema_names["polygon"] = POLYGON
base.ischema_names["decimal128"] = DECIMAL128
base.ischema_names["largeint"] = LARGEINT
base.ischema_names["bitmap"] = BITMAP
base.ischema_names["hll"] = HLL
base.ischema_names["hyperloglog"] = HYPERLOGLOG


logger: logging.Logger = logging.getLogger(__name__)

class DorisConfig(BasicSQLAlchemyConfig):
    # defaults
    host_port = Field(default="localhost:3306", description="Doris host URL.")
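    # Doris FE speaks the MySQL protocol, so the pymysql driver is reused.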
    scheme = "mysql+pymysql"

    def get_identifier(self, *, schema: str, table: str) -> str:
        regular = f"{schema}.{table}"
        if self.database_alias:
            return f"{self.database_alias}.{table}"
        else:
            return regular


@platform_name("Doris")
@config_class(DorisConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class DorisSource(SQLAlchemySource):
    """
    This plugin extracts the following:

    Metadata for databases, schemas, and tables
    Column types and schema associated with each table
    Table, row, and column statistics via optional SQL profiling
    """

    def __init__(self, config, ctx):
        super().__init__(config, ctx, self.get_platform())

    def get_platform(self):
        return "doris"

    @classmethod
    def create(cls, config_dict, ctx):
        print(">>>> doris")
        config = DorisConfig.parse_obj(config_dict)
        return cls(config, ctx)


    def _process_view(
        self,
        dataset_name: str,
        inspector: Inspector,
        schema: str,
        view: str,
        sql_config: BasicSQLAlchemyConfig,
    ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
        try:
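            # Replace this dialect instance's _setup_parser with the Doris-aware
            # version defined below, so view DDL is recognized correctly.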
            setup_parser = python_types.MethodType(doris_setup_parser, inspector.bind.dialect) 
            inspector.bind.dialect._setup_parser = setup_parser
            columns = inspector.get_columns(view, schema)
        except KeyError:
            # For certain types of views, we are unable to fetch the list of columns.
            self.report.report_warning(
                dataset_name, "unable to get schema for this view"
            )
            schema_metadata = None
        else:
            schema_fields = self.get_schema_fields(dataset_name, columns)
            schema_metadata = get_schema_metadata(
                self.report,
                dataset_name,
                self.platform,
                columns,
                canonical_schema=schema_fields,
            )
        try:
            # SQLALchemy stubs are incomplete and missing this method.
            # PR: https://github.com/dropbox/sqlalchemy-stubs/pull/223.
            view_info: dict = inspector.get_table_comment(view, schema)  # type: ignore
        except NotImplementedError:
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        except ProgrammingError as pe:
            # Snowflake needs schema names quoted when fetching table comments.
            logger.debug(
                f"Encountered ProgrammingError. Retrying with quoted schema name for schema {schema} and view {view}",
                pe,
            )
            description = None
            properties = {}
            view_info: dict = inspector.get_table_comment(view, f'"{schema}"')  # type: ignore
        else:
            description = view_info["text"]

            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = view_info.get("properties", {})
        try:
            view_definition = inspector.get_view_definition(view, schema)
            if view_definition is None:
                view_definition = ""
            else:
                # Some dialects return a TextClause instead of a raw string,
                # so we need to convert them to a string.
                view_definition = str(view_definition)
        except NotImplementedError:
            view_definition = ""
        properties["view_definition"] = view_definition
        properties["is_view"] = "True"
        dataset_urn = make_dataset_urn_with_platform_instance(
            self.platform,
            dataset_name,
            self.config.platform_instance,
            self.config.env,
        )
        dataset_snapshot = DatasetSnapshot(
            urn=dataset_urn,
            aspects=[StatusClass(removed=False)],
        )
        db_name = self.get_db_name(inspector)
        yield from self.add_table_to_schema_container(dataset_urn, db_name, schema)
        if self.is_stateful_ingestion_configured():
            cur_checkpoint = self.get_current_checkpoint(
                self.get_default_ingestion_job_id()
            )
            if cur_checkpoint is not None:
                checkpoint_state = cast(
                    BaseSQLAlchemyCheckpointState, cur_checkpoint.state
                )
                checkpoint_state.add_view_urn(dataset_urn)
        dataset_properties = DatasetPropertiesClass(
            name=view,
            description=description,
            customProperties=properties,
        )
        dataset_snapshot.aspects.append(dataset_properties)
        if schema_metadata:
            dataset_snapshot.aspects.append(schema_metadata)
        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu
        dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
        if dpi_aspect:
            yield dpi_aspect
        subtypes_aspect = MetadataWorkUnit(
            id=f"{view}-subtypes",
            mcp=MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="subTypes",
                aspect=SubTypesClass(typeNames=["view"]),
            ),
        )
        self.report.report_workunit(subtypes_aspect)
        yield subtypes_aspect
        if "view_definition" in properties:
            view_definition_string = properties["view_definition"]
            view_properties_aspect = ViewPropertiesClass(
                materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
            )
            view_properties_wu = MetadataWorkUnit(
                id=f"{view}-viewProperties",
                mcp=MetadataChangeProposalWrapper(
                    entityType="dataset",
                    changeType=ChangeTypeClass.UPSERT,
                    entityUrn=dataset_urn,
                    aspectName="viewProperties",
                    aspect=view_properties_aspect,
                ),
            )
            self.report.report_workunit(view_properties_wu)
            yield view_properties_wu

        yield from self._get_domain_wu(
            dataset_name=dataset_name,
            entity_urn=dataset_urn,
            entity_type="dataset",
            sql_config=sql_config,
        )


def doris_setup_parser(self, connection, table_name, schema=None, **kw):
    charset = self._connection_charset
    parser = self._tabledef_parser
    full_name = ".".join(
        self.identifier_preparer._quote_free_identifiers(
            schema, table_name
        )
    )
    sql = self._show_create_table(
        connection, None, charset, full_name=full_name
    )
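    # Doris emits "CREATE VIEW ..." directly, without MySQL's ALGORITHM/DEFINER
    # clauses, so match that simpler prefix.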
    if re.match(r"^CREATE VIEW", sql):
        # Adapt views to something table-like.
        columns = self._describe_table(
            connection, None, charset, full_name=full_name
        )
        sql = parser._describe_to_create(table_name, columns)
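    # Lower-case the DDL before parsing: Doris can emit upper-case type names,
    # which would miss the lower-case ischema_names keys registered above.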
    return parser.parse(sql.lower(), charset)           

Finally, make the Doris source discoverable

Modify metadata-ingestion/setup.py, adding "doris = datahub.ingestion.source.sql.doris:DorisSource" under "datahub.ingestion.source.plugins":

...

entry_points = {
    "console_scripts": ["datahub = datahub.entrypoints:main"],
    "datahub.ingestion.source.plugins": [
        ... 
        "doris = datahub.ingestion.source.sql.doris:DorisSource",
        ...
    ],

...           

One last thing

An example recipe.yml configuration:

source:
  type: doris
  config:
    platform_instance: doris_dev

    host_port: 'doris_fe_ip:9030'
    database: data_market
    # Credentials
    username: doris_username
    password: doris_password

    include_tables: true
    include_views: true
    schema_pattern:
        deny:
            - sys
            - mysql
            - information_schema
            - performance_schema
sink:
  # sink configs
  type: datahub-rest
  config:
    server: 'http://gms-server-ip:8080'
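With the plugin registered and the recipe saved, ingestion can then be run with the standard DataHub CLI (assuming the modified metadata-ingestion package is installed):

datahub ingest -c recipe.yml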