
kafka confluent postgresql time-precision conversion problem (microseconds to milliseconds), two solutions

Method 1:

Problem:

Kafka Confluent is doing the data conversion for a pg ---> pg sync. In the source pg, the field time is a timestamp with precision 6, holding the value 2020-09-12 15:30:49.

The source connector type is io.debezium.connector.postgresql.PostgresConnector.

In the target pg table, however, the value arrives as 1599895849000000: microsecond precision, stored as bigint. On inspection, the value in the topic is already at microsecond precision. What we want is for 2020-09-12 15:30:49 to land in the target table as 2020-09-12 15:30:49.
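For reference, under the default settings this is roughly what the time field looks like in the topic (JSON converter with schemas enabled; the Debezium envelope and all other fields are omitted; a sketch, not a verbatim capture):

{
    "schema": {
        "type": "struct",
        "fields": [
            { "field": "time", "type": "int64", "name": "io.debezium.time.MicroTimestamp" }
        ]
    },
    "payload": {
        "time": 1599895849000000
    }
}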

A look at the debezium documentation:

https://debezium.io/documentation/reference/1.2/connectors/postgresql.html

Other than PostgreSQL’s TIMESTAMPTZ and TIMETZ data types, which contain time zone information, how temporal types are mapped depends on the value of the time.precision.mode connector configuration property. The following sections describe these mappings:

time.precision.mode=adaptive (default)

time.precision.mode=adaptive_time_microseconds

time.precision.mode=connect

So debezium has three ways of handling temporal types.


Roughly: with the two settings below, a postgresql timestamp of precision 1-3 is converted to milliseconds, while a timestamp of precision 4-6 is converted to microseconds.

time.precision.mode=adaptive (default)

time.precision.mode=adaptive_time_microseconds

time.precision.mode=connect, on the other hand, converts everything to milliseconds, at the risk of losing precision.
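To make the precision loss concrete: the adaptive modes emit a timestamp(6) column as int64 microseconds (io.debezium.time.MicroTimestamp), while connect mode emits int64 milliseconds (the org.apache.kafka.connect.data.Timestamp logical type), so fractional digits beyond the third are dropped. A sketch with made-up fractional seconds:

adaptive / adaptive_time_microseconds:
    { "time": 1599895849123456 }   # microseconds since epoch
connect:
    { "time": 1599895849123 }      # milliseconds since epoch; the trailing 456 is lost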


We added "time.precision.mode", "connect" to the source connector config ("connector.class", "io.debezium.connector.postgresql.PostgresConnector"), and the value in the topic became 1599895849000 (millisecond precision).
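That is, the relevant fragment of the source connector config looks like this (connection properties omitted here; a full config appears under method 2 below):

{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "time.precision.mode": "connect"
}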

Next, configure the sink. The properties to add (assembled into a fuller sketch after this list):

"connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector"

"transforms", "TimestampConverter" 設定轉換器

"transforms.TimestampConverter.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value"

"transforms.TimestampConverter.format", "yyyy-MM-dd HH:mm:ss" 轉成時間類型

"transforms.TimestampConverter.target.type", "Timestamp" 轉後的格式

"transforms.TimestampConverter.field", "time" 需要轉的字段。

Summary:

Add to the source: "time.precision.mode", "connect"

Add to the sink:

"transforms", "TimestampConverter" 設定轉換器

"transforms.TimestampConverter.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value"

"transforms.TimestampConverter.format", "yyyy-MM-dd HH:mm:ss" 轉成時間類型

"transforms.TimestampConverter.target.type", "Timestamp" 轉後的格式

"transforms.TimestampConverter.field", "time" 需要轉的字段。

Drawback:

However many fields need converting, each one must be added by name ---> "transforms.TimestampConverter.field", "time" (see the sketch below).
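With the stock Apache Kafka TimestampConverter, field takes a single field name, so converting several columns this way means one transform instance per field, chained in the transforms list. A sketch with hypothetical aliases ConvertCreate and ConvertUpdate:

"transforms": "ConvertCreate,ConvertUpdate",
"transforms.ConvertCreate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertCreate.target.type": "Timestamp",
"transforms.ConvertCreate.field": "create_time",
"transforms.ConvertUpdate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertUpdate.target.type": "Timestamp",
"transforms.ConvertUpdate.field": "update_time"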

Important!! Method 2:

Method 1 sometimes does not work, for reasons I never pinned down.

Here is method 2:

source:

"database.serverTimezone":"UTC",

"time.precision.mode":"connect",

"transforms":"unwrap",

"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",

"transforms.unwrap.drop.tombstones":"true",

"transforms.unwrap.delete.handling.mode":"none",

"transforms.unwrap.operation.header":"false"

sink:

"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",

"transforms.TimestampConverter.target.type": "Timestamp",

"transforms.TimestampConverter.field": "create_time,his_create_time,his_update_time"

The full configs:

{
    "name": "source.postgres-hhhh.sss.table1.hisdata.table1.12341234",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
		"slot.name":"table1_12341234",  #pg的複制槽名稱
        "plugin.name":"wal2json",
        "database.hostname":"127.0.0.1",
        "database.port":"5432",
        "database.user":"root",
        "database.password":"[email protected]",
        "database.dbname":"hhhh",
        "database.server.name":"hhhh",
        "database.serverTimezone":"UTC",
		"message.key.columns":"hhhh.sss.table1:bar_code_id",  #指定主鍵
        "time.precision.mode":"connect",
        "database.whitessst":"hhhh",
        "schema.whitessst":"sss",
		"database.tcpKeepAlive":"true",
        "transforms":"unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"true",
        "transforms.unwrap.delete.handling.mode":"none",
        "transforms.unwrap.operation.header":"false"
    }
}

{
    "name": "sink.postgres-hhhh.sss.table1-hisdata.table1.12341234",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "transforms.rootaceField.type": "org.apache.kafka.connect.transforms.rootaceField$Value",
    "transforms": "unwrap,rootaceField,RenameField,TimestampConverter",
    "auto.evolve": "true",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.rootaceField$Value",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "insert.mode": "upsert",
    "transforms.RenameField.renames": "lab_item_id:lab_item_id,ceate_user_sys_id:apply_id,his_update_time:his_create_time,lab_item_name:lab_item_name,create_time:create_time,his_creater_name:his_org_id,flag_print:flag_print,flag_emergency:flag_emergency,hospital_id:hospital_id,ceate_user_sys_name:version,bar_code_id:bar_code_id,version:his_updater_id,order_item_id:order_item_id,his_updater_id:his_creater_name,his_create_time:his_creater_id,print_count:print_count,bar_code:bar_code,apply_id:his_update_time,his_creater_id:ceate_user_sys_id,his_org_id:ceate_user_sys_name,status:status",
    "table.name.format": "table1",
    "topics": "hhhh.sss.table1",
    "delete.enabled": "false",
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://127.0.0.1:5432/hisdata?currentSchema=sss&user=root&[email protected]",
    "transforms.rootaceField.whitessst": "lab_item_id,ceate_user_sys_id,his_update_time,lab_item_name,create_time,his_creater_name,flag_print,flag_emergency,hospital_id,ceate_user_sys_name,bar_code_id,version,order_item_id,his_updater_id,his_create_time,print_count,bar_code,apply_id,his_creater_id,his_org_id,status",
    "pk.mode": "record_value",
    "pk.fields": "bar_code_id",
	"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "create_time,his_create_time,his_update_time"  #時間的字段名稱
    }
}