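Method 1:
Problem:
I am using Confluent Kafka (Kafka Connect) for data transformation, syncing pg ---> pg. The source pg column time is of type timestamp with precision 6, and its value is 2020-09-12 15:30:49.
The source connector type is: io.debezium.connector.postgresql.PostgresConnector
After syncing, however, the value in the target pg table becomes 1599895849000000, i.e. microseconds since the epoch, and the column type is bigint. On inspection, the value in the topic is also in microseconds. What I want is for 2020-09-12 15:30:49 in the source to arrive in the target table as 2020-09-12 15:30:49.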
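To check that the bigint is the same instant as the original timestamp, the value can be decoded by hand. This is a minimal sketch in Python; micros_to_datetime is a hypothetical helper name, and the UTC+8 offset is an assumption, chosen only because it reproduces the wall-clock value quoted above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_datetime(micros: int, tz: timezone = timezone.utc) -> datetime:
    """Interpret an epoch-microsecond integer as a datetime in the given zone."""
    return (EPOCH + timedelta(microseconds=micros)).astimezone(tz)


value = 1599895849000000  # the bigint that ended up in the target table

# Same instant, shown in UTC and in UTC+8 (assumed local zone).
print(micros_to_datetime(value))                                 # 2020-09-12 07:30:49+00:00
print(micros_to_datetime(value, timezone(timedelta(hours=8))))   # 2020-09-12 15:30:49+08:00
```

So the number is not corrupted data. It is the timestamp encoded as an integer, and it only needs to be turned back into a timestamp type on the sink side.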
I checked the official Debezium documentation:
https://debezium.io/documentation/reference/1.2/connectors/postgresql.html
Other than PostgreSQL’s TIMESTAMPTZ and TIMETZ data types, which contain time zone information, how temporal types are mapped depends on the value of the time.precision.mode connector configuration property. The following sections describe these mappings:
time.precision.mode=adaptive (default)
time.precision.mode=adaptive_time_microseconds
time.precision.mode=connect
In other words, Debezium handles temporal values in one of three ways.

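Roughly speaking, with either of the following two settings, a PostgreSQL timestamp with precision 1 to 3 is converted to milliseconds, and one with precision 4 to 6 is converted to microseconds.
time.precision.mode=adaptive (default)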
time.precision.mode=adaptive_time_microseconds
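With time.precision.mode=connect, every timestamp is converted to milliseconds. The risk is a loss of precision for columns that store more than three fractional digits.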
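The mapping described above can be summarised in a few lines of Python. This is only a sketch of the rule as stated in this post, not Debezium code; timestamp_unit and micros_to_millis are hypothetical helper names, and truncation (rather than rounding) of the extra digits is an assumption.

```python
def timestamp_unit(mode: str, precision: int) -> str:
    """Return the epoch unit used for a TIMESTAMP(precision) column under a given mode."""
    if not 1 <= precision <= 6:
        raise ValueError(f"precision out of the range covered here: {precision}")
    if mode == "connect":
        # connect mode always uses milliseconds, whatever the column precision
        return "milliseconds"
    if mode in ("adaptive", "adaptive_time_microseconds"):
        return "milliseconds" if precision <= 3 else "microseconds"
    raise ValueError(f"unknown time.precision.mode: {mode}")


def micros_to_millis(micros: int) -> int:
    """Drop the last three digits; this is where precision can be lost (truncation assumed)."""
    return micros // 1000


print(timestamp_unit("adaptive", 6))        # microseconds
print(timestamp_unit("connect", 6))         # milliseconds
print(micros_to_millis(1599895849123456))   # 1599895849123
```

For the column in this post the fractional part is zero, so nothing is actually lost by switching to connect.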
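We add "time.precision.mode", "connect" to the source configuration, i.e. the one with "connector.class", "io.debezium.connector.postgresql.PostgresConnector". The value in the topic then has millisecond precision: 1599895849000 instead of 1599895849000000.
Next, the sink needs to be configured: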
"connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector"
"transforms", "TimestampConverter" # register the transform
"transforms.TimestampConverter.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value"
"transforms.TimestampConverter.format", "yyyy-MM-dd HH:mm:ss" # timestamp format pattern (only used when the input or output is a string)
"transforms.TimestampConverter.target.type", "Timestamp" # the type to convert to
"transforms.TimestampConverter.field", "time" # the field to convert
Summary:
source: add "time.precision.mode", "connect"
sink: add
"transforms", "TimestampConverter" # register the transform
"transforms.TimestampConverter.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value"
"transforms.TimestampConverter.format", "yyyy-MM-dd HH:mm:ss" # timestamp format pattern (only used when the input or output is a string)
"transforms.TimestampConverter.target.type", "Timestamp" # the type to convert to
"transforms.TimestampConverter.field", "time" # the field to convert
Drawback:
Every field that needs converting has to be listed by name individually. ---> "transforms.TimestampConverter.field", "time"
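One way to soften this drawback is to generate the sink entries instead of typing them. The sketch below assumes that each TimestampConverter instance handles a single field, as in stock Apache Kafka, and therefore emits one transform per field. The function name and the ts_ alias prefix are hypothetical; the field names are taken from the config later in this post.

```python
import json


def timestamp_converter_config(fields):
    """Build sink config entries with one TimestampConverter transform per field."""
    aliases = [f"ts_{name}" for name in fields]  # hypothetical alias naming scheme
    config = {"transforms": ",".join(aliases)}
    for alias, name in zip(aliases, fields):
        prefix = f"transforms.{alias}"
        config[f"{prefix}.type"] = (
            "org.apache.kafka.connect.transforms.TimestampConverter$Value"
        )
        config[f"{prefix}.target.type"] = "Timestamp"
        config[f"{prefix}.field"] = name
    return config


# Print the generated entries so they can be pasted into a sink config.
print(json.dumps(timestamp_converter_config(["create_time", "his_create_time"]), indent=2))
```

If the sink already has other transforms, the generated aliases would need to be appended to the existing "transforms" list rather than replacing it.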
Important! Method 2:
Method 1 sometimes does not work; I do not know the exact reason.
I found a second method:
source:
"database.serverTimezone":"UTC",
"time.precision.mode":"connect",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"true",
"transforms.unwrap.delete.handling.mode":"none",
"transforms.unwrap.operation.header":"false"
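The unwrap transform in the source settings above flattens the Debezium change event so that downstream consumers see only the new row state. The sketch below shows the idea for insert and update events only; it is not the real SMT, it ignores delete handling, and the sample event values are made up for illustration.

```python
def extract_new_record_state(envelope):
    """Return the 'after' row image of a Debezium-style change event (insert/update only)."""
    if envelope is None:
        return None
    return envelope.get("after")


# Hypothetical change event; only the overall envelope shape matters here.
event = {
    "before": None,
    "after": {"bar_code_id": 1, "create_time": 1599895849000},
    "op": "c",
    "ts_ms": 1599895850000,
}

print(extract_new_record_state(event))  # {'bar_code_id': 1, 'create_time': 1599895849000}
```

After this step the record is a flat row, which is the shape the JDBC sink expects.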
sink:
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "create_time,his_create_time,his_update_time"
Full config:
{
  "name": "source.postgres-hhhh.sss.table1.hisdata.table1.12341234",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "slot.name":"table1_12341234", # name of the pg replication slot
    "plugin.name":"wal2json",
    "database.hostname":"127.0.0.1",
    "database.port":"5432",
    "database.user":"root",
    "database.password":"[email protected]",
    "database.dbname":"hhhh",
    "database.server.name":"hhhh",
    "database.serverTimezone":"UTC",
    "message.key.columns":"hhhh.sss.table1:bar_code_id", # specifies the primary key
    "time.precision.mode":"connect",
    "database.whitelist":"hhhh",
    "schema.whitelist":"sss",
    "database.tcpKeepAlive":"true",
    "transforms":"unwrap",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones":"true",
    "transforms.unwrap.delete.handling.mode":"none",
    "transforms.unwrap.operation.header":"false"
  }
}
{
  "name": "sink.postgres-hhhh.sss.table1-hisdata.table1.12341234",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms": "unwrap,ReplaceField,RenameField,TimestampConverter",
    "auto.evolve": "true",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "insert.mode": "upsert",
    "transforms.RenameField.renames": "lab_item_id:lab_item_id,ceate_user_sys_id:apply_id,his_update_time:his_create_time,lab_item_name:lab_item_name,create_time:create_time,his_creater_name:his_org_id,flag_print:flag_print,flag_emergency:flag_emergency,hospital_id:hospital_id,ceate_user_sys_name:version,bar_code_id:bar_code_id,version:his_updater_id,order_item_id:order_item_id,his_updater_id:his_creater_name,his_create_time:his_creater_id,print_count:print_count,bar_code:bar_code,apply_id:his_update_time,his_creater_id:ceate_user_sys_id,his_org_id:ceate_user_sys_name,status:status",
    "table.name.format": "table1",
    "topics": "hhhh.sss.table1",
    "delete.enabled": "false",
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://127.0.0.1:5432/hisdata?currentSchema=sss&user=root&[email protected]",
    "transforms.ReplaceField.whitelist": "lab_item_id,ceate_user_sys_id,his_update_time,lab_item_name,create_time,his_creater_name,flag_print,flag_emergency,hospital_id,ceate_user_sys_name,bar_code_id,version,order_item_id,his_updater_id,his_create_time,print_count,bar_code,apply_id,his_creater_id,his_org_id,status",
    "pk.mode": "record_value",
    "pk.fields": "bar_code_id",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "create_time,his_create_time,his_update_time" # names of the timestamp fields
  }
}
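Both configs are submitted to Kafka Connect through its REST API (POST to /connectors). The sketch below uses only the Python standard library. The worker address http://localhost:8083 is an assumption, and the function names are hypothetical. The # annotations shown in the configs above are not valid JSON and have to be removed before the documents are sent.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # assumed Kafka Connect worker address


def build_register_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request that creates a connector from a {"name", "config"} document."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register(connector: dict, url: str = CONNECT_URL) -> int:
    """Send the request and return the HTTP status code (needs a running worker)."""
    with urllib.request.urlopen(build_register_request(connector, url)) as response:
        return response.status
```

Calling register(source_config) and then register(sink_config), where both arguments are the dictionaries loaded from the cleaned-up JSON, would create the two connectors; the variable names here are placeholders.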