天天看點

從理論到工程實踐——使用者畫像入門寶典

​ 使用者畫像是大資料頂層應用中最重要的一環,搭建一套适合本公司體系的使用者畫像尤為重要。但是,使用者畫像的資料往往理論居多,實踐少,更少有工程化的實戰案例。

​ 本文檔結合了常見的使用者畫像架構,使用Elasticsearch作為底層存儲支撐,使用者畫像的檢索和可視化效率得到了大幅度的提升。文檔從使用者畫像的理論到實踐均有所涉及,大家可以參照此文檔完成使用者畫像系統從0到1的搭建。

本文檔共分為6個部分,層級結構如下圖所示。

從理論到工程實踐——使用者畫像入門寶典
文檔版權為公衆号 大資料流動 所有,請勿商用。相關技術問題以及安裝包可以聯系筆者獨孤風加入相關技術交流群讨論擷取。

一、什麼是使用者畫像?

使用者畫像簡介

使用者畫像,作為一種勾畫目标使用者、聯系使用者訴求與設計方向的有效工具,使用者畫像在各領域得到了廣泛的應用。

使用者畫像最初是在電商領域得到應用的,在大資料時代背景下,使用者資訊充斥在網絡中,将使用者的每個具體資訊抽象成标簽,利用這些标簽将使用者形象具體化,進而為使用者提供有針對性的服務。

還記得年底收到的支付寶年度消費賬單嗎?幫助客戶回顧一年的消費細節,包括消費能力、消費去向、信用額度等等,再根據每位客戶的消費習慣,量身定制商品推薦清單……這一活動,将資料這個量化的詞以形象生動的表現手法推到了大衆面前。

這就是使用者畫像在電商領域的一個應用,随着我國電子商務的高速發展,越來越多的人注意到資料資訊對于電商市場的推動作用。基于資料分析的精準營銷方式,可以最大限度的挖掘并留住潛在客戶,資料統計與分析為電商市場帶來的突破不可估量。在大資料時代,一切皆可“量化”,看似普通的小小數字背後,蘊藏着無限商機,也正在被越來越多的企業所洞悉。

從理論到工程實踐——使用者畫像入門寶典

如何從大資料中挖掘商機?建立使用者畫像和精準化分析是關鍵。

使用者畫像可以使産品的服務對象更加聚焦,更加的專注。在行業裡,我們經常看到這樣一種現象:做一個産品,期望目标使用者能涵蓋所有人,男人女人、老人小孩、專家小白、文青屌絲...... 通常這樣的産品會走向消亡,因為每一個産品都是為特定目标群的共同标準而服務的,當目标群的基數越大,這個标準就越低。換言之, 如果這個産品是适合每一個人的,那麼其實它是為最低的标準服務的,這樣的産品要麼毫無特色,要麼過于簡陋。

從理論到工程實踐——使用者畫像入門寶典

縱覽成功的産品案例,他們服務的目标使用者通常都非常清晰,特征明顯,展現在産品上就是專注、極緻,能解決核心問題。比如蘋果的産品,一直都為有态度、追求品質、特立獨行的人群服務,赢得了很好的使用者口碑及市場佔有率。又比如豆瓣,專注文藝事業十多年,隻為文藝青年服務,使用者粘性非常高,文藝青年在這裡能找到知音,找到歸宿。是以,給特定群體提供專注的服務,遠比給廣泛人群提供低标準的服務更接近成功。 其次,使用者畫像可以在一定程度上避免産品設計人員草率的代表使用者。代替使用者發聲是在産品設計中常出現的現象,産品設計人員經常不自覺的認為使用者的期望跟他們是一緻的,并且還總打着“為使用者服務”的旗号。這樣的後果往往是:我們精心設計的服務,使用者并不買賬,甚至覺得很糟糕。

在産品研發和營銷過程中,确定目标使用者是首要任務。不同類型的使用者往往有不同甚至相沖突的需求,企業不可能做出一個滿足所有使用者的産品和營銷。是以,通過大資料建立使用者畫像是必不可少的。

這隻是使用者畫像在電商領域的應用,事實上使用者畫像已經不知不覺的滲透到了各個領域,在目前最火的抖音,直播等領域,推薦系統在大資料時代到來以後,使用者的一切行為都是可以追溯分析的。

使用者畫像實作步驟

什麼是使用者畫像?使用者畫像是根據市場研究和資料,建立的理想中客戶虛構的表示。建立使用者畫像,這将有助于了解現實生活中的目标閱聽人。企業建立的人物角色畫像,具體到針對他們的目标和需求,并解決他們的問題,同時,這将幫助企業更加直覺的轉化客戶。

使用者畫像最重要的一個步驟就是對使用者标簽化,我們要明确要分析使用者的各種次元,才能确定如何對使用者進行畫像。

在建立使用者畫像上,有很多個步驟:

  • 首先,基礎資料收集,電商領域大緻分為行為資料、内容偏好資料、交易資料,如浏覽量、通路時長、家具偏好、回頭率等等。而金融領域又有貸款資訊,信用卡,各種征信資訊等等。
  • 然後,當我們對使用者畫像所需要的基礎資料收集完畢後,需要對這些資料進行分析和加工,提煉關鍵要素,建構可視化模型。對收集到的資料進行行為模組化,抽象出使用者的标簽。電商領域可能是把使用者的基本屬性、購買能力、行為特征、興趣愛好、心理特征、社交網絡大緻的标簽化,而金融風控領域則是更關注使用者的基本資訊,風險資訊,财務資訊等等。
  • 随後,要利用大資料的整體架構對标簽化的過程進行開發實作,對資料進行加工,将标簽管理化。同時将标簽計算的結果進行計算。這個過程中需要依靠Hive,Hbase等大資料技術,為了提高資料的實時性,還要用到Flink,Kafka等實時計算技術。
  • 最後,也是最關鍵的一步,要将我們的計算結果,資料,接口等等,形成服務。比如,圖表展示,可視化展示,
從理論到工程實踐——使用者畫像入門寶典

事實上,在建構使用者畫像過程中,注重提取資料的多元性而不是單一性,譬如針對不同類型的客戶提取不同的資料,又或者針對線上線下的客戶分析其中差異。總而言之,保證資料的豐富性、多樣性、科學性,是建立精準使用者畫像的前提。

當使用者畫像基本成型後,接下來就可以對其進行形象化、精準化的分析。此時一般是針對群體的分析,如可以根據使用者價值來細分出核心使用者、評估某一群體的潛在價值空間,以此作出針對性的産品結構、經營政策、客戶引導的調整。是以,突出研發和展示此類型的産品,又在家具的整體搭配展示中進行相關的主題設計,以此吸引目标人群的關注和購買。

毫無疑問,大資料在商業市場中的運用效果已經突顯,在競争激烈的各個行業,誰能抓住大資料帶來的優勢,誰才更有機會引領行業的未來。

使用者畫像的實時性

現在大資料應用比較火爆的領域,比如推薦系統在實踐之初受技術所限,可能要一分鐘,一小時,甚至更久對使用者進行推薦,這遠遠不能滿足需要,我們需要更快的完成對資料的處理,而不是進行離線的批處理。

現在企業對于資料的實時要求越來越高,已經不滿足于T+1的方式,有些場景下不可能間隔一天才回報出結果。特别是推薦,風控等領域,需要小時,分鐘,甚至秒級别的實時資料響應。而且這種秒級别響應的不隻是簡單的資料流,而且經過與離線計算一樣的,複雜的聚合分析之後的結果,這種難度其實非常大。

幸好實時計算架構的崛起足夠我們解決這些問題,近年來Flink,Kafka等實時計算技術的架構與技術越來越穩定,足夠我們支撐這些使用場景。

從理論到工程實踐——使用者畫像入門寶典

在實時使用者畫像的建構中,通過對實時資料的不斷疊代計算,逐漸的不斷完善出使用者畫像的全貌,這也正符合資料傳輸的本質,這整體架構中,淡化離線計算在之前特别重的作用,隻留做歸檔和曆史查詢使用,更多的資料通過實時計算進行輸出,最終達到對使用者畫像的目的。

在實時計算的過程需要對資料實時聚合計算,而複雜的标簽也需要實時的進行機器學習,難度巨大,但是最終對于畫像的實時性有着重大的意義。

從理論到工程實踐——使用者畫像入門寶典

二、使用者畫像系統架構

前文中我們已經知道使用者畫像對于企業的巨大意義,當然也有着非常大實時難度。那麼在使用者畫像的系統架構中都有哪些難度和重點要考慮的問題呢?

挑戰

挑戰(一)——大資料

随着網際網路的崛起和智能手機的興起,以及物聯網帶來的各種可穿戴裝置,我們能擷取的每一個使用者的資料量是非常巨大的,而使用者量本身更是巨大的,我們面臨的是TB級,PB級的資料,是以我們必須要一套可以支撐大資料量的高可用性,高擴充性的系統架構來支撐使用者畫像分析的實作。毫無疑問,大資料時代的到來讓這一切都成為可能,近年來,以Hadoop為代表的大資料技術如雨後春筍般迅速發展,每隔一段時間都會有一項新的技術誕生,不斷驅動的業務向前,這讓我們對于使用者畫像的簡單統計,複雜分析,機器學習都成為可能。是以整體使用者畫像體系必須建立在大資料架構之上。

從理論到工程實踐——使用者畫像入門寶典

挑戰(二)——實時性

在Hadoop崛起初期,大部分的計算都是通過批處理完成的,也就是T+1的處理模式,要等一天才能知道前一天的結果。但是在使用者畫像領域,我們越來越需要實時性的考慮,我們需要在第一時間就得到各種次元的結果,在實時計算的初期隻有Storm一家獨大,而Storm對于時間視窗,水印,觸發器都沒有很好的支援,而且保證資料一緻性時将付出非常大的性能代價。但Kafka和Flink等實時流式計算架構的出現改變了這一切,資料的一緻性,事件時間視窗,水印,觸發器都成為很容易的實作。而實時的OLAP架構Druid更是讓互動式實時查詢成為可能。這這些高性能的實時架構成為支撐我們建立實時使用者畫像的最有力支援。

從理論到工程實踐——使用者畫像入門寶典

挑戰(三)——與數倉的結合

  • 資料倉庫的概念由來已久,在我們得到海量的資料以後,如何将資料變成我們想要的樣子,這都需要ETL,也就是對資料進行抽取(extract)、轉換(transform)、加載(load)的過程,将資料轉換成想要的樣子儲存在目标端。毫無疑問,Hive是作為離線數倉的不二選擇,而hive使用的新引擎tez也有着非常好的查詢性能,而最近新版本的Flink也支援了hive性能非常不錯。但是在實時使用者畫像架構中,Hive是作為一個按天的歸檔倉庫的存在,作為曆史資料形成的最終存儲所在,也提供了曆史資料查詢的能力。而Druid作為性能良好的實時數倉,将共同提供資料倉庫的查詢與分析支撐,Druid與Flink配合共同提供實時的處理結果,實時計算不再是隻作為實時資料接入的部分,而真正的挑起大梁。

    是以,兩者的差別僅在于資料的處理過程,實時流式處理是對一個個流的反複處理,形成一個又一個流表,而數倉的其他概念基本一緻。

    數倉的基本概念如下:

    • Extract,資料抽取,也就是把資料從資料源讀出來。
    • Transform,資料轉換,把原始資料轉換成期望的格式和次元。如果用在資料倉庫的場景下,Transform也包含資料清洗,清洗掉噪音資料。
    • Load 資料加載,把處理後的資料加載到目标處,比如資料倉庫。
    • DB 是現有的資料來源(也稱各個系統的中繼資料),可以為mysql、SQLserver、檔案日志等,為資料倉庫提供資料來源的一般存在于現有的業務系統之中。
    • ETL的是 Extract-Transform-Load 的縮寫,用來描述将資料從來源遷移到目标的幾個過程:
    • ODS(Operational Data Store) 操作性資料,是作為資料庫到資料倉庫的一種過渡,ODS的資料結構一般與資料來源保持一緻,便于減少ETL的工作複雜性,而且ODS的資料周期一般比較短。ODS的資料最終流入DW
    • DW (Data Warehouse)資料倉庫,是資料的歸宿,這裡保持這所有的從ODS到來的資料,并長期儲存,而且這些資料不會被修改。
    • DM(Data Mart) 資料集市,為了特定的應用目的或應用範圍,而從資料倉庫中獨立出來的一部分資料,也可稱為部門資料或主題資料。面向應用。
  • 在整個資料的處理過程中我們還需要自動化的排程任務,免去我們重複的工作,實作系統的自動化運作,Airflow就是一款非常不錯的排程工具,相比于老牌的Azkaban 和 Oozie,基于Python的工作流DAG,確定它可以很容易地進行維護,版本化和測試,當然最終提供的服務不僅僅是可視化的展示,還有實時資料的提供,最終形成使用者畫像的實時服務,形成産品化。
從理論到工程實踐——使用者畫像入門寶典
  • 至此我們所面臨的問題都有了非常好的解決方案,下面我們設計出我們系統的整體架構,并分析我們需要掌握的技術與所需要的做的主要工作。
  • 架構設計

  • 依據上面的分析與我們要實作的功能,我們将依賴Hive和Druid建立我們的資料倉庫,使用Kafka進行資料的接入,使用Flink作為我們的流處理引擎,對于标簽的中繼資料管理我們還是依賴Mysql作為把标簽的管理,并使用Airflow作為我們的排程任務架構,并最終将結果輸出到Mysql和Hbase中。對于标簽的前端管理,可視化等功能依賴Springboot+Vue.js搭建的前後端分離系統進行展示,而Hive和Druid的可視化查詢功能,我們也就使用強大的Superset整合進我們的系統中,最終系統的架構圖設計如下:
從理論到工程實踐——使用者畫像入門寶典

相對于傳統的技術架構,實時技術架構将極大的依賴于Flink的實時計算能力,當然大部分的聚合運算我們還是可以通過Sql搞定,但是複雜的機器學習運算需要依賴編碼實作。而标簽的存儲細節還是放在Mysql中,Hive與Druid共同建立起資料倉庫。相對于原來的技術架構,隻是将計算引擎由Spark換成了Flink,當然可以選擇Spark的structured streaming同樣可以完成我們的需求,兩者的取舍還是依照具體情況來做分析。

傳統架構如下:

從理論到工程實踐——使用者畫像入門寶典

這樣我們就形成,資料存儲,計算,服務,管控的強有力的支撐,我們是否可以開始搭建大資料叢集了呢?其實還不着急,在開工之前,需求的明确是無比重要的,針對不同的業務,電商,風控,還是其他行業都有着不同的需求,對于使用者畫像的要求也不同,那麼該如何明确這些需求呢,最重要的就是定義好使用者畫像的标簽體系,這是涉及技術人員,産品,營運等崗位共同讨論的結果,也是使用者畫像的核心所在。

三、标簽體系

什麼是标簽?

使用者畫像的核心在于給使用者“打标簽”,每一個标簽通常是人為規定的特征辨別,用高度精煉的特征描述一類人,例如年齡、性别、興趣偏好等,不同的标簽通過結構化的資料體系整合,就可與組合出不同的使用者畫像。

梳理标簽體系是實作使用者畫像過程中最基礎、也是最核心的工作,後續的模組化、資料倉庫搭建都會依賴于标簽體系。

為什麼需要梳理标簽體系,因為不同的企業做使用者畫像有不同的戰略目的,廣告公司做使用者畫像是為精準廣告服務,電商做使用者畫像是為使用者購買更多商品,内容平台做使用者畫像是推薦使用者更感興趣的内容提升流量再變現,金融行業做使用者畫像是為了尋找到目标客戶的同時做好風險的控制。

是以第一步,我們要結合所在的行業,業務去分析我們使用者畫像的目的。這其實就是戰略,我們要通過戰略去指引我們最終的方向。

從理論到工程實踐——使用者畫像入門寶典

對于電商企業來說,可能最重要的兩個問題就是:

現有使用者- 我的現存使用者是誰?為什麼買我的産品?他們有什麼偏好?哪些使用者價值最高?

潛在客戶- 我的潛在使用者在哪兒?他們喜歡什麼?哪些管道能找到他們?獲客成本是多少?

而對于金融企業,還要加上一條:

使用者風險—使用者的收入能力怎麼樣?他們是否有過貸款或者信用卡的逾期?他們的征信有問題嗎?

我們做使用者畫像的目的也就是根據我們指定的戰略方向最終去解決這些問題。

在梳理标簽的過程還要緊密的結合我們的資料,不能脫離了資料去空想,當然如果是我們必須要的資料,我們可能需要想辦法去擷取這些資料,這就是資料采集的問題,我們之後會深入的讨論。

先展示兩種常見的标簽體系,随後我們将按步驟建立我們的标簽體系。

電商類标簽體系

從理論到工程實踐——使用者畫像入門寶典

可以看到電商類的标簽體系,更關注使用者的屬性,行為等等資訊。那麼我們需要的資料也就來源于使用者可提供的基本資訊,以及使用者的行為資訊,這些我們可以通過埋點擷取,而使用者的訂單情況也是非常的重要的标簽。

金融類标簽體系

從理論到工程實踐——使用者畫像入門寶典

對于金融行業,最明顯的差別是增加了使用者的價值和使用者風險的資訊。這些資訊在使用者申請貸款時一般都可以提供,還有很多資訊需要通過征信擷取。

最終,不管是電商還是金融或者其他領域,我們都可以通過資料對使用者進行畫像,最終建立标簽體系,影響我們的業務,最終實作戰略目的。

下面我們來具體看一下如何一步步的分析建立整體标簽體系。

标簽的次元與類型

在我們建立使用者标簽時,首先要明确基于哪種次元去建立标簽。

一般除了基于使用者次元(userid)建立使用者标簽體系外,還有基于裝置次元(cookieid)建立相應的标簽體系,當使用者沒有登入裝置時,就需要這個次元。當然這兩個次元還可以進行關聯。

而兩者的關聯就是需要ID-Mapping算法來解決,這也是一個非常複雜的算法。更多的時候我們還是以使用者的唯一辨別來建立使用者畫像。

而标簽也分為很多種類型,這裡參照常見的分類方式,

從對使用者打标簽的方式來看,一般分為三種類型:1、基于統計類的标簽;2、基于規則類的标簽、3、基于挖掘類的标簽。下面我們介紹這三種類型标簽的差別:

  • 統計類标簽:這類标簽是最為基礎也最為常見的标簽類型,例如對于某個使用者來說,他的性别、年齡、城市、星座、近7日活躍時長、近7日活躍天數、近7日活躍次數等字段可以從使用者注冊資料、使用者通路、消費類資料中統計得出。該類标簽構成了使用者畫像的基礎;
  • 規則類标簽:該類标簽基于使用者行為及确定的規則産生。例如對平台上“消費活躍”使用者這一口徑的定義為近30天交易次數>=2。在實際開發畫像的過程中,由于營運人員對業務更為熟悉、而資料人員對資料的結構、分布、特征更為熟悉,是以規則類标簽的規則确定由營運人員和資料人員共同協商确定;
  • 機器學習挖掘類标簽:該類标簽通過資料挖掘産生,應用在對使用者的某些屬性或某些行為進行預測判斷。例如根據一個使用者的行為習慣判斷該使用者是男性還是女性,根據一個使用者的消費習慣判斷其對某商品的偏好程度。該類标簽需要通過算法挖掘産生。

标簽的類型是對标簽的一個區分,友善我們了解标簽是在資料處理的哪個階段産生的,也更友善我們管理。

标簽分級分類

标簽需要進行分級分類的管理,一方面使得标簽更加的清晰有條件,另一方面也友善我們對标簽進行存儲查詢,也就是管理标簽。

從理論到工程實踐——使用者畫像入門寶典

使用者畫像體系和标簽分類從兩個不同角度來梳理标簽,使用者畫像體系偏戰略和應用,标簽分類偏管理和技術實作側。

把标簽分成不同的層級和類别,一是友善管理數千個标簽,讓散亂的标簽體系化;二是次元并不孤立,标簽之間互有關聯;三可以為标簽模組化提供标簽子集。

梳理某類别的子分類時,盡可能的遵循MECE原則(互相獨立、完全窮盡),尤其是一些有關使用者分類的,要能覆寫所有使用者,但又不交叉。比如:使用者活躍度的劃分為核心使用者、活躍使用者、新使用者、老使用者、流失使用者,使用者消費能力分為超強、強、中、弱,這樣按照給定的規則每個使用者都有分到不同的組裡。

标簽命名

标簽的命名也是為了我們可以對标簽進行統一的管理,也更好識别出是什麼标簽。

從理論到工程實踐——使用者畫像入門寶典

這是一種非常好的命名方式,解釋如下:

标簽主題:用于刻畫屬于那種類型的标簽,如使用者屬性、使用者行為、使用者消費、風險控制等多種類型,可用A、B、C、D等 字母表示各标簽主題; 标簽類型:标簽類型可劃為分類型和統計型這兩種類型,其中分類型用于刻畫使用者屬于哪種類型,如是男是女、是否是會員、 是否已流失等标簽,統計型标簽用于刻畫統計使用者的某些行為次數,如曆史購買金額、優惠券使用次數、近30日登陸次數等 标簽,這類标簽都需要對應一個使用者相應行為的權重次數; 開發方式:開發方式可分為統計型開發和算法型開發兩大開發方式。其中統計型開發可直接從資料倉庫中各主題表模組化加工 而成,算法型開發需要對資料做機器學習的算法處理得到相應的标簽; 是否互斥标簽:對應同一級類目下(如一級标簽、二級标簽),各标簽之間的關系是否為互斥,可将标簽劃分為互斥關系和 非互斥關系。例如對于男、女标簽就是互斥關系,同一個使用者不是被打上男性标簽就是女性标簽,對于高活躍、中活躍、低 活躍标簽也是互斥關系; 使用者次元:用于刻畫該标簽是打在使用者唯一辨別(userid)上,還是打在使用者使用的裝置(cookieid)上。可用U、C等字 母分别辨別userid和cookieid次元。

最終形成得标簽示例:

對于使用者是男是女這個标簽,标簽主題是使用者屬性,标簽類型屬于分類型,開發方式為統計型,為互斥關系,使用者 次元為userid。這樣給男性使用者打上标簽“A111U001_001”,女性使用者打上标簽“A111U001002”,其中 “A111U”為上面介紹的命名方式,“001”為一級标簽的id,後面對于使用者屬性次元的其他一級标簽可用“002”、 “003”等方式追加命名,“”後面的“001”和“002”為該一級标簽下的标簽明細,如果是劃分高、中、低活躍 使用者的,對應一級标簽下的明細可劃分為“001”、“002”、“003”。

從理論到工程實踐——使用者畫像入門寶典

标簽存儲與管理

Hive與Druid數倉存儲标簽計算結果集

因為資料非常大,是以跑标簽出來的結果必須要通過hive和druid數倉引擎來完成。

在資料倉庫的模組化過程中,主要是事實表和次元表的開發。

事實表依據業務來開發,描述業務的過程,可以了解為我們對原始資料做ETL整理後業務事實。

而次元表就是我們最終形成的使用者次元,次元表是實時變化的,逐漸的建立起使用者的畫像。

比如使用者次元标簽:

首先我們根據之前讨論的使用者名額體系,将使用者按照人口,行為,消費等等建立相關中間表,注意表的命名。

同樣的,其他的也按這種方式進行存儲,這種屬性類的計算很容易篩選出來。

然後,我們将使用者的标簽查詢出來,彙總到使用者身上:

最終使用者的标簽就形成了

當然,對于複雜的規則和算法類标簽,就需要在計算中間表時做更複雜的計算,我們需要在Flink裡解決這些複雜的計算,未來開發中我們會詳細的讨論,這一部分先根據标簽體系把相應的表結構都設計出來。

Mysql存儲标簽中繼資料

Mysql對于小資料量的讀寫速度更快,也更适合我們對标簽定義,管理。我們也可以在前端開發标簽的管理頁面。

我們在mysql存儲的字段,在頁面上提供編輯等功能,在開發标簽的過程中,就可以控制标簽的使用了。

這樣,我們的标簽體系已經根據實際的業務情況建立起來了,在明确了标簽體系以後,也就明确了我們的業務支撐,從下一章開始我們将正式開始搭建大資料叢集,接入資料,進行标簽開發。

四、使用者畫像大資料環境搭建

本章我們開始正式搭建大資料環境,目标是建構一個穩定的可以運維監控的大資料環境。我們将采用Ambari搭建底層的Hadoop環境,使用原生的方式搭建Flink,Druid,Superset等實時計算環境。使用大資料建構工具與原生安裝相結合的方式,共同完成大資料環境的安裝。

Ambari搭建底層大資料環境

Apache Ambari是一種基于Web的工具,支援Apache Hadoop叢集的供應、管理和監控。Ambari已支援大多數Hadoop元件,包括HDFS、MapReduce、Hive、Pig、 Hbase、Zookeeper、Sqoop和Hcatalog等。

Apache Ambari 支援HDFS、MapReduce、Hive、Pig、Hbase、Zookeepr、Sqoop和Hcatalog等的集中管理。也是頂級的hadoop管理工具之一。

本文使用的Ambari的版本為2.7,支援的元件也越來越豐富。

從理論到工程實踐——使用者畫像入門寶典

Hadoop的發行版本有很多,有華為發行版,Intel發行版,Cloudera發行版(CDH),MapR版本,以及HortonWorks版本等。所有發行版都是基于Apache Hadoop衍生出來的,産生這些版本的原因,是由于Apache Hadoop的開源協定決定的:任何人可以對其進行修改,并作為開源或商業産品釋出和銷售。

收費版本:收費版本一般都會由新的特性。國内絕大多數公司發行的版本都是收費的,例如Intel發行版本,華為發行版本等。

免費版本:不收費的版本主要有三個(都是國外廠商)。Cloudera版本(Cloudera’s Distribution Including Apache Hadoop)簡稱”CDH“。Apache基金會hadoop Hontonworks版本(Hortonworks Data Platform)簡稱“HDP”。按照順序代表了國内的使用率,CDH和HDP雖然是收費版本,但是他們是開源的,隻是收取服務費用,嚴格上講不屬于收費版本。

Ambari基于HDP安裝,但是他們不同版本之間有不同的對應關系。

也就是支援最新的版本為HDP 3.1.5 而HDP包含了大資料的基本元件如下:

從理論到工程實踐——使用者畫像入門寶典

已經非常的豐富了,下面我們開始Ambari的安裝。

前期準備

前期準備分為四部分

主機,資料庫,浏覽器,JDK

主機

請先準備好安裝Ambari的主機,開發環境可以三台就ok,其他環境依據公司機器規模而确定。

假設開發環境的三台機器為:

192.168.12.101 master 192.168.12.102 slave1 192.168.12.103 slave2

主機的最低要求如下:

軟體要求

在每個主機上:

  • yum

    rpm

    (RHEL / CentOS / Oracle / Amazon Linux)
  • zypper

    php_curl

    (SLES)
  • apt

    (Debian / Ubuntu)
  • scp, curl, unzip, tar

    wget

    gcc*

  • OpenSSL(v1.01,内部版本16或更高版本)
  • Python(帶python-devel *)

Ambari主機應至少具有1 GB RAM,并具有500 MB可用空間。

要檢查任何主機上的可用記憶體,請運作:

free -m
           

本地倉庫

如果網速不夠快,我們可以将包下載下傳下來,建立本地倉庫。網速夠快可以忽略這步。

先下載下傳安裝包

安裝httpd服務

yum install yum-utils createrepo
[root@master ~]# yum -y install httpd
[root@master ~]# service httpd restart
Redirecting to /bin/systemctl restart httpd.service
[root@master ~]# chkconfig httpd on
           

随後建立一個本地yum源

mkdir -p /var/www/html/
           

将剛剛下載下傳的包解壓到這個目錄下。

随後通過浏覽器 通路 成功

createrepo  ./
制作本地源  修改檔案裡邊的源位址
vi  ambari.repo
vi hdp.repo


#VERSION_NUMBER=2.7.5.0-72
[ambari-2.7.5.0]
#json.url = http://public-repo-1.hortonworks.com/HDP/hdp_urlinfo.json
name=ambari Version - ambari-2.7.5.0
baseurl=https://username:[email protected]/p/ambari/centos7/2.x/updates/2.7.5.0
gpgcheck=1
gpgkey=https://username:[email protected]/p/ambari/centos7/2.x/updates/2.7.5.0/RPM-GPG-KEY/RPM-GPG-KEY-Jenkins
enabled=1
priority=1

[root@master ambari]# yum clean all
[root@master ambari]# yum makecache
[root@master ambari]# yum repolist
           

軟體準備

為了友善以後的管理,我們要對機器做一些配置

安裝JDK
下載下傳位址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

rpm -ivh jdk-8u161-linux-x64.rpm
java -version
通過vi /etc/hostname 進行修改機器名  這裡主要是為了可以實作通過名稱來查找相應的伺服器

  各個節點修改成相應的名稱,分别為master,slave1.slave2
  vi /etc/hosts
192.168.12.101 master
192.168.12.102 slave1
192.168.12.103 slave2

 vi /etc/sysconfig/network
 NETWORKING=yes
HOSTNAME=master(其他的節點也對應修改)
關閉防火牆
[root@master~]#systemctl disable firewalld
[root@master~]#systemctl stop firewalld
ssh免密
ssh-keygen

ssh-copy-id -i ~/.ssh/id_rsa.pub remote-host
           

不同的環境會有不同的問題存在,大家可以參考官網手冊進行相應的安裝。

安裝ambari-server

ambariserver将最終帶我們完成大資料叢集的安裝

yum install ambari-server

Installing : postgresql-libs-9.2.18-1.el7.x86_64         1/4
Installing : postgresql-9.2.18-1.el7.x86_64              2/4
Installing : postgresql-server-9.2.18-1.el7.x86_64       3/4
Installing : ambari-server-2.7.5.0-124.x86_64           4/4
Verifying  : ambari-server-2.7.5.0-124.x86_64           1/4
Verifying  : postgresql-9.2.18-1.el7.x86_64              2/4
Verifying  : postgresql-server-9.2.18-1.el7.x86_64       3/4
Verifying  : postgresql-libs-9.2.18-1.el7.x86_64         4/4

Installed:
  ambari-server.x86_64 0:2.7.5.0-72
Dependency Installed:
 postgresql.x86_64 0:9.2.18-1.el7
 postgresql-libs.x86_64 0:9.2.18-1.el7
 postgresql-server.x86_64 0:9.2.18-1.el7
Complete!
           

啟動與設定

設定

ambari-server setup
           

不推薦直接用内嵌的postgresql,因為其他服務還要用mysql

安裝配置 MySql

yum install -y wget

wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm

rpm -ivh mysql57-community-release-el7-10.noarch.rpm

yum -y install mysql-community-server

systemctl enable mysqld

systemctl start mysqld.service

systemctl status mysqld.service

grep "password" /var/log/mysqld.log

mysql -uroot -p

set global validate_password_policy=0;

set global validate_password_length=1;

set global validate_password_special_char_count=0;

set global validate_password_mixed_case_count=0;

set global validate_password_number_count=0;

select @@validate_password_number_count,@@validate_password_mixed_case_count,@@validate_password_number_count,@@validate_password_length;



ALTER USER 'root'@'localhost' IDENTIFIED BY 'password';

grant all privileges on . to 'root'@'%' identified by 'password' with grant option;

flush privileges;

exit
yum -y remove mysql57-community-release-el7-10.noarch

下載下傳mysql驅動,放到三台的

/opt/ambari/mysql-connector-java-5.1.48.jar


初始化資料庫

mysql -uroot -p
create database ambari;

use ambari

source /var/lib/ambari-server/resources/Ambari-DDL-MySQL-CREATE.sql



CREATE USER 'ambari'@'localhost' IDENTIFIED BY 'bigdata';

CREATE USER 'ambari'@'%' IDENTIFIED BY 'bigdata';

GRANT ALL PRIVILEGES ON ambari.* TO 'ambari'@'localhost';

GRANT ALL PRIVILEGES ON ambari.* TO 'ambari'@'%';

FLUSH PRIVILEGES;
           

完成ambari的配置

[root@localhost download]# ambari-server setup
Using python  /usr/bin/python
Setup ambari-server
Checking SELinux...
SELinux status is 'enabled'
SELinux mode is 'permissive'
WARNING: SELinux is set to 'permissive' mode and temporarily disabled.
OK to continue [y/n] (y)? y
Customize user account for ambari-server daemon [y/n] (n)? y
Enter user account for ambari-server daemon (root):
Adjusting ambari-server permissions and ownership...
Checking firewall status...
Checking JDK...
[1] Oracle JDK 1.8 + Java Cryptography Extension (JCE) Policy Files 8
[2] Custom JDK
==============================================================================
Enter choice (1): 2
WARNING: JDK must be installed on all hosts and JAVA_HOME must be valid on all hosts.
WARNING: JCE Policy files are required for configuring Kerberos security. If you plan to use Kerberos,please make sure JCE Unlimited Strength Jurisdiction Policy Files are valid on all hosts.
Path to JAVA_HOME: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-0.el7_7.x86_64/jre
Validating JDK on Ambari Server...done.
Check JDK version for Ambari Server...
JDK version found: 8
Minimum JDK version is 8 for Ambari. Skipping to setup different JDK for Ambari Server.
Checking GPL software agreement...
GPL License for LZO: https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html
Enable Ambari Server to download and install GPL Licensed LZO packages [y/n] (n)? y
Completing setup...
Configuring database...
Enter advanced database configuration [y/n] (n)? y
Configuring database...
==============================================================================
Choose one of the following options:
[1] - PostgreSQL (Embedded)
[2] - Oracle
[3] - MySQL / MariaDB
[4] - PostgreSQL
[5] - Microsoft SQL Server (Tech Preview)
[6] - SQL Anywhere
[7] - BDB
==============================================================================
Enter choice (1): 3
Hostname (localhost):
Port (3306):
Database name (ambari):
Username (ambari):
Enter Database Password (bigdata):
Configuring ambari database...
Enter full path to custom jdbc driver: /opt/ambari/mysql-connector-java-5.1.48.jar
Copying /opt/ambari/mysql-connector-java-5.1.48.jar to /usr/share/java
Configuring remote database connection properties...
WARNING: Before starting Ambari Server, you must run the following DDL directly from the database shell to create the schema: /var/lib/ambari-server/resources/Ambari-DDL-MySQL-CREATE.sql
Proceed with configuring remote database connection properties [y/n] (y)? y
Extracting system views...
.....
Ambari repo file contains latest json url http://public-repo-1.hortonworks.com/HDP/hdp_urlinfo.json, updating stacks repoinfos with it...
Adjusting ambari-server permissions and ownership...
Ambari Server 'setup' completed successfully.
           

随後就可以啟動了

ambari-server start

ambari-server status

ambari-server stop
           

通路如下位址

http://<your.ambari.server>:8080
           

叢集安裝

從理論到工程實踐——使用者畫像入門寶典

接下來進行叢集的安裝,包括命名,ssh免密,選擇版本,規劃叢集

從理論到工程實踐——使用者畫像入門寶典

最終完成叢集安裝,我們就可以在頁面管理我們的叢集了。

詳細官網安裝文檔pdf請在關注“實時流式計算” 背景回複ambari

實時計算環境搭建

由于ambari支援的druid版本較低,目前暫不支援flink,是以除kafka外的實時計算元件,需要手動安裝,也友善以後的更新。

Linux系統上安裝flink

叢集安裝分為以下幾步:

1、在每台機器上複制解壓出來的flink目錄。

2、選擇一個作為master節點,然後修改所有機器conf/flink-conf.yaml

jobmanager.rpc.address = master主機名
           

3、修改conf/slaves,将所有work節點寫入

work01
work02
           

4、在master上啟動叢集

bin/start-cluster.sh
           

安裝在Hadoop

我們可以選擇讓Flink運作在Yarn叢集上。

下載下傳Flink for Hadoop的包

保證 HADOOP_HOME已經正确設定即可

啟動 bin/yarn-session.sh

運作flink示例程式

批處理示例:

送出flink的批處理examples程式:

bin/flink run examples/batch/WordCount.jar
           

這是flink提供的examples下的批處理例子程式,統計單詞個數。

$ bin/flink run examples/batch/WordCount.jar
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
           

Druid叢集部署

部署建議

叢集部署采用的配置設定如下:

  • 主節點部署 Coordinator 和 Overlord程序
  • 兩個資料節點運作 Historical 和 MiddleManager程序
  • 一個查詢節點 部署Broker 和 Router程序

未來我們可以添加更多的主節點和查詢節點

主節點建議 8vCPU 32GB記憶體

配置檔案位于

conf/druid/cluster/master
           

資料節點建議

16 vCPU 122GB記憶體 2 * 1.9TB SSD

conf/druid/cluster/data
           

查詢伺服器 建議 8vCPU 32GB記憶體

conf/druid/cluster/query
           

開始部署

下載下傳0.17.0發行版

解壓

tar -xzf apache-druid-0.17.0-bin.tar.gz
cd apache-druid-0.17.0
           

叢集模式的主要配置檔案都位于:

conf/druid/cluster
           

配置中繼資料存儲

conf/druid/cluster/_common/common.runtime.properties
           

替換

druid.metadata.storage.connector.connectURI
druid.metadata.storage.connector.host
           

例如配置mysql為中繼資料存儲

在mysql中配置好通路權限:

-- create a druid database, make sure to use utf8mb4 as encoding
CREATE DATABASE druid DEFAULT CHARACTER SET utf8mb4;

-- create a druid user
CREATE USER 'druid'@'localhost' IDENTIFIED BY 'druid';

-- grant the user all the permissions on the database we just created
GRANT ALL PRIVILEGES ON druid.* TO 'druid'@'localhost';
           

在druid中配置

druid.extensions.loadList=["mysql-metadata-storage"]
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://<host>/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
           

配置深度存儲

将資料存儲配置為S3或者HDFS

比如配置HDFS,修改

conf/druid/cluster/_common/common.runtime.properties
druid.extensions.loadList=["druid-hdfs-storage"]

#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments

druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments

#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs
           

将Hadoop配置XML(core-site.xml,hdfs-site.xml,yarn-site.xml,mapred-site.xml)放在Druid中

conf/druid/cluster/_common/
           

配置zookeeper連接配接

還是修改

conf/druid/cluster/_common/
           

下的

druid.zk.service.host
           

為zk伺服器位址就可以了

啟動叢集

啟動前注意打開端口限制

主節點:

derby 1527

zk 2181

Coordinator 8081

Overlord 8090

資料節點:

Historical 8083

Middle Manager 8091, 8100–8199

查詢節點:

Broker 8082

Router 8088

記得将剛才配好的druid複制到各個節點

啟動主節點

由于我們使用外部zk 是以使用no-zk啟動

bin/start-cluster-master-no-zk-server
           

啟動資料伺服器

bin/start-cluster-data-server
           

啟動查詢伺服器

bin/start-cluster-query-server
           

這樣的話 叢集就啟動成功了!

至此,我們的大資料環境基本搭建完畢,下一章我們将接入資料,開始進行标簽的開發。

五、标簽開發

資料接入

資料的接入可以通過将資料實時寫入Kafka進行接入,不管是直接的寫入還是通過oracle和mysql的實時接入方式,比如oracle的ogg,mysql的binlog

ogg

Golden Gate(簡稱OGG)提供異構環境下交易資料的實時捕捉、變換、投遞。

通過OGG可以實時的将oracle中的資料寫入Kafka中。

從理論到工程實踐——使用者畫像入門寶典

對生産系統影響小:實時讀取交易日志,以低資源占用實作大交易量資料實時複制

以交易為機關複制,保證交易一緻性:隻同步已送出的資料

高性能

  • 智能的交易重組和操作合并
  • 使用資料庫本地接口通路
  • 并行處理體系

binlog

MySQL 的二進制日志 binlog 可以說是 MySQL 最重要的日志,它記錄了所有的

DDL

DML

語句(除了資料查詢語句select、show等),以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進制日志是事務安全型的。binlog 的主要目的是複制和恢複。

從理論到工程實踐——使用者畫像入門寶典

通過這些手段,可以将資料同步到kafka也就是我們的實時系統中來。

Flink接入Kafka資料

Apache Kafka Connector可以友善對kafka資料的接入。

依賴

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_2.11</artifactId>  <version>1.9.0</version></dependency>
           
建構FlinkKafkaConsumer

必須有的:

1.topic名稱

2.用于反序列化Kafka資料的DeserializationSchema / KafkaDeserializationSchema

3.配置參數:“bootstrap.servers” “group.id” (kafka0.8還需要 “zookeeper.connect”)

val properties = new Properties()properties.setProperty("bootstrap.servers", "localhost:9092")// only required for Kafka 0.8properties.setProperty("zookeeper.connect", "localhost:2181")properties.setProperty("group.id", "test")stream = env    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))    .print()
           
時間戳和水印

在許多情況下,記錄的時間戳(顯式或隐式)嵌入記錄本身。另外,使用者可能想要周期性地或以不規則的方式發出水印。

我們可以定義好Timestamp Extractors / Watermark Emitters,通過以下方式将其傳遞給消費者

val env = StreamExecutionEnvironment.getExecutionEnvironment()val myConsumer = new FlinkKafkaConsumer[String](...)myConsumer.setStartFromEarliest()      // start from the earliest record possiblemyConsumer.setStartFromLatest()        // start from the latest recordmyConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)myConsumer.setStartFromGroupOffsets()  // the default behaviour//指定位置//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)val stream = env.addSource(myConsumer)
           
檢查點

啟用Flink的檢查點後,Flink Kafka Consumer将使用主題中的記錄,并以一緻的方式定期檢查其所有Kafka偏移以及其他操作的狀态。如果作業失敗,Flink會将流式程式恢複到最新檢查點的狀态,并從存儲在檢查點中的偏移量開始重新使用Kafka的記錄。

如果禁用了檢查點,則Flink Kafka Consumer依賴于内部使用的Kafka用戶端的自動定期偏移送出功能。

如果啟用了檢查點,則Flink Kafka Consumer将在檢查點完成時送出存儲在檢查點狀态中的偏移量。

val env = StreamExecutionEnvironment.getExecutionEnvironment()env.enableCheckpointing(5000) // checkpoint every 5000 msecs
           

Flink消費Kafka完整代碼:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumer {    
public static void main(String[] args) throws Exception {        
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties properties = new Properties();        
properties.setProperty("bootstrap.servers", "localhost:9092");        properties.setProperty("group.id", "test");        //建構FlinkKafkaConsumer        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);        //指定偏移量        
myConsumer.setStartFromEarliest();        
DataStream<String> stream = env                
.addSource(myConsumer);        
env.enableCheckpointing(5000);        
stream.print();        
env.execute("Flink Streaming Java API Skeleton");    
}
           

這樣資料已經實時的接入我們系統中,可以在Flink中對資料進行處理了,那麼如何對标簽進行計算呢?标簽的計算過程極大的依賴于資料倉庫的能力,是以擁有了一個好的資料倉庫,标簽也就很容易計算出來了。

資料倉庫基礎知識

資料倉庫是指一個面向主題的、內建的、穩定的、随時間變化的資料的集合,以用于支援管理決策的過程。

(1)面向主題 業務資料庫中的資料主要針對事物處理任務,各個業務系統之間是各自分離的。而資料倉庫中的資料是按照一定的主題進行組織的

(2)內建 資料倉庫中存儲的資料是從業務資料庫中提取出來的,但并不是原有資料的簡單複制,而是經過了抽取、清理、轉換(ETL)等工作。業務資料庫記錄的是每一項業務處理的流水賬,這些資料不适合于分析處理,進入資料倉庫之前需要經過系列計算,同時抛棄一些分析處理不需要的資料。

(3)穩定 操作型資料庫系統中一般隻存儲短期資料,是以其資料是不穩定的,記錄的是系統中資料變化的瞬态。資料倉庫中的資料大多表示過去某一時刻的資料,主要用于查詢、分析,不像業務系統中資料庫一樣經常修改。一般資料倉庫建構完成,主要用于通路

從理論到工程實踐——使用者畫像入門寶典

OLTP 聯機事務處理 OLTP是傳統關系型資料庫的主要應用,主要用于日常事物、交易系統的處理 1、資料量存儲相對來說不大 2、實時性要求高,需要支援事物 3、資料一般存儲在關系型資料庫(oracle或mysql中)

OLAP 聯機分析處理 OLAP是資料倉庫的主要應用,支援複雜的分析查詢,側重決策支援 1、實時性要求不是很高,ETL一般都是T+1的資料;2、資料量很大;3、主要用于分析決策;

星形模型是最常用的資料倉庫設計結構。由一個事實表和一組維表組成,每個維表都有一個維主鍵。該模式核心是事實表,通過事實表将各種不同的維表連接配接起來,各個維表中的對象通過事實表與另一個維表中的對象相關聯,這樣建立各個維表對象之間的聯系 維表:用于存放次元資訊,包括維的屬性和層次結構;事實表:是用來記錄業務事實并做相應名額統計的表。同維表相比,事實表記錄數量很多。

從理論到工程實踐——使用者畫像入門寶典

雪花模型是對星形模型的擴充,每一個維表都可以向外連接配接多個詳細類别表。除了具有星形模式中維表的功能外,還連接配接對事實表進行較長的描述的次元,可進一步細化檢視資料的粒度 例如:地點維表包含屬性集{location_id,街道,城市,省,國家}。這種模式通過地點次元表的city_id與城市次元表的city_id相關聯,得到如{101,“解放大道10号”,“武漢”,“湖北省”,“中國”}、{255,“解放大道85号”,“武漢”,“湖北省”,“中國”}這樣的記錄。星形模型是最基本的模式,一個星形模型有多個維表,隻存在一個事實表。在星形模式的基礎上,用多個表來描述一個複雜維,構造維表的多層結構,就得到雪花模型。

從理論到工程實踐——使用者畫像入門寶典

清晰資料結構:每一個資料分層都有它的作用域,這樣我們在使用表的時候能更友善地定位和了解 髒資料清洗:屏蔽原始資料的異常 屏蔽業務影響:不必改一次業務就需要重新接入資料 資料血緣追蹤:簡單來講可以這樣了解,我們最終給業務呈現的是能直接使用的一張業務表,但是它的來源有很多,如果有一張來源表出問題了,我們希望能夠快速準确地定位到問題,并清楚它的危害範圍。減少重複開發:規範資料分層,開發一些通用的中間層資料,能夠減少極大的重複計算。把複雜問題簡單化。将一個複雜的任務分解成多個步驟來完成,每一層隻處理單一的步驟,比較簡單和容易了解。便于維護資料的準确性,當資料出現問題之後,可以不用修複所有的資料,隻需要從有問題的步驟開始修複。

從理論到工程實踐——使用者畫像入門寶典

資料倉庫的資料直接對接OLAP或日志類資料, 使用者畫像隻是站在使用者的角度,對資料倉庫資料做進一步的模組化加工。是以每天畫像标簽相關資料的排程依賴上遊資料倉庫相關任務執行完成。

在了解了資料倉庫以後,我們就可以進行标簽的計算了。在開發好标簽的邏輯以後,将資料寫入hive和druid中,完成實時與離線的标簽開發工作。

Flink 與Hive和 Druid內建

Flink+Hive

Flink從1.9開始支援內建Hive,在Flink1.10版本,标志着對 Blink的整合宣告完成,随着對 Hive 的生産級别內建,Hive作為資料倉庫系統的絕對核心,承擔着絕大多數的離線資料ETL計算和資料管理,期待Flink未來對Hive的完美支援。

而 HiveCatalog 會與一個 Hive Metastore 的執行個體連接配接,提供中繼資料持久化的能力。要使用 Flink 與 Hive 進行互動,使用者需要配置一個 HiveCatalog,并通過 HiveCatalog 通路 Hive 中的中繼資料。

添加依賴

要與Hive內建,需要在Flink的lib目錄下添加額外的依賴jar包,以使內建在Table API程式或SQL Client中的SQL中起作用。或者,可以将這些依賴項放在檔案夾中,并分别使用Table API程式或SQL Client 的

-C

-l

選項将它們添加到classpath中。本文使用第一種方式,即将jar包直接複制到$FLINK_HOME/lib目錄下。本文使用的Hive版本為2.3.4(對于不同版本的Hive,可以參照官網選擇不同的jar包依賴),總共需要3個jar包,如下:

  • flink-connector-hive_2.11-1.10.0.jar
  • flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
  • hive-exec-2.3.4.jar

添加Maven依賴

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>
           

執行個體代碼

package com.flink.sql.hiveintegration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;


public class FlinkHiveIntegration {

    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 使用BlinkPlanner
                .inBatchMode() // Batch模式,預設為StreamingMode
                .build();

        //使用StreamingMode
       /* EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 使用BlinkPlanner
                .inStreamingMode() // StreamingMode
                .build();*/

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";      // Catalog名稱,定義一個唯一的名稱表示
        String defaultDatabase = "qfbap_ods";  // 預設資料庫名稱
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";  // hive-site.xml路徑
        String version = "2.3.4";       // Hive版本号

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
        // 建立資料庫,目前不支援建立hive表
        String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123";

        tableEnv.sqlUpdate(createDbSql);

    }
}
           

Flink+Druid

可以将Flink分析好的資料寫回kafka,然後在druid中接入資料,也可以将資料直接寫入druid,以下為示例代碼:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.flinkdruid</groupId>
    <artifactId>FlinkDruid</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>FlinkDruid</name>
    <description>Flink Druid Connection</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
           

示例代碼

@SpringBootApplication
public class FlinkDruidApp {


    private static String url = "http://localhost:8200/v1/post/wikipedia";

    private static RestTemplate template;

    private static HttpHeaders headers;


    FlinkDruidApp() {

        template = new RestTemplate();
        headers = new HttpHeaders();
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
        headers.setContentType(MediaType.APPLICATION_JSON);

    }

    public static void main(String[] args) throws Exception {

        SpringApplication.run(FlinkDruidApp.class, args);

        // Creating Flink Execution Environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //Define data source
        DataSet<String> data = env.readTextFile("/wikiticker-2015-09-12-sampled.json");

        // Trasformation on the data
        data.map(x -> {

            return httpsPost(x).toString();
        }).print();


    }

    // http post method to post data in Druid
    private static ResponseEntity<String> httpsPost(String json) {

        HttpEntity<String> requestEntity =
                new HttpEntity<>(json, headers);
        ResponseEntity<String> response =
                template.exchange("http://localhost:8200/v1/post/wikipedia", HttpMethod.POST, requestEntity,
                        String.class);

        return response;
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}
           

标簽的開發工作繁瑣,需要不斷的開發并且優化,但是如何将做好的标簽提供出去産生真正的價值呢?下一章,我們将介紹使用者畫像産品化。

六、使用者畫像産品化

在開發好使用者标簽以後,如何将标簽應用到實際其實是一個很重要的問題。隻有做好産品的設計才能讓标簽發揮真正的價值,本文将介紹使用者畫像的産品化過程。

1、标簽展示

從理論到工程實踐——使用者畫像入門寶典

首先是标簽展示功能,這個主要供業務人員和研發人員使用,是為了更直覺的看見整個的使用者标簽體系。

不同的标簽體系會有不同的層級,那麼這個頁面的設計就需要我們展示成樹狀的結構,友善以後的擴充。

在最後一個層級,比如自然性别,可以設計一個統計頁面,在進入頁面後,可以展示相應的資料統計情況,

可以更直覺看見标簽中值得比例,也可以為業務提供好的建議,另外可以對标簽的具體描述進行展示,起到一個說明的作用,還可以展示标簽按天的波動情況,觀察标簽的變化情況。

從理論到工程實踐——使用者畫像入門寶典

這一部分的資料來源呢?之前也提到過,這些标簽的中繼資料資訊都存在mysql中,友善我們查詢。

是以樹狀圖和标簽描述資訊需要去mysql中擷取,而比例等圖表資料則是從Hbase,Hive中查詢擷取的,當然也有直接通過ES擷取的。但是每天的标簽曆史波動情況,還是要通過每天跑完标簽後存在mysql中作為曆史記錄進行展示。

2、标簽查詢

這一功能可以提供給研發人員和業務人員使用。

标簽查詢功能其實就是對使用者進行全局畫像的過程,對于一個使用者的全量标簽資訊,我們是需要對其進行展示的。

從理論到工程實踐——使用者畫像入門寶典

輸入使用者id後,可以檢視該使用者的屬性資訊、行為資訊、風控屬性等資訊。從多方位了解一個具體的使用者特征。

這些已經是标簽的具體資訊了,由于是對單一id的查找,從hive中擷取會造成查詢速度的問題,是以我們更建議從Hbase或者ES中查詢擷取,這樣查詢效率和實時性都能獲得極大的提升。

3、标簽管理

這一功能是提供給研發人員使用的。

對于标簽,不能每一次新增一個标簽都進行非常大改動,這樣是非常耗費人力的,是以必須要有可以對标簽進行管理的功能。

這裡定義了标簽的基本資訊,開發方式,開發人員等等,在完成标簽的開發以後,直接在此頁面對标簽進行錄入,就可以完成标簽的上線工作,讓業務人員可以對标簽進行使用。

從理論到工程實踐——使用者畫像入門寶典

新增和編輯标簽的頁面,可以提供下拉框或者輸入框提供資訊錄入的功能。

從理論到工程實踐——使用者畫像入門寶典

之前已經提到過,這些标簽的中繼資料資訊都儲存在了Mysql中,隻要完成對其的新增和修改就可以了。

4、使用者分群

作為使用者畫像最核心的功能,使用者分群功能。是使用者畫像與業務系統建立聯系的橋梁,也是使用者畫像的價值所在。

這項功能主要供業務人員使用。

此功能允許使用者自定義的圈定一部分人員,圈定的規則就是對于标簽的條件限制。

在圈定好人群以後,可以對這部分人群提供與業務系統的外呼系統,客服系統,廣告系統,Push系統的互動,達到真正的精細化營運的目的。

從理論到工程實踐——使用者畫像入門寶典

對于标簽規則的判斷,需要将記錄好的規則存儲于Mysql中,在進行人群計算時又需要将規則解析成可計算的邏輯。不管是解析成Sql或者其他的查詢語言都難度巨大,這對于研發是一個非常大的挑戰。

從理論到工程實踐——使用者畫像入門寶典

在此功能中,還可以增加人群對比的功能,對不同人群的不同标簽進行圈定,對比。這對于查詢性能也是一個巨大的考驗。

從理論到工程實踐——使用者畫像入門寶典

但是,使用者分群功能作為使用者畫像的核心是我們必須要實作的。對于技術架構,Hbase更擅長與KV形式的查詢,對于多元度查詢性能較差,是以可以采取ES索引,在ES查詢出Hbase的Rowkey,再去查詢Hbase的方式。也有很多公司選擇整體遷移到ES中完成此項工作。

本文檔為使用者畫像入門寶典,介紹了使用者畫像的理論與系統搭建。也歡迎大資料、使用者畫像感興趣的朋友加入學習群進行讨論交流~

如二維碼過期請聯系公衆号大資料流動 作者 獨孤風,備注使用者畫像,申請入群~

大資料流動 專注于大資料實時計算,資料治理,資料可視化等技術分享與實踐。

請在背景回複關鍵字下載下傳相關資料。相關學習交流群已經成立,歡迎加入~