实现 doris 的 source
参考 mysql 的 source 来实现 doris 的 source，因为直接使用 mysql 的 source 存在如下的几个小问题：
- doris 的一些列类型不被支持，界面会显示为 NullType
- doris 里的视图（view）没法被正常采集，导致视图的界面没有列的信息
doris 自己的一些列类型的支持
按如下注册就可以了：
LARGEINT = make_sqlalchemy_type("LARGEINT")
BITMAP = make_sqlalchemy_type("BITMAP")
HLL = make_sqlalchemy_type("HLL")
HYPERLOGLOG = make_sqlalchemy_type("HYPERLOGLOG")
register_custom_type(LARGEINT, NumberTypeClass)
register_custom_type(BITMAP)
register_custom_type(HLL)
register_custom_type(HYPERLOGLOG)
视图（view）采集的支持
问题原因是 SQLAlchemy 中 MySQLDialect 的 _setup_parser 方法对建表语句是不是一个视图的判断是一个这样的正则："^CREATE (?:ALGORITHM)?.* VIEW"，而 doris 内部和 mysql 的表示是不一样的，doris 的用 "^CREATE VIEW" 去匹配就可以了。
MySQLDialect 中的 _setup_parser 方法：
@reflection.cache
def _setup_parser(self, connection, table_name, schema=None, **kw):
    # Quoted from SQLAlchemy's MySQLDialect: reflect a table (or view)
    # definition by fetching and parsing SHOW CREATE TABLE output.
    charset = self._connection_charset
    parser = self._tabledef_parser
    full_name = ".".join(
        self.identifier_preparer._quote_free_identifiers(
            schema, table_name
        )
    )
    sql = self._show_create_table(
        connection, None, charset, full_name=full_name
    )
    # NOTE(review): this regex expects MySQL's "CREATE ALGORITHM=... VIEW"
    # form; Doris emits a plain "CREATE VIEW", which never matches here —
    # the reason a Doris-specific parser is needed below.
    if re.match(r"^CREATE (?:ALGORITHM)?.* VIEW", sql):
        # Adapt views to something table-like.
        columns = self._describe_table(
            connection, None, charset, full_name=full_name
        )
        sql = parser._describe_to_create(table_name, columns)
    return parser.parse(sql, charset)
DorisSource 类继承 SQLAlchemySource，重写 _process_view 方法，在 _process_view 里用自己实现的 doris_setup_parser 方法替换掉 MySQLDialect 的 _setup_parser 方法：
setup_parser = python_types.MethodType(doris_setup_parser, inspector.bind.dialect)
inspector.bind.dialect._setup_parser = setup_parser
doris_setup_parser 方法的实现在下面的完整代码中。
Doris source 的完整实现代码
metadata-ingestion/src/datahub/ingestion/source/sql/doris.py
# This import verifies that the dependencies are available.
import pymysql # noqa: F401
from pydantic.fields import Field
from sqlalchemy.dialects.mysql import base
from sqlalchemy.dialects.mysql.base import MySQLDialect
from pydantic.fields import Field
from sqlalchemy import create_engine, dialects, inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.sql import sqltypes as types
import types as python_types
from typing import (
Dict,
Iterable,
Optional,
Union,
cast
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
import logging
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.metadata.schema_classes import (
ChangeTypeClass,
DatasetPropertiesClass,
SubTypesClass,
ViewPropertiesClass,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.source.sql.sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
make_sqlalchemy_type,
register_custom_type,
SqlWorkUnit,
get_schema_metadata,
make_dataset_urn_with_platform_instance
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
NumberTypeClass
)
# SQLAlchemy type objects for column types Doris exposes but the stock
# MySQL dialect does not know about; without these, such columns would be
# reflected as NullType and render blank in the UI.
GEOMETRY = make_sqlalchemy_type("GEOMETRY")
POINT = make_sqlalchemy_type("POINT")
LINESTRING = make_sqlalchemy_type("LINESTRING")
POLYGON = make_sqlalchemy_type("POLYGON")
DECIMAL128 = make_sqlalchemy_type("DECIMAL128")
LARGEINT = make_sqlalchemy_type("LARGEINT")
BITMAP = make_sqlalchemy_type("BITMAP")
HLL = make_sqlalchemy_type("HLL")
HYPERLOGLOG = make_sqlalchemy_type("HYPERLOGLOG")

# Register each custom type with DataHub; only LARGEINT maps to a
# concrete DataHub type class (it is numeric), the rest use the default.
for _custom_type in (GEOMETRY, POINT, LINESTRING, POLYGON, DECIMAL128):
    register_custom_type(_custom_type)
register_custom_type(LARGEINT, NumberTypeClass)
for _custom_type in (BITMAP, HLL, HYPERLOGLOG):
    register_custom_type(_custom_type)

# Teach the MySQL dialect's reflection code the lower-cased Doris names
# it will encounter in information_schema / DESCRIBE output.
base.ischema_names.update(
    {
        "geometry": GEOMETRY,
        "point": POINT,
        "linestring": LINESTRING,
        "polygon": POLYGON,
        "decimal128": DECIMAL128,
        "largeint": LARGEINT,
        "bitmap": BITMAP,
        "hll": HLL,
        "hyperloglog": HYPERLOGLOG,
    }
)

logger: logging.Logger = logging.getLogger(__name__)
class DorisConfig(BasicSQLAlchemyConfig):
    """Configuration for the Doris ingestion source.

    Doris speaks the MySQL wire protocol, so connections use the
    ``mysql+pymysql`` SQLAlchemy scheme against the Doris FE node.
    """

    # defaults
    # NOTE: a Doris FE's MySQL-protocol query port is typically 9030,
    # so most deployments will need to override host_port.
    host_port = Field(default="localhost:3306", description="Doris host URL.")
    scheme = "mysql+pymysql"

    def get_identifier(self, *, schema: str, table: str) -> str:
        """Return the dataset identifier, preferring the configured
        database alias over the actual schema name when one is set."""
        regular = f"{schema}.{table}"
        if self.database_alias:
            return f"{self.database_alias}.{table}"
        else:
            return regular
@platform_name("Doris")
@config_class(DorisConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class DorisSource(SQLAlchemySource):
    """Metadata ingestion source for Apache Doris.

    This plugin extracts the following:

    - Metadata for databases, schemas, and tables
    - Column types and schema associated with each table
    - Table, row, and column statistics via optional SQL profiling

    Doris is reached over the MySQL protocol; the only behavioral
    difference from the generic SQLAlchemy source is ``_process_view``,
    which installs a Doris-aware ``_setup_parser`` on the MySQL dialect
    so view definitions are reflected correctly.
    """

    def __init__(self, config: DorisConfig, ctx):
        super().__init__(config, ctx, self.get_platform())

    def get_platform(self) -> str:
        return "doris"

    @classmethod
    def create(cls, config_dict, ctx) -> "DorisSource":
        """Build a DorisSource from a raw recipe config dict."""
        # (removed a leftover debug print that polluted CLI output)
        config = DorisConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def _process_view(
        self,
        dataset_name: str,
        inspector: Inspector,
        schema: str,
        view: str,
        sql_config: BasicSQLAlchemyConfig,
    ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
        """Emit metadata work units for a single Doris view.

        Mirrors SQLAlchemySource._process_view, except the dialect's
        ``_setup_parser`` is replaced with :func:`doris_setup_parser`
        so that Doris's plain "CREATE VIEW ..." DDL is recognized as a
        view and its columns can be reflected.
        """
        try:
            # Replace MySQLDialect._setup_parser with the Doris-aware
            # version, bound to the dialect instance so `self` resolves.
            setup_parser = python_types.MethodType(
                doris_setup_parser, inspector.bind.dialect
            )
            inspector.bind.dialect._setup_parser = setup_parser
            columns = inspector.get_columns(view, schema)
        except KeyError:
            # For certain types of views, we are unable to fetch the list of columns.
            self.report.report_warning(
                dataset_name, "unable to get schema for this view"
            )
            schema_metadata = None
        else:
            schema_fields = self.get_schema_fields(dataset_name, columns)
            schema_metadata = get_schema_metadata(
                self.report,
                dataset_name,
                self.platform,
                columns,
                canonical_schema=schema_fields,
            )
        try:
            # SQLALchemy stubs are incomplete and missing this method.
            # PR: https://github.com/dropbox/sqlalchemy-stubs/pull/223.
            view_info: dict = inspector.get_table_comment(view, schema)  # type: ignore
        except NotImplementedError:
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        except ProgrammingError as pe:
            # Snowflake needs schema names quoted when fetching table comments.
            logger.debug(
                f"Encountered ProgrammingError. Retrying with quoted schema name for schema {schema} and view {view}",
                pe,
            )
            # NOTE(review): the retried view_info is fetched but its text is
            # never copied into description/properties — kept as upstream has it.
            description = None
            properties = {}
            view_info: dict = inspector.get_table_comment(view, f'"{schema}"')  # type: ignore
        else:
            description = view_info["text"]
            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = view_info.get("properties", {})
        try:
            view_definition = inspector.get_view_definition(view, schema)
            if view_definition is None:
                view_definition = ""
            else:
                # Some dialects return a TextClause instead of a raw string,
                # so we need to convert them to a string.
                view_definition = str(view_definition)
        except NotImplementedError:
            view_definition = ""
        properties["view_definition"] = view_definition
        properties["is_view"] = "True"
        dataset_urn = make_dataset_urn_with_platform_instance(
            self.platform,
            dataset_name,
            self.config.platform_instance,
            self.config.env,
        )
        dataset_snapshot = DatasetSnapshot(
            urn=dataset_urn,
            aspects=[StatusClass(removed=False)],
        )
        db_name = self.get_db_name(inspector)
        yield from self.add_table_to_schema_container(dataset_urn, db_name, schema)
        if self.is_stateful_ingestion_configured():
            # Record the view urn so stale-entity (deletion) detection
            # can diff this run's state against the previous checkpoint.
            cur_checkpoint = self.get_current_checkpoint(
                self.get_default_ingestion_job_id()
            )
            if cur_checkpoint is not None:
                checkpoint_state = cast(
                    BaseSQLAlchemyCheckpointState, cur_checkpoint.state
                )
                checkpoint_state.add_view_urn(dataset_urn)
        dataset_properties = DatasetPropertiesClass(
            name=view,
            description=description,
            customProperties=properties,
        )
        dataset_snapshot.aspects.append(dataset_properties)
        if schema_metadata:
            dataset_snapshot.aspects.append(schema_metadata)
        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu
        dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
        if dpi_aspect:
            yield dpi_aspect
        subtypes_aspect = MetadataWorkUnit(
            id=f"{view}-subtypes",
            mcp=MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="subTypes",
                aspect=SubTypesClass(typeNames=["view"]),
            ),
        )
        self.report.report_workunit(subtypes_aspect)
        yield subtypes_aspect
        if "view_definition" in properties:
            view_definition_string = properties["view_definition"]
            view_properties_aspect = ViewPropertiesClass(
                materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
            )
            view_properties_wu = MetadataWorkUnit(
                id=f"{view}-viewProperties",
                mcp=MetadataChangeProposalWrapper(
                    entityType="dataset",
                    changeType=ChangeTypeClass.UPSERT,
                    entityUrn=dataset_urn,
                    aspectName="viewProperties",
                    aspect=view_properties_aspect,
                ),
            )
            self.report.report_workunit(view_properties_wu)
            yield view_properties_wu
        yield from self._get_domain_wu(
            dataset_name=dataset_name,
            entity_urn=dataset_urn,
            entity_type="dataset",
            sql_config=sql_config,
        )
import re
def doris_setup_parser(self, connection, table_name, schema=None, **kw):
    """Doris-aware replacement for MySQLDialect._setup_parser.

    Doris's SHOW CREATE output for a view begins with a plain
    "CREATE VIEW" (MySQL uses "CREATE ALGORITHM=... VIEW"), so the stock
    dialect never treats Doris views as views.  ``self`` is the dialect
    instance this function gets bound to at runtime.
    """
    conn_charset = self._connection_charset
    tabledef_parser = self._tabledef_parser
    qualified_name = ".".join(
        self.identifier_preparer._quote_free_identifiers(schema, table_name)
    )
    create_sql = self._show_create_table(
        connection, None, conn_charset, full_name=qualified_name
    )
    if re.match(r"^CREATE VIEW", create_sql):
        # A view's DDL cannot be parsed as a table definition; rebuild a
        # table-like CREATE statement from the DESCRIBE output instead.
        described = self._describe_table(
            connection, None, conn_charset, full_name=qualified_name
        )
        create_sql = tabledef_parser._describe_to_create(table_name, described)
    # Lower-cased before parsing — presumably because Doris emits
    # upper-case keywords/types the parser chokes on; TODO confirm.
    return tabledef_parser.parse(create_sql.lower(), conn_charset)
最后让 doris source 可以被找到
对 metadata-ingestion/setup.py 文件进行修改，在 "datahub.ingestion.source.plugins" 中增加 "doris = datahub.ingestion.source.sql.doris:DorisSource"。
...
entry_points = {
"console_scripts": ["datahub = datahub.entrypoints:main"],
"datahub.ingestion.source.plugins": [
...
"doris = datahub.ingestion.source.sql.doris:DorisSource",
...
],
...
最后的成果
recipe.yml 的配置例子：
source:
type: doris
config:
platform_instance: doris_dev
host_port: 'doris_fe_ip:9030'
database: data_market
# Credentials
username: doris_username
password: doris_password
include_tables: true
include_views: true
schema_pattern:
deny:
- sys
- mysql
- information_schema
- performance_schema
sink:
# sink configs
type: datahub-rest
config:
server: 'http://gms-server-ip:8080'