天天看點

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

作者:大資料雜貨鋪
使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

介紹

随着越來越多的人呆在家裡,讓我們的信用卡代步,網上購物正在興起。不幸的是,與這一趨勢保持同步的是信用卡欺詐的增加。

這真的并不奇怪。據《福布斯》報道,過去幾年,線上欺詐問題日益嚴重。而現在,随着消費者和企業适應全球大流行并在無卡 (CNP) 領域進行更多的信用卡交易,線上購物和電子商務的興起為欺詐者開辟了更大的遊樂場嘗試新花樣。

欺詐檢測一直是金融服務和機構面臨的主要問題。但人工智能在減少金融欺詐方面具有巨大潛力。人工智能應用程式具有檢測和防止欺詐的巨大潛力。

是以,我們将開始一系列文章讨論這一點以及我們如何使用 Cloudera 機制來實施整個信用卡欺詐檢測解決方案。但首先,讓我們從實作它的簡單方法開始:

把事情簡單化

在這個 MVP 上,讓我們首先使用 Apache NiFi 從公共 API 攝取和轉換模拟資料,将該資料轉換為我們的欺詐檢測算法預期格式的資料,将該資料放入 Apache Kafka 主題,并使用 Apache Flink 的 SQL控制台來處理一個簡單的欺詐檢測算法。所有這一切都将在可擴充性方面變得更好,是以錦上添花的是将資料轉換攝取流轉換為帶有 Kubernetes 的 Cloudera 資料流服務。

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

CDF(Cloudera 資料流)和 CSA Cloudera Streaming Analytics 中提供了所有注釋元件:

CLOUDERA 動态資料平台

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

先決條件

我們将使用帶有 CDF 和 CSA Data Hub的 CDP 公共雲(大家在CDP Base中也一樣進行):

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

Data Hub:7.2.14 -使用 Apache NiFi、Apache NiFi Registry 的輕型流量管理

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

Data Hub:7.2.14 - Streams Messaging Light Duty:Apache Kafka、Schema Registry、Streams Messaging Manager、Streams Replication Manager、Cruise Control

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

Data Hub:7.2.14 -使用 Apache Flink 進行輕型流分析

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

資料攝取

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

讓我們開始在 NiFi 中擷取我們的資料。使用 InvokeHTTP Processor,我們可以從randomuser API 收集所有資料。

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

對https://randomuser.me/api/?nat=br的簡單調用将傳回如下内容:

{
"results": [
{
"gender": "female",
"name": {
"title": "Miss",
"first": "Shirlei",
"last": "Freitas"
},
"location": {
"street": {
"number": 6133,
"name": "Rua Santa Luzia "
},
"city": "Belford Roxo",
"state": "Amapá",
"country": "Brazil",
"postcode": 88042,
"coordinates": {
"latitude": "78.0376",
"longitude": "74.2175"
},
"timezone": {
"offset": "+11:00",
"description": "Magadan, Solomon Islands, New Caledonia"
}
},
"email": "[email protected]",
"login": {
"uuid": "d73f9a11-d61c-424d-8309-51d6d8e83a73",
"username": "organicfrog175",
"password": "1030",
"salt": "yhVkrYWm",
"md5": "2bf9beb695c663a0a83aa060f27629c0",
"sha1": "f4dfdef9f2d2a9d04a0622636d0851b5d000164a",
"sha256": "e0a96117182914b3fa7fef22829f6692607bd58eb012b8fee763e34b21acf043"
},
"dob": {
"date": "1991-09-06T08:31:08.082Z",
"age": 31
},
"registered": {
"date": "2009-06-26T00:02:49.893Z",
"age": 13
},
"phone": "(59) 5164-1997",
"cell": "(44) 4566-5655",
"id": {
"name": "",
"value": null
},
"picture": {
"large": "https://randomuser.me/api/portraits/women/82.jpg",
"medium": "https://randomuser.me/api/portraits/med/women/82.jpg",
"thumbnail": "https://randomuser.me/api/portraits/thumb/women/82.jpg"
},
"nat": "BR"
}
],
"info": {
"seed": "fad8d9259d3f2b0b",
"results": 1,
"page": 1,
"version": "1.3"
}
}           

使用 JoltTransformJSON 處理器,我們可以輕松地将之前的 Json 轉換為我們的 JSON 結構:

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

我們将使用JOLT轉換來清理和調整我們的資料:

[
{
"operation": "shift",
"spec": {
"results": {
"*": {
"login": { "username": "customer_id", "uuid": "account_number" },
"name": { "first": "name", "last": "lastname" },
"email": "email",
"gender": "gender",
"location": {
"street": { "number": "charge_amount" },
"country": "country",
"state": "state",
"city": "city",
"coordinates": {
"latitude": "lat",
"longitude": "lon"
}
},
"picture": { "large": "image" }
}
}
}
},
{
"operation": "default",
"spec": {
"center_inferred_lat": -5.0000,
"center_inferred_lon": -5.0000,
"max_inferred_distance": 0.0,
"max_inferred_amount": 0.0
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"lat": "=toDouble",
"lon": "=toDouble"
}
}
]           

我們的輸出轉換資料将是:

Result:
{
"customer_id" : "organicfrog175",
"account_number" : "d73f9a11-d61c-424d-8309-51d6d8e83a73",
"name" : "Shirlei",
"lastname" : "Freitas",
"email" : "[email protected]",
"gender" : "female",
"charge_amount" : 6133,
"country" : "Brazil",
"state" : "Amapá",
"city" : "Belford Roxo",
"lat" : 78.0376,
"lon" : 74.2175,
"image" : "https://randomuser.me/api/portraits/women/82.jpg",
"max_inferred_distance" : 0.0,
"center_inferred_lat" : -5.0,
"center_inferred_lon" : -5.0,
"max_inferred_amount" : 0.0
}           

現在,我們可以使用UpdateRecord 處理器來改進它并在某些字段中擷取一些随機數,是以,使用PublishKafka2RecordCDP處理器将我們的 JSON 資料放入 Kafka。

更新記錄處理器

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

PublishKafka2RecordCDP處理器

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

(重要的是要注意必須根據 Kafka 叢集端點填充的 Kafka 代理變量。)

最後,我們的 NiFi 流程将是這樣的:

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

資料緩沖

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

在 Kafka 叢集上,我們隻需點選 SMM(流消息管理器)元件中的“添加新”按鈕即可建立一個新的 Kafka 主題:我已經建立了 skilltransactions 作為示例。

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

一旦我們已經建立了 NiFi 流和 Kafka 主題,就可以打開您的流并檢視我們的資料進入我們的 Kafka 主題。 您還可以檢視資料資料總管圖示

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

檢視到目前為止所有攝取的資料。

流式 SQL 分析

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

Apache Flink是由Apache 軟體基金會開發的開源、統一的流處理和批處理架構。Flink 提供了一個高吞吐量、低延遲的流媒體引擎,并支援事件時間處理和狀态管理。

Flink 的 Table API 是一種用于關系流和批處理的類 SQL 表達式語言,可以嵌入到 Flink 的 Java 和 Scala DataSet 和 DataStream API 中。表 API 和 SQL 接口對關系表抽象進行操作。可以從外部資料源或現有資料流和資料集中建立表。

Cloudera 開發了一個名為 Cloudera SQL Stream Builder 的應用程式,它可以映射我們的 Kafka Topic,并通過 Flink 的 Table API 将所有資料查詢為一個表。

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

我們将在 SSB 上的表連接配接器上輕松建立我們的“虛拟表”映射:

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測
使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

建立這個“虛拟表”後,我們可以使用 SQL 對使用 power、sin 和 radians SQL 函數進行的交易進行了多遠的數學計算:

select account_number, charge_amount,
2 * 3961 * asin(sqrt(
power(
power((sin(radians((lat - center_inferred_lat) / 2))) , 2)
+ cos(radians(center_inferred_lat)) * cos(radians(lat))
* (sin(radians((lon - center_inferred_lon) / 2)))
, 2))) as distance, max_inferred_distance, max_inferred_amount
from `skilltransactions`
WHERE
2 * 3961 * asin(sqrt(
power(
power((sin(radians((lat - center_inferred_lat) / 2))) , 2)
+ cos(radians(center_inferred_lat)) * cos(radians(lat))
* (sin(radians((lon - center_inferred_lon) / 2)))
, 2))) > max_inferred_distance           

要檢視有關此查詢的更多詳細資訊,請通路我們 Cloudera 社群上 @sunile_manjee撰寫的這篇精彩文章。

我們還可以建立我們的函數,然後調用它或查詢。

例如,讓我們建立一個 DISTANCE_BETWEEN函數并在我們的最終查詢中使用它。

最終查詢

select account_number, charge_amount, DISTANCE_BETWEEN(lat, lon, center_inferred_lat, center_inferred_lon) as distance, max_inferred_distance, max_inferred_amount
from `skilltransactions`
WHERE DISTANCE_BETWEEN(lat, lon, center_inferred_lat, center_inferred_lon) > max_inferred_distance
OR charge_amount > max_inferred_amount           

此時我們的查詢應該可以實時檢測到可疑交易,可以報警了。

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

可是等等!還有更多!

是時候在生産模式中看到它了!

從開發到生産

使用此架構,您可能會在黑色星期五或類似的大型活動中遇到一些問題。為此,您需要以高性能和可擴充性攝取所有流資料;換句話說……Kubernetes 中的 NiFi。

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

Cloudera DataFlow 服務可以在 Kubernetes 中部署 NiFi 流,提供生産環境所需的所有可擴充性。

CLOUDERA 資料流服務——公有雲

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

按照部署向導檢視您的流程以容器模式運作:

部署向導

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

關鍵績效名額

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

儀表闆

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

部署管理器

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

結論

這是流媒體之旅的第一篇文章;在這裡我們可以使用Cloudera Data Flow來實時攝取、緩沖和處理事件。希望看完這篇文章你能看懂CDF和CSA,看到Cloudera Streaming的所有能力,說到底也報警了。

下一篇文章見,我們将在 Kubernetes ( Cloudera Machine Learning ) 上使用機器學習來準确我們的簡單信用卡欺詐檢測并投入生産。

使用 NiFi、Kafka、Flink 和 DataFlow 進行簡單的信用卡欺詐檢測

原文作者:ThiagoSantiago

原文連結:https://community.cloudera.com/t5/Community-Articles/Simple-Credit-Card-Fraud-Detection-with-NiFi-Kafka-Flink-and/ta-p/340228