天天看点

DataWorks百问百答34:mongoDB同步odps实践时如何使用时间戳筛选数据?

情景:mongoDB数据集成任务query配置参数中不能支持.valueOf()这个方法(形如"query":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424Z.valueOf()')}}" bson解析不出)

以下我们使用赋值节点+数据集成节点进行时间戳值的实现:

1.新建赋值节点,并向下游传出unixtime时间戳数值:(赋值节点内选择odps sql节点或者shell节点均可实现向下游传值。更多赋值节点内容请参见:

https://help.aliyun.com/document_detail/137534.html?spm=a2c4g.11186631.6.778.62b06158yTpLAN

DataWorks百问百答34:mongoDB同步odps实践时如何使用时间戳筛选数据?

这样我们使用unix_timestamp函数将设置的定时周期的时间转成了一个时间戳并传向下游数据集成节点。关于参数的配置、使用、如何获取到想要的时间值,请参见文档链接:

https://help.aliyun.com/document_detail/137548.html?spm=5176.11065259.1996646101.searchclickresult.73481a65zOfzvM

2.下游数据集成节点配置本节点输入参数来接赋值节点的传入的值:(这里我们使用input(名称可自定义))

调度配置引入上游赋值:

DataWorks百问百答34:mongoDB同步odps实践时如何使用时间戳筛选数据?

这样就可以接到上游赋值节点传入的时间戳并在数据集成json配置内部使用参数替换需要用到的时间戳。

3.本数据集成任务的功能:查询出mongoDB中u6字段值大于传入的时间戳的值后将数据同步到odps表中。

odps表结构:字段类型、名称等均可自定义

(create table mongo_uni_odp3(userId string,uclass string,name STRING ,age bigint,email string,birthday string,datastatus STRING,u6 string);)

mongoDB数据结构:

DataWorks百问百答34:mongoDB同步odps实践时如何使用时间戳筛选数据?

json配置一览:

DataWorks百问百答34:mongoDB同步odps实践时如何使用时间戳筛选数据?

完整的数据集成配置json:mongoDB==>odps

{

"type": "job",

"steps": [

{

"stepType": "mongodb",

"parameter": {

"datasource": "wpw_test_mongo",

"query":"{'u6':{'$gte':'${input}'}}",

"column": [

{

"name": "userId",

"type": "string"

},

"name": "uclass",

"name": "name",

"name": "age",

"type": "int"

"name": "email",

"name": "birthday",

"name": "datastatus",

"name": "u6",

}

],

"collectionName": "wpw_test_collec"

},

"name": "Reader",

"category": "reader"

},

"stepType": "odps",

"partition": "",

"truncate": true,

"datasource": "odps_first",

"userId",

"uclass",

"name",

"age",

"email",

"birthday",

"datastatus",

"u6"

"emptyAsNull": false,

"table": "mongo_uni_odp3"

"name": "Writer",

"category": "writer"

}

],

"version": "2.0",

"order": {

"hops": [

{

"from": "Reader",

"to": "Writer"

}

]

},

"setting": {

"errorLimit": {

"record": "0"

"speed": {

"throttle": false,

"concurrent": 1

}

}

任务运行后即可在odps表中查看同步的数据了。

DataWorks百问百答历史记录

请点击这里查看>>

更多DataWorks技术和产品信息,欢迎加入

【DataWorks钉钉交流群】