天天看點

pyspark中ip地理位置統計案例代碼實作

ip地理位置統計案例代碼實作

案例分析:

一、 ip地理位置統計案例思路

  1. 加載城市ip段資訊,擷取ip起始數字和結束數字,經度,緯度
  2. 加載日志資料,擷取ip資訊,然後轉換為數字,和ip段比較
  3. 比較的時候采用二分法查找,找到對應的經度和緯度
  4. 對相同的經度和緯度做累計求和

資料形式

  1. 日志通路資訊,對應:去敏感使用者ID,IP位址
    pyspark中ip地理位置統計案例代碼實作
  2. IP收錄資訊,對應IP起始和結束範圍、IP經緯坐标
    pyspark中ip地理位置統計案例代碼實作

代碼實作

  1. 環境變量及pyspark的導入
import os
from pyspark import SparkContext
import time
from pyspark.sql import SparkSession
JAVA_HOME='/root/bigdata/jdk'
#向系統環境變量中添加 JAVA_HOME路徑
os.environ['JAVA_HOME']=JAVA_HOME
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
# PYSPARK使用Python位置
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# PYSPARK驅動使用Python的位置
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
           
  1. 定義一個能将使用者通路的IP位址轉換為對應10進制的方法
# 255.255.255.255 0~255 256  2^8 8位2進制數 32位2進制數
#将ip轉換為特殊的數字形式  223.243.0.0|223.243.191.255|  255 2^8
#‭11011111‬
#00000000
#1101111100000000
#‭        11110011‬
#11011111111100110000000000000000
def ip_transform(ip):     
        ips = ip.split(".")#[223,243,0,0] 32位二進制數
        ip_num = 0
        for i in ips:
                ip_num = int(i) | ip_num << 8
        return ip_num
           
  1. 定義一個能進行二分查找的方法,查找對應使用者通路IP位址對應經緯度的範圍
#二分法查找ip對應的行的索引
def binary_search(ip_num, broadcast_value):
        start = 0
        end = len(broadcast_value) - 1
        while (start <= end):
                mid = int((start + end) / 2)
                if ip_num >= int(broadcast_value[mid][0]) and ip_num <= int(broadcast_value[mid][1]):
                        return mid
                if ip_num < int(broadcast_value[mid][0]):
                        end = mid
                if ip_num > int(broadcast_value[mid][1]):
                        start = mid

           
  1. 定義一個主方法,負責啟動資料處理
def main():
        spark = SparkSession.builder.appName("test").getOrCreate()
        sc = spark.sparkContext
        city_id_rdd = sc.textFile("file:///root/tmp/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
        #建立一個廣播變量
        city_broadcast = sc.broadcast(city_id_rdd.collect())
        dest_data = sc.textFile("file:///root/tmp/data/20090121000132.394251.http.format").map(
        lambda x: x.split("|")[1])
        #根據取出對應的位置資訊
        def get_pos(x):
                city_broadcast_value = city_broadcast.value
                #根據單個ip擷取對應經緯度資訊
                def get_result(ip):
                        ip_num = ip_transform(ip)
                        index = binary_search(ip_num, city_broadcast_value)
                        #((緯度,精度),1)
                        return ((city_broadcast_value[index][2], city_broadcast_value[index][3]), 1)

                x = map(tuple,[get_result(ip) for ip in x])
                # [get_result(ip) for ip in x] => [((1x,1o), 1), ((1x,1o), 1)]
                # (((1x,1o), 1), ((2x,2o), 1), ...)
                # x = [((1x,1o), 1), ((2x,2o), 1), ...]
                return x

        dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((緯度,精度),1)
        result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
        print(result_rdd.collect())
        sc.stop()

           
  1. 啟動資料處理
if __name__ == '__main__':
        main()
           

總結

一. 代碼實作思路:

  1. 提取資料
city_id_rdd = sc.textFile("file:///root/tmp/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
           
  1. 定義資料處理的方法
def get_pos(x):
 def get_result(ip):
           
  1. 将資料丢給對應的方法
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((緯度,精度),1)
           
  1. 統計資料
result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
           
  1. 輸出結果
print(result_rdd.collect())
           

二. 注意

  1. 使用廣播變量,防止記憶體溢出
city_broadcast = sc.broadcast(city_id_rdd.collect())
           
  1. 使用資料分塊處理,加快資料處理能力
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((緯度,精度),1)
           

繼續閱讀