天天看點

大資料技術之_26_交通狀态預測項目_01

一 項目背景

該項目以車輛預測為基礎,學習業務解決的方法論。

學習完本項目後,可以解決如下問題或适用于如下業務場景:

  1、公路堵車預測

  2、地鐵人流量預測

  3、共享單車聚集點預測等等

擴充知識:

spark-shell --master yarn --conf spark.eventLog.dir=hdfs://hadoop/tmp/spark2 --jars /home/hadoop-SNAPSHOT.jar

智能判斷:
    軌迹:将某一輛車在所有監測點留下的蹤迹聚合連線,則為該車的軌迹。
    跟車:判斷某一輛車是否被跟蹤(此處“跟蹤”為中性詞彙)等。比如:婚車(判斷是否屬于一個車隊)
    碰撞:這裡不是撞車分析,而是在幾個電子圍欄内(比如,監測點1,監測點2),同一輛車,在某一個時間範圍内,檢測出該車出現在不同的監測點。電子圍欄:比如OA打卡。

技偵支援:
    布控:警方輸入布控的車輛資訊(車牌号,車型,顔色等等)
    實時報警:符合布控标準,則報警
    套牌分析:相同車牌号,車輛資訊不一緻
    落腳點:車輛在哪個區域停留時間長

統計分析:
    流量統計:哪幾個監測點的車流量比較高
    外來車輛統計等等

資料結構示例:
日期            關卡id       攝像id       車輛          發生時間          速度    公路id   區域id(次元=特征值向量)

2017-04-25      0001        09203       京W47147     2017-04-25 20:58:17     138     49      04
2017-04-25      0005        06975       京W47147     2017-04-25 20:12:39     50      10      06
2017-04-25      0001        02846       京W47147     2017-04-25 20:20:20     214     21      00
2017-04-25      0003        06044       京W47147     2017-04-25 20:15:58     78      47      01
2017-04-25      0000        01599       京W47147     2017-04-25 20:40:58     59      32      01
2017-04-25      0002        09260       京M91266     2017-04-25 09:09:57     105     15      00

一個 Event(事件)至少包含一行資料。
因為 Kafka 是基于事件的。

為什麼一個 Event(事件)包含多行資料?
答:我們将多行資料封裝(打包)成一個 Event,發送給 Kafka,這樣的好處是減少網絡IO。
如何打包呢?
答:使用 json 格式,如下:

{
    "monitor_arr":
    [
        {
            "time": "2017-04-25",
            monitor_id:"0001",
            ...
        },
        {
            "time": "2017-04-25",
            monitor_id:"0005",
            ...
        },
        ......
    ]
}

有監督學習:有标簽(label)的訓練 --> 模組化的過程 --> 求通項公式的過程 --> 求拟合函數的過程 --> 求參數的過程 --> 連續資料,常用算法:回歸算法 --> 線性回歸、邏輯斯特回歸
無監督學習:沒有标簽(label)的訓練 --> 離散資料 --> 比如歸類問題,常用算法:聚類算法 --> 支援向量機、随機森林(起源于決策樹,萬能藥)、K-means 算法
半監督學習:一部分有标簽,一部分無标簽。

老羅的錘子手機不賺錢,為了交個朋友--情懷,賣配件、T恤等賺錢。

平民化的最接近科學/科研 --> 計算機           

複制

次元認知:

大資料技術之_26_交通狀态預測項目_01

二 項目架構與環境

2.1 項目架構

大資料技術之_26_交通狀态預測項目_01

2.2 項目環境

以下環境為本次項目使用的情景,并非隻有在此環境下才可以完成整體業務需求。請靈活變動。(本例已在以下環境中完成測試)

大資料技術之_26_交通狀态預測項目_01

三 項目實作

我們建立 java 項目 tf,之後的每一個項目子產品都建立于該項目之下。然後删除掉 src 目錄。

3.1 資料模拟

  請確定 zookeeper 和 kafka 的正确配置。

  如果之前安裝的 scala 版本不是 2.11.8 請替換之:

$ tar -zxf /opt/software/scala-2.11.8.tgz -C /opt/module/

使用 root 使用者,配置環境變量:
[atguigu@hadoop102 module]$ sudo vim /etc/profile

#SCALA_HOME
export SCALA_HOME=/opt/module/scala/scala-2.11.8
export PATH=$PATH:$SCALA_HOME/bin

使環境變量生效:
[atguigu@hadoop102 module]$ sudo source /etc/profile           

複制

  我們需要産生一些監測點的模拟車速資料,并将這些資料實時的發送給 kafka,儲存到 traffic 主題中,以供後續的 Spark 讀取資料并加工之後存放于 redis。

3.1.1 資料結構

大資料技術之_26_交通狀态預測項目_01

3.1.2 編寫代碼

思路:

  a) 建立子產品 maven 工程:tf_producer

  b) 配置 maven 依賴。

  c) 因為要把資料發送給 kafka,是以配置 kafka 屬性,儲存于某個配置檔案中。

  d) 編寫 kafka 加載屬性的工具類。

  e) 每隔 5 分鐘,切換一次模拟狀态,例如第一個五分鐘,車速都在 30km/h 以上,下一個五分鐘,車速都在 10km/h 以下,往複模拟公路一會堵車,一會不堵車的情況。

  f) 啟動 zookeeper 叢集和 kafka 叢集,并建立 kafka 主題,檢查主題存在性。

  g) 将資料發送至 kafka 并使用 kafka console-consumer 進行檢測。

1) 建立項目:tf_producer

2) maven 的 pom.xml 檔案配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
    </dependencies>
</project>           

複制

3) kafka 屬性配置檔案:kafka.properties(生産者)

# 設定 kafka 的 brokerlist
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 生産者序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

acks=all
retries=0

# 設定消費者所屬的消費組
group.id=g_traffic1

# 設定是否自動确認 offset
enable.auto.commit=true

# 設定自動确認 offset 的時間間隔
auto.commit.interval.ms=30000

# 設定本次消費的主題
kafka.topics=traffic

# 設定 zookeeper 中 follower 和 leader 之間的關于 kafka 的資訊同步時間間隔
zookeeper.sync.time.ms=250
num.io.threads=12
batch.size=65536
buffer.memory=524288

# kafka 中消息儲存的時間(機關是小時),企業開發中是 7 天
log.retention.hours=2           

複制

4) 編寫 kafka 屬性加載工具類:PropertyUtil.scala

package com.atguigu.utils

import java.util.Properties

object PropertyUtil {
  val properties = new Properties()
  // 加載配置屬性
  try {
    val inputStream = ClassLoader.getSystemResourceAsStream("kafka.properties")
    properties.load(inputStream)
  } catch {
    case ex: Exception => println(ex)
  } finally {

  }

  // 定義通過鍵得到屬性值的方法
  def getProperty(key: String): String = properties.getProperty(key)
}           

複制

5) 開始模拟資料,每隔 5 分鐘切換一次模拟狀态,檔案:Producer.scala

package com.atguigu.producer

import java.text.DecimalFormat
import java.util.Calendar

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.atguigu.utils.PropertyUtil

import scala.util.Random
import java._

import com.alibaba.fastjson.JSON

/**
  * 模拟産生資料,同時把資料實時的發送到 kafka
  * 随機産生 監測點id 以及 速度
  * 序列化為 json
  * 發送給 kafka
  */
object Producer {

  def main(args: Array[String]): Unit = {
    // 讀取配置檔案資訊
    val properties = PropertyUtil.properties
    // 建立 kafka 生産者對象
    val producer = new KafkaProducer[String, String](properties)

    // 模拟産生實時資料,機關為:秒
    var startTime = Calendar.getInstance().getTimeInMillis() / 1000

    // 資料模拟,堵車狀态切換的周期機關為:秒
    val trafficCycle = 300

    val df = new DecimalFormat("0000")
    // 開始不停的實時産生資料
    while (true) {
      // 模拟産生監測點 id:1~20
      val randomMonitorId = df.format(Random.nextInt(20) + 1)
      // 模拟車速
      var randomSpeed = "000"

      // 得到本條資料産生時的目前時間,機關為:秒
      val currentTime = Calendar.getInstance().getTimeInMillis() / 1000
      // 每 5 分鐘切換一次公路狀态
      if (currentTime - startTime > trafficCycle) {
        randomSpeed = new DecimalFormat("000").format(Random.nextInt(16))
        if (currentTime - startTime > trafficCycle * 2) {
          startTime = currentTime
        }
      } else {
        randomSpeed = new DecimalFormat("000").format(Random.nextInt(31) + 30)
      }

      // 該 Map 集合用于存放生産出來的資料
      val jsonmMap = new util.HashMap[String, String]()
      jsonmMap.put("monitor_id", randomMonitorId)
      jsonmMap.put("speed", randomSpeed)

      // 因為 kafka 是基于事件的,在此,我們每一條産生的資料都序列化為一個 json 事件
      val event = JSON.toJSON(jsonmMap)

      // 發送事件到 kafka 叢集中
      producer.send(new ProducerRecord[String, String](PropertyUtil.getProperty("kafka.topics"), event.toString))

      Thread.sleep(500)

      // 測試
      // println("監測點id:" + randomMonitorId + "," + "車速:" + randomSpeed)
      println(event)
    }
  }
}           

複制

6) 啟動叢集中的其他相關節點(zookeeper,hadoop 等),啟動 kafka,并建立 kafka 主題,檢查主題存在性

[atguigu@hadoop102 ~]$ start-cluster.sh            

複制

Linux 叢集服務群起腳本

(1) 啟動腳本:start-cluster.sh

#!/bin/bash
echo "================        開始啟動所有節點服務      ==========="
echo "================        正在啟動 Zookeeper      ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh start'
done

echo "================        正在啟動 HDFS           ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/start-dfs.sh'

echo "================        正在啟動 YARN           ==========="
ssh atguigu@hadoop103 '/opt/module/hadoop-2.7.2/sbin/start-yarn.sh'

echo "================    hadoop102 節點正在啟動 JobHistoryServer   ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh start historyserver'           

複制

(2) 停止腳本:stop-cluster.sh

#!/bin/bash
echo "================        開始停止所有節點服務      ==========="
echo "================    hadoop102 節點正在停止 JobHistoryServer ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh stop historyserver'

echo "================        正在停止 YARN           ==========="
ssh atguigu@hadoop103 '/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh'

echo "================        正在停止 HDFS           ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh'

echo "================        正在停止 Zookeeper      ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop'
done           

複制

(3) 檢視程序腳本:util.sh

#!/bin/bash
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    echo "================      $i 的所有程序       ==========="
    ssh $i '/opt/module/jdk1.8.0_144/bin/jps'
done           

複制

尖叫提示

:腳本學會之後,如果後續再有新的節點需要添加到群起任務中,可以自行解決之。

尖叫提示

:啟動與停止注意腳本的執行順序,而且停止腳本的停止過程應該是啟動過程的倒序。

zookeeper 叢集群起腳本:

[atguigu@hadoop102 ~]$ zkstart.sh           

複制

(1) 啟動腳本:zkstart.sh

#!/bin/bash
echo "==========  正在啟動 zookeeper 叢集  =========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh start'
done           

複制

(2) 停止腳本:zkstop.sh

#!/bin/bash
echo "==========  正在停止 zookeeper 叢集  =========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop'
done           

複制

(3) 狀态腳本:zkstatus.sh

#!/bin/bash
echo "==========  正在檢視 zookeeper 叢集狀态  =========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh status'
done           

複制

kafka 叢集腳本:

[atguigu@hadoop102 ~]$ kafka-start.sh           

複制

(1) 啟動腳本:kafka-start.sh

#!/bin/bash
echo "================        正在啟動 Kafka 叢集       ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
done           

複制

(2) 停止腳本:kafka-stop.sh

#!/bin/bash
echo "================        正在停止 Kafka 叢集       ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/kafka/bin/kafka-server-stop.sh -daemon'
done           

複制

建立 kafka 主題:traffic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 1 --partitions 3 --topic traffic           

複制

删除 kafka 主題:traffic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181 --topic traffic           

複制

檢查 kafka 的 traffic 主題是否正常:

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --list --zookeeper hadoop102:2181           

複制

3.1.3 測試

将資料發送至 kafka 并使用 kafka console-consumer 進行檢測,持續運作若幹分鐘後,檢視資料是否穩定輸入輸出。

啟動 kafka 控制台消費者:

// kafka-console-consumer
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic trafffic           

複制

kafka 控制台消費者消費資料如下圖所示;

大資料技術之_26_交通狀态預測項目_01

3.2 資料采集

  我們将實時模拟出來的資料,放置于 redis 中。

3.2.1 編寫代碼

思路:

  a) 建立工程:tf_consumer

  b) 配置 maven 依賴并添加 scala 架構的支援。

  c) 配置 redis 并測試。

  d) 将剛才 kafka.properties 以及 PropertyUtil 拷貝過來,并進行相應的修改。

  e) 編寫 redis 操作工具類:RedisUtil

  f) 讀取 kafka 中的資料,實時儲存到 redis 中,并且按照分鐘和監測點聚合車速和車輛個數。

1) 建立工程:tf_consumer

2) 配置 maven 的 pom.xml 檔案以及 kafka.properties:

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>           

複制

3) 修改 kafka.properties 配置檔案(消費者):

# 設定 kafka 的 brokerlist
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 消費者反序列化
key.deserializer=org.apache.kafka.common.serialization.StringDeSerializer
value.deserializer=org.apache.kafka.common.serialization.StrinDegSerializer

acks=all
retries=0

# Kafka 老版本中的中繼資料服務清單
metadata.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 設定消費者所屬的消費組
group.id=g_traffic1

# 設定是否自動确認 offset
enable.auto.commit=true

# 設定自動确認 offset 的時間間隔
auto.commit.interval.ms=30000

# 設定本次消費的主題
kafka.topics=traffic

# 設定 zookeeper 中 follower 和 leader 之間的關于 kafka 的資訊同步時間間隔
zookeeper.sync.time.ms=250
num.io.threads=12
batch.size=65536
buffer.memory=524288

# kafka 中消息儲存的時間(機關是小時),企業開發中是 7 天
log.retention.hours=2           

複制

3) 配置 Redis(單節點)環境并測試

// 通過 wget 下載下傳 Redis 的源碼
[atguigu@hadoop102 software]$ wget http://download.redis.io/releases/redis-4.0.2.tar.gz

// 将源代碼解壓到指定目錄 /opt/module 下
[atguigu@hadoop102 software]$ tar -zxf redis-4.0.2.tar.gz -C /opt/module

// 進入 Redis 源代碼目錄,編譯安裝(因為 redis 是用 C 語言寫的)
[atguigu@hadoop102 module]$ cd redis-4.0.2/

// 安裝 GCC
[atguigu@hadoop102 module]$ sudo yum install gcc

// 編譯源代碼
[atguigu@hadoop102 redis-4.0.2]$ make MALLOC=libc

如果報錯
zmalloc.h:50:31: error: jemalloc/jemalloc.h: No such file or directory
zmalloc.h:55:2: error: #error "Newer version of jemalloc required"
make[1]: *** [adlist.o] Error 1
make[1]: Leaving directory `/opt/module/redis-4.0.2/src'
make: *** [all] Error 2
解決辦法是:
make MALLOC=libc

注意:Redis 并沒有自己實作記憶體池,沒有在标準的系統記憶體配置設定器上再加上自己的東西。
redis-2.4 以上自帶 jemalloc,你不需要加任何參數,通過 zmalloc.c 源碼中我們可以看到,Redis 在編譯時,會先判斷是否使用 tcmalloc,如果是,會用 tcmalloc 對應的函數替換掉标準的 libc 中的函數實作。其次會判斷 jemalloc 是否使用,最後如果都沒有使用才會用标準的 libc 中的記憶體管理函數。是以用 tcmalloc 優化請謹慎使用,這兩個配置設定器碎片率相差不大,建議用自帶 jemalloc。

如果要安裝 tcmalloc 可以這樣:
make USE_TCMALLOC=yes

// 編譯安裝(注意:要使用 root 使用者權限)
[atguigu@hadoop102 redis-4.0.2]$ sudo make install

// 建立配置檔案,放入指定的目錄
[atguigu@hadoop102 redis-4.0.2]$ sudo cp /opt/module/redis-4.0.2/redis.conf /opt/module/redis-4.0.2/myredis

// 修改配置檔案中以下内容(注意 redis 新版的 4.x 與 老版本 3.x 上配置的細微差别)
[atguigu@hadoop102 redis-4.0.2]$ sudo vim /opt/module/redis-4.0.2/myredis/redis.conf

bind 0.0.0.0                                            #69行       #綁定主機 IP,預設值為127.0.0.1,我們是跨機器運作,是以需要更改,表示任意機器叢集均可通路,實際開發是中不建議這樣改
daemonize yes                                           #136行      #是否以背景 daemon 方式運作,預設不是背景運作
pidfile /var/run/redis/redis_6379.pid                   #158行      #redis 的 PID 檔案路徑(可選)
logfile "/opt/module/redis-4.0.2/myredis/redis.log"     #171行      #定義 log 檔案位置,模式 log 資訊定向到 stdout,輸出到 /dev/null(可選)
dir "/opt/module/redis-4.0.2/myredis"                   #263行      #本地資料庫存放路徑,預設為./(可選)

// 編譯安裝預設存在在 /usr/local/bin 目錄下,如下
[atguigu@hadoop102 redis-4.0.2]$ cd /usr/local/bin/
[atguigu@hadoop102 bin]$ ll
總用量 9572
-rw-r--r-- 1 root root      83 5月   8 01:27 dump6379.rdb
-rw-r--r-- 1 root root      83 5月   8 01:27 dump6380.rdb
-rw-r--r-- 1 root root      83 5月   8 01:27 dump6381.rdb
lrwxrwxrwx 1 root root       6 4月  28 17:17 nc -> netcat
-rwxr-xr-x 1 root root  103479 4月  28 17:17 netcat
-rwxr-xr-x 1 root root  290454 5月  23 12:37 redis-benchmark
-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-check-aof
-rwxr-xr-x 1 root root   45443 5月   6 17:27 redis-check-dump
-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-check-rdb
-rwxr-xr-x 1 root root  419907 5月  23 12:37 redis-cli
lrwxrwxrwx 1 root root      12 5月  23 12:37 redis-sentinel -> redis-server
-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-server           

複制

在安裝完 Redis 之後,啟動 Redis

// 啟動 Redis 伺服器
[atguigu@hadoop102 redis-4.0.2]$ redis-server /opt/module/redis-4.0.2/myredis/redis.conf

// 連接配接 Redis 伺服器
[atguigu@hadoop102 redis-4.0.2]$ redis-cli -h 192.168.25.102 -p 6379
192.168.25.102:6379> set k1 123
OK
192.168.25.102:6379> get k1
"123"
192.168.25.102:6379> keys *
1) "k1"
2) "uid:2"
192.168.25.102:6379> lrange uid:2 0 -1      #檢視清單的某個範圍的資料
1) "150:5.0"
2) "144:3.0"
3) "110:4.0"
192.168.25.102:6379> lpush uid:1 3671:3.0 2968:1.0 2455:2.5     #存一組清單資料
192.168.25.102:6379> flushall       #清空所有資料
192.168.25.102:6379> select 1       #選擇資料庫

// 檢視 Redis 的啟動情況
[atguigu@hadoop102 redis-4.0.2]$ ps -ef | grep redis
atguigu    6033      1  0 13:08 ?        00:00:00 redis-server 0.0.0.0:6379                              
atguigu    6046   4336  0 13:12 pts/0    00:00:00 grep redis 

// 停止 Redis 伺服器
[atguigu@hadoop102 redis-4.0.2]$ redis-cli shutdown           

複制

4) 将剛才 kafka.properties 以及 PropertyUtil 拷貝過來,kafka.properties 需要進行相應的修改

5) 編寫 redis 操作工具類:RedisUtil.scala

package com.atguigu.utils

import redis.clients.jedis._

// 代碼寫在半生對象中,這些代碼會在類加載的時候,自動的進行初始化
object RedisUtil {
  // 配置 redis 基本連接配接參數
  val host = "192.168.25.102"
  val port = 6379
  val timeout = 30000

  val config = new JedisPoolConfig

  // 設定連接配接池允許最大的連接配接個數
  config.setMaxTotal(200)
  // 設定最大空閑連接配接數
  config.setMaxIdle(50)
  // 設定最小空閑連接配接數
  config.setMinIdle(8)

  // 設定連接配接時的最大等待的毫秒數
  config.setMaxWaitMillis(10000)
  // 設定在擷取連接配接時,檢查連接配接的有效性
  config.setTestOnBorrow(true)
  // 設定在釋放連接配接時,檢查連接配接的有效性
  config.setTestOnReturn(true)

  // 設定在連接配接空閑時,檢查連接配接的有效性
  config.setTestWhileIdle(true)

  // 設定兩次掃描之間的時間間隔毫秒數
  config.setTimeBetweenEvictionRunsMillis(30000)
  // 設定每次掃描的最多的對象數
  config.setNumTestsPerEvictionRun(10)
  // 設定逐出連接配接的最小時間間隔,預設是 1800000 毫秒 = 30 分鐘
  config.setMinEvictableIdleTimeMillis(60000)

  //  連接配接池
  lazy val pool = new JedisPool(config, host, port, timeout)

  // 釋放資源
  lazy val hook = new Thread{ // 鈎子函數:執行一些善後操作,正常退出
    override def run() = {
      pool.destroy()
    }
  }

  sys.addShutdownHook(hook.run())
}           

複制

6) 在 SparkConsumer.scala 中讀取 kafka 中的資料,實時儲存到 redis 中,并且按照分鐘和監測點聚合車速和車輛個數。用到 Spark Streaming 的時間視窗函數進行聚合。

package com.atguigu.consumer

import java.text.SimpleDateFormat
import java.util.Calendar

import com.alibaba.fastjson.{JSON, TypeReference}
import com.atguigu.utils.{PropertyUtil, RedisUtil}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 堵車預測:處理實時資料,消費資料到 redis
  */
object SparkConsumer {
  def main(args: Array[String]): Unit = {
    // 初始化 Spark
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // 設定檢查點目錄
    ssc.checkpoint("./ssc/checkpoint")

    // 配置 kafka 參數,使用的是 spark 為我們封裝的一套操作 kafka coonsumer 的工具包
    val kafkaParam = Map("metadata.broker.list" -> PropertyUtil.getProperty("metadata.broker.list"))

    // 配置 kafka 主題
    val topics = Set(PropertyUtil.getProperty("kafka.topics"))

    // 讀取 kafka 主題 中的每一個事件 event
    val kafkaLineDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
      .map(_._2) // 由于我們 event 中的鍵是 null,是以我們需要把值映射出來

    // 解析 json 字元串
    val event = kafkaLineDStream.map(line => { // {"monitor_id":"0001","speed":"038"}
      // 使用 fastjson 來解析目前事件中封裝的資料資訊,由于該 json 字元串不支援 Scala Map,是以需要先将 json 字元串解析為 Java Map
      val lineJavaMap = JSON.parseObject(line, new TypeReference[java.util.Map[String, String]]() {})
      // 将 Java Map 轉換成 Scala Map
      import scala.collection.JavaConverters._
      val lineScalaMap: collection.mutable.Map[String, String] = mapAsScalaMapConverter(lineJavaMap).asScala
      println(lineScalaMap) // Map[String, String] = ("monitor_id" -> "0001", "speed" -> "038")
      lineScalaMap
    })

    // 将每一條資料根據 monitor_id 聚合,聚合每一條資料中的 “車輛速度” 疊加
    // 例如:聚合好的資料形式:(monitor_id, (speed, 1))  ("0001", (038, 1))
    // 最終結果舉例:("0001", (1365, 30))
    val sumOfSpeedAndCount = event
      .map(e => (e.get("monitor_id").get, e.get("speed").get)) // ("0001", "038")、("0001", "048")、("0002", "015")
      .mapValues(s => (s.toInt, 1)) // ("0001", (038, 1))、("0001", (048, 1))、("0002", (015, 1))
      .reduceByKeyAndWindow( // reduce 表示從左邊開始執行将得到的結果傳回給第一個參數
      (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2),
      Seconds(60), // 滑動視窗大小 60 秒,誤差最大 59 秒,即上一分鐘的資料當成下一分鐘的資料來用了。
      Seconds(60)) // 滑動步長 60 秒,對我們實際模組化的影響忽略不計,因為:實際中,不可能1分鐘内就造成大量擁堵,或者堵車不可能1分鐘之内就緩解了!!!後面模組化的時候會進行線性濾波。

    // 定義 redis 資料庫中的資料庫索引 index
    val dbIndex = 1
    // 将采集到的資料,按照每分鐘放置于redis 中,将用于後邊的資料模組化
    sumOfSpeedAndCount.foreachRDD(rdd => {
      rdd
        .foreachPartition(partitionRecords => {
          partitionRecords
            .filter((tuple: (String, (Int, Int))) => tuple._2._1 > 0) // 過濾掉元組資料中的速度小于0的資料
            .foreach(pair => {
            // 開始取出這 60 秒的 windows 中所有的聚合資料進行封裝,準備存入 redis 資料庫
            val jedis = RedisUtil.pool.getResource

            val monitorId = pair._1
            val sumOfCarSpeed = pair._2._1
            val sumOfCarCount = pair._2._2

            // 模拟資料為實時流入
            // 兩種情況:
            // 1、資料生産時,會産生時間戳字段,流入到 kafka 的事件中
            // 2、資料消費時,資料消費的時間,就當做資料的生産時間(會有一些小小誤差),本業務選擇這種方式

            val dateSDF = new SimpleDateFormat("yyyyMMdd") // 用于 redis 中的 key
            val hourMinuteSDF = new SimpleDateFormat("HHmm") // 用于 redis 中的 fields

            val currentTime = Calendar.getInstance().getTime

            val dateTime = dateSDF.format(currentTime) // 20190528
            val hourMinuteTime = hourMinuteSDF.format(currentTime) // 1617

            // 選擇存入的資料庫
            jedis.select(dbIndex)
            jedis.hset(dateTime + "_" + monitorId, hourMinuteTime, sumOfCarSpeed + "_" + sumOfCarCount)

            println(dateTime + "_" + monitorId, hourMinuteTime, sumOfCarSpeed + "_" + sumOfCarCount)

            // RedisUtil.pool.returnResource(jedis) // 老的 API
            jedis.close() // 新的 API
          })
        })
    })

    // Spark 開始工作
    ssc.start()
    ssc.awaitTermination()
  }
}

// 複習 Scala 中 Map 的取值方式:

// 方式1-使用 map(key)
//   1、如果 key 存在,則傳回對應的值。
//   2、如果 key 不存在,則抛出異常 [java.util.NoSuchElementException]。
//   3、在 Java 中,如果 key 不存在則傳回 null。
// 方式2-使用 contains 方法檢查是否存在 key
//  使用 containts 先判斷再取值,可以防止異常,并加入相應的處理邏輯。
//   1、如果 key 存在,則傳回 true。
//   2、如果 key 不存在,則傳回 false。
// 方式3-使用 map.get(key).get 取值
//   1、如果 key 存在,則 map.get(key) 就會傳回 Some(值),然後 Some(值).get 就可以取出。
//   2、如果 key 不存在,則 map.get(key) 就會傳回 None。
// 方式4-使用 map.getOrElse(key, defaultvalue) 取值
//   底層是:def getOrElse[V1 >: V](key: K, default: => V1)
//   1、如果 key 存在,則傳回 key 對應的值。
//   2、如果 key 不存在,則傳回預設值。在 java 中底層有很多類似的操作。
// 如何選擇取值方式建議
//   如果我們确定 map 有這個 key,則應當使用 map(key),速度快。
//   如果我們不能确定 map 是否有 key,而且有不同的業務邏輯,使用 map.contains() 先判斷再加入邏輯。
//   如果隻是簡單的希望得到一個值,使用 map4.getOrElse("ip", "127.0.0.1")           

複制

3.2.2 測試

我們使用叢集的群起腳本:

開啟 zookeeper 叢集:

[atguigu@hadoop102 ~]$ zkstart.sh           

複制

開啟 kafka 叢集:

[atguigu@hadoop102 ~]$ kafka-start.sh           

複制

開啟 redis,在 redis 根目錄執行:

// 啟動 Redis 伺服器
[atguigu@hadoop102 redis-4.0.2]$ redis-server /opt/module/redis-4.0.2/myredis/redis.conf           

複制

運作資料生産

運作資料消費

檢視運作結果:

在 redis 根目錄中,舉個例子依次執行:

[atguigu@hadoop102 redis-4.0.2]$ redis-cli -h 192.168.25.102 -p 6379
192.168.25.102:6379> select 1
OK
192.168.25.102:6379[1]> keys *
 1) "20190528_0014"
 2) "20190528_0005"
 3) "20190528_0019"
 4) "20190528_0009"
 5) "20190528_0004"
 6) "20190528_0013"
 7) "20190528_0016"
 8) "20190528_0020"
 9) "20190528_0015"
10) "20190528_0010"
11) "20190528_0018"
12) "20190528_0008"
13) "20190528_0001"
14) "20190528_0003"
15) "20190528_0007"
16) "20190528_0012"
17) "20190528_0002"
18) "20190528_0011"
19) "20190528_0017"
20) "20190528_0006"
192.168.25.102:6379[1]> hgetall 20190528_0001
 1) "1646"
 2) "279_7"
 3) "1647"
 4) "239_6"
 5) "1648"
 6) "240_5"
 7) "1649"
 8) "318_7"
 9) "1650"
10) "184_6"
11) "1651"
12) "54_8"
13) "1652"
14) "81_10"
15) "1653"
16) "69_9"
17) "1654"
18) "69_9"
19) "1655"
20) "57_8"
21) "1656"
22) "262_6"
23) "1657"
24) "149_3"
25) "1659"
26) "168_4"
27) "1700"
28) "134_4"
29) "1701"
30) "65_8"
31) "1702"
32) "81_10"           

複制

注意

:不要直接複制,每次操作有些内容是有變動的。比如時間相關的,比如 IP 相關的。

小結:

堵車内容回顧:
一、資料生産
    目的:能夠讓我們清楚資料結構是什麼樣子的,實際開發中這部分不是我們做;實際開發中:已有資料結構,已有目标,要做的就是目前手中已有的資料如何實作目标
    資料結構:卡口id,車速(沒有包含資料生産時的時間戳)
    堵車狀态的轉換邏輯(if else),為的是生産的資料盡可能的貼近現實情況

二、資料消費
    kafka(進階 API,spark 提供的工具包) --> redis
    時間視窗的大小為 60 秒
    時間視窗的滑動步長為 60 秒
    資料存儲在 redis 中,使用的是資料類型是 Hash(即 Map 類型):KV 模式不變,但是 V 也是一個鍵值對
        key : 20190528_0001
        field : 1754
        value : 1365_30

天貓雙十一(使用 Storm + Flink 實作)
1、如果我們使用 SparkStreaming 實作,時間視窗的寬度不能設定太大,可能會出現記憶體溢出。
2、5秒内聚合的資料該如何處理呢?答:儲存到 redis 中(即落盤)。
3、那麼下一個時間視窗的新的資料該如何處理呢?答:先将 redis 中前一個 5 秒的資料讀出來,然後和這次的 5 秒資料進行相加後,再放回到 redis 中(即落盤)。小結:所有的流式架構都是這樣做的。

流式架構的根本的哲學意義是:僅僅進行中間邏輯,即是進行運算(計算)的,不負責資料存儲的。
如果在記憶體中想進行長期的資料累加,就相當于一個不斷微分再積分的過程,把時間微分到足夠細,細到不會導緻記憶體溢出為止,然後再微分的基礎上求和,再把所有的微分結果進行積分。

某一個小時間段内的資料量越小,則時間視窗的寬度就可以設定的越大,那麼資料展示的延遲就會變長,但是整體資料處理的效率就會變得越高。--> 不像流式處理了!           

複制

3.3 資料模組化

  在此我們選擇通過有監督學習中的手段建立可以預測下一時刻堵車狀态的模型。

拟牛頓圖解:

大資料技術之_26_交通狀态預測項目_01

線性濾波圖解:

大資料技術之_26_交通狀态預測項目_01

目标卡口與相關卡口關系:

大資料技術之_26_交通狀态預測項目_01

模型圖解:

大資料技術之_26_交通狀态預測項目_01

3.3.1 編寫代碼

思路:

  a) 确定要對哪個監測點進行模組化,我們稱之為目标監測點。

  b) 找到目标監測點的其他相關監測點(比如相關監測點與目标監測點屬于一條公路的)。

  c) 從 redis 中通路得到以上所有監測點若幹小時内的曆史資料資訊(一部分作為訓練資料,一部分作為測試資料)。

  d) 提取組裝特征向量與目标向量,訓練參數集,訓練模型。

  e) 測試模型吻合度,将符合吻合度的模型儲存到 HDFS 中,同時将模型的儲存路徑放置于 redis 中。

1) 建立 module:tf_modeling

2) 編寫 maven 的 pom.xml 檔案,添加 scala 架構支援

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_modeling</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>           

複制

3) 建立 Train.scala 實作上述思路:

package com.atguigu.train

import java.io.{File, PrintWriter}
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.atguigu.utils.RedisUtil
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * 堵車預測:模組化,不同的卡口不同的模型(函數)
  */
object Train {
  def main(args: Array[String]): Unit = {
    // 寫入檔案的輸出流,将本次評估結果儲存到下面這個檔案中
    val writer = new PrintWriter(new File("model_train.txt"))

    // 初始化 Spark
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficTrainModel")
    val sc = new SparkContext(sparkConf)

    // 定義 redis 的資料庫相關參數
    val dbIndex = 1
    // 擷取 redis 連接配接
    val jedis = RedisUtil.pool.getResource
    jedis.select(dbIndex)

    // 設定 目标監測點:你要對哪幾個監測點進行模組化(本例中對 2 個檢測點進行模組化)
    val targetMonitorIDs = List("0005", "0015")
    // 取出 目标監測點的相關監測點:算法工程師告訴我們的(本例中我們随意寫幾個)
    val relationMonitors = Map[String, Array[String]](
      "0005" -> Array("0003", "0004", "0005", "0006", "0007"),
      "0015" -> Array("0013", "0014", "0015", "0016", "0017")
    )

    // 通路 redis 取出 目标監測點的相關監測點 的資料

    // 周遊 目标監測點的相關監測點 的 Map 集合
    targetMonitorIDs.map(targetMonitorID => { // 這個 map 執行 2 次
      // 初始化時間
      // 擷取目前時間
      val currentDate = Calendar.getInstance().getTime

      // 格式化 目前時間 為 年月日 對象
      val dateSDF = new SimpleDateFormat("yyyyMMdd")
      // 格式化 目前時間 為 小時分鐘數 對象
      val hourMinuteSDF = new SimpleDateFormat("HHmm")

      // 格式化目前時間
      val dateSDFString = dateSDF.format(currentDate) // 20190528

      // 擷取 目前目标監測點的相關監測點
      val relationMonitorArray = relationMonitors(targetMonitorID)
      // 根據 目前目标監測點的相關監測點,取出目前時間的所有相關監測點的平均車速
      val relationMonitorInfo = relationMonitorArray.map(relationMonitorID => { // 這個 map 執行 5 次
        (relationMonitorID, jedis.hgetAll(dateSDFString + "_" + relationMonitorID))
        // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})
      })

      // 建立 3 個數組:因為要使用 拟牛頓法(LBFGS)進行模組化,該方法需要
      // 第一個數組放 特征因子資料集,
      // 第二個數組放 label 标簽向量(特征因子對應的結果資料集),
      // 第三個數組放 前兩者之間的關聯(即真正的特征向量)
      val dataX = ArrayBuffer[Double]() // 實際的每一分鐘的平均車速
      val dataY = ArrayBuffer[Double]() // 第 4 分鐘的平均車速

      // 用于存放 特征因子資料集 和 特征因子對應的結果資料集 的映射關系
      val dataTrain = ArrayBuffer[LabeledPoint]()

      // 确定使用多少時間内的資料進行模組化(本例中取 1 小時)
      val hours = 1

      // 将時間回退到目前時間的 1 小時之前,時間機關:分鐘
      // 周遊 目标監測點的資料(外循環)
      for (i <- Range(60 * hours, 2, -1)) { // 本例中是 60 到 2(不包括2),步長是 -1,即 60, 59, 58, ..., 5, 4,
        dataX.clear()
        dataY.clear()

        // 周遊 目标監測點的所有相關監測點 的資料(内循環)
        for (index <- 0 to 2) {
          // 目前for循環 的時間 = 目前時間的毫秒數 - 1 個小時的毫秒數 + 0分鐘的毫秒數,1分鐘的毫秒數,2分鐘的毫秒數  (第3分鐘作為監督學習的結果向量--label 向量)
          val oneMoment = currentDate.getTime - 60 * i * 1000 + 60 * index * 1000
          // 擷取 目前for循環 的時間的小時分鐘數
          val oneHM = hourMinuteSDF.format(new Date(oneMoment))

          // 擷取目前小時分鐘數的資料
          for ((k, v) <- relationMonitorInfo) { // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})

            // hours 個小時前的後 3 分鐘的資料,組裝到 dataX 中
            if (v.containsKey(oneHM)) { // 判斷本次時刻的資料是否存在,如果存在,則取值,否則,則取 -1(表示資料缺失)
              val speedAndCarCount = v.get(oneHM).split("_")
              val valueX = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat // 得到目前這一分鐘的平均車速
              dataX += valueX
            } else {
              dataX += -59.0F
            }

            // 如果 index == 2,說明已經得到 hours 個小時前的後 3 分鐘的資料,并組裝到了 dataX 中;如果是目标卡口,則說明下一分鐘資料是 label 向量的資料,ze存放 dataY 中
            if (index == 2 && targetMonitorID == k) {
              val nextMoment = oneMoment + 60 * 1000
              val nextHM = hourMinuteSDF.format(new Date(nextMoment))
              if (v.containsKey(nextHM)) { // 判斷本次時刻的資料是否存在,如果存在,則取值,否則,則不管它(有預設值 0)
                val speedAndCarCount = v.get(nextHM).split("_")
                val valueY = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat // 得到第 4 分鐘的平均車速
                dataY += valueY
              }
            }

          }
        }

        // 準備訓練模型
        // 先将 dataX 和 dataY 映射到一個 LabeledPoint 對象中
        if (dataY.toArray.length == 1) { // 說明結果集中有資料了
          val label = dataY.toArray.head
          val record = LabeledPoint(
            // 因為使用的是 拟牛頓法(LBFGS) 進行模組化,該方法需要 特征結果 有幾種情況(不能是無窮種情況)
            // label 範圍為 0~6(7個類别),越大則道路越通暢
            if (label / 10 < 6) (label / 10).toInt else 6, Vectors.dense(dataX.toArray)
          )
          dataTrain += record
        }
      }

      // 将特征資料集寫入到檔案中友善檢視,至此,我們的特征資料集已經封裝完畢
      dataTrain.foreach(record => {
        println(record)
        writer.write(record.toString() + "\r\n")
      })

      // 将特征資料集轉為 rdd 資料集
      val rddData = sc.parallelize(dataTrain)
      // 随機封裝訓練集和測試集
      val randomSplits = rddData.randomSplit(Array(0.6, 0.4), 11L)
      val trainData = randomSplits(0)
      val testData = randomSplits(1)

      if (!rddData.isEmpty()) {
        // 使用訓練資料集進行訓練模型
        val model = new LogisticRegressionWithLBFGS().setNumClasses(7).run(trainData)

        // 使用測試資料集測試訓練好的模型
        val predictAndLabel = testData.map {
          case LabeledPoint(label, feature) =>
            val predict = model.predict(feature)
            (predict, label)
        }

        // 得到目前 目标監測點 的評估值
        val metrics = new MulticlassMetrics(predictAndLabel)
        val accuracy = metrics.accuracy
        println("評估值:" + accuracy)
        writer.write(accuracy.toString + "\r\n")

        // 設定評估門檻值,評估值範圍為[0.0, 1.0],越大 model 越優秀,我們儲存評估值大于 0 的評估模型
        if (accuracy > 0.6) {
          // 将模型儲存到 hdfs 中,并将模型路徑儲存到 redis 中
          val hdfsPath = "hdfs://hadoop102:9000/traffic/model/" + targetMonitorID + "_" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date
          (currentDate.getTime))
          model.save(sc, hdfsPath)

          jedis.hset("model", targetMonitorID, hdfsPath)
        }
      }
    })

    // 釋放 redis 連接配接
    // RedisUtil.pool.returnResource(jedis) // 老的 API
    jedis.close() // 新的 API
    writer.close()
  }
}           

複制

3.3.2 測試

  運作資料模拟與資料采集,等待一會之後,開始進行預測,檢視 http://hadoop102:50070 中是否産生對應的模型樣本。同時檢視 redis 中是否有儲存訓練好的模型存放路徑。

3.4 資料預測

3.4.1 編寫代碼

思路:

  a) 使用者傳入想要進行預測的時間節點,讀取該時間節點之前 3 分鐘,2 分鐘和 1 分鐘的資料。

  b) 此時應該已經得到了曆史資料集,通過該曆史資料集預測傳入時間點的車流狀态。

  尖叫提示:為了友善觀察測試,建議傳一個曆史時間點,這樣可以很直覺的看到預測結果是否符合期望值。

1) 建立 module:tf_prediction

2) 配置 maven 的 pom.xml 檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_prediction</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>           

複制

3) 建立 Prediction.scala 檔案,實作上述思路

package com.atguigu.predict

import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.utils.RedisUtil
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * 堵車預測:根據訓練出來的模型進行堵車預測
  */
object Prediction {
  def main(args: Array[String]): Unit = {
    // 初始化 Spark
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficPrediction")
    val sc = new SparkContext(sparkConf)

    // 時間設定:為了拼湊出 redis 中的 key 和 field 的字段

    // 格式化 時間 為 年月日 對象
    val dateSDF = new SimpleDateFormat("yyyyMMdd")
    // 格式化 時間 為 小時分鐘數 對象
    val hourMinuteSDF = new SimpleDateFormat("HHmm")

    // 2019-05-29 13:00
    val userSDF = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    // 定義使用者傳入的日期:想要預測是否堵車的日期
    val inputDateString = "2019-05-29 10:29"
    val inputDate = userSDF.parse(inputDateString)

    // 得到 redis 中的 key
    val dateSDFString = dateSDF.format(inputDate) // 20180529

    val dbIndex = 1
    val jedis = RedisUtil.pool.getResource
    jedis.select(dbIndex)

    // 想要預測的監測點
    // 設定 目标監測點:你要對哪幾個監測點進行模組化(本例中對 2 個檢測點進行模組化)
    val targetMonitorIDs = List("0005", "0015")
    // 取出 目标監測點的相關監測點:算法工程師告訴我們的(本例中我們随意寫幾個)
    val relationMonitors = Map[String, Array[String]](
      "0005" -> Array("0003", "0004", "0005", "0006", "0007"),
      "0015" -> Array("0013", "0014", "0015", "0016", "0017")
    )

    // 周遊 目标監測點的相關監測點 的 Map 集合
    targetMonitorIDs.map(targetMonitorID => { // 這個 map 執行 2 次
      // 擷取 目前目标監測點的相關監測點
      val relationMonitorArray = relationMonitors(targetMonitorID)
      // 根據 目前目标監測點的相關監測點,取出目前時間的所有相關監測點的平均車速
      val relationMonitorInfo = relationMonitorArray.map(relationMonitorID => { // 這個 map 執行 5 次
        (relationMonitorID, jedis.hgetAll(dateSDFString + "_" + relationMonitorID))
        // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})
      })

      // 裝載目标時間點之前的 3 分鐘的曆史資料
      val dataX = ArrayBuffer[Double]() // 實際的每一分鐘的平均車速

      // 組裝資料
      for (index <- Range(3, 0, -1)) {
        val oneMoment = inputDate.getTime - 60 * index * 1000
        val oneHM = hourMinuteSDF.format(new Date(oneMoment)) // 1257

        for ((k, v) <- relationMonitorInfo) {
          if (v.containsKey(oneHM)) {
            val speedAndCarCount = v.get(oneHM).split("_")
            val valueX = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat
            dataX += valueX
          } else {
            dataX += -59.0F
          }
        }
      }

      // 加載模型
      val modelPath = jedis.hget("model", targetMonitorID)
      val model = LogisticRegressionModel.load(sc, modelPath)

      // 進行預測
      val predict = model.predict(Vectors.dense(dataX.toArray))

      // 列印展示
      println(targetMonitorID + ",堵車評估值:" + predict + ",是否通暢:" + (if (predict > 3) "通暢" else "擁堵"))

      // 結果儲存
      jedis.hset(inputDateString, targetMonitorID, predict.toString)
    })

    // 釋放 redis 連接配接
    // RedisUtil.pool.returnResource(jedis) // 老的 API
    jedis.close() // 新的 API
  }
}           

複制

3.4.2 測試

  預測任務執行完畢後,進入redis,通過檢視對應監測點,對應傳入時間節點的具體車速值,來驗證預測結果是否正确。

四 項目總結

  與該項目類似的需求還有很多很多,涵蓋了生活各個方面。不同的業務,不同的邏輯,不同的思路,不同的數學模型,需要具體情況具體分析。有類似的、但不完全一樣的需求,就需要多思考,靈活處理了。