ip地理位置統計案例代碼實作
案例分析:
一、 ip地理位置統計案例思路
- 加載城市ip段資訊,擷取ip起始數字和結束數字,經度,緯度
- 加載日志資料,擷取ip資訊,然後轉換為數字,和ip段比較
- 比較的時候采用二分法查找,找到對應的經度和緯度
- 對相同的經度和緯度做累計求和
資料形式
- 日志通路資訊,對應:去敏感使用者ID,IP位址
- IP收錄資訊,對應IP起始和結束範圍、IP經緯坐标
代碼實作
- 環境變量及pyspark的導入
import os
from pyspark import SparkContext
import time
from pyspark.sql import SparkSession
JAVA_HOME='/root/bigdata/jdk'
#向系統環境變量中添加 JAVA_HOME路徑
os.environ['JAVA_HOME']=JAVA_HOME
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
# PYSPARK使用Python位置
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# PYSPARK驅動使用Python的位置
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
- 定義一個能将使用者通路的IP位址轉換為對應10進制的方法
# 255.255.255.255 0~255 256 2^8 8位2進制數 32位2進制數
#将ip轉換為特殊的數字形式 223.243.0.0|223.243.191.255| 255 2^8
#11011111
#00000000
#1101111100000000
# 11110011
#11011111111100110000000000000000
def ip_transform(ip):
ips = ip.split(".")#[223,243,0,0] 32位二進制數
ip_num = 0
for i in ips:
ip_num = int(i) | ip_num << 8
return ip_num
- 定義一個能進行二分查找的方法,查找對應使用者通路IP位址對應經緯度的範圍
#二分法查找ip對應的行的索引
def binary_search(ip_num, broadcast_value):
start = 0
end = len(broadcast_value) - 1
while (start <= end):
mid = int((start + end) / 2)
if ip_num >= int(broadcast_value[mid][0]) and ip_num <= int(broadcast_value[mid][1]):
return mid
if ip_num < int(broadcast_value[mid][0]):
end = mid
if ip_num > int(broadcast_value[mid][1]):
start = mid
- 定義一個主方法,負責啟動資料處理
def main():
spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext
city_id_rdd = sc.textFile("file:///root/tmp/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
#建立一個廣播變量
city_broadcast = sc.broadcast(city_id_rdd.collect())
dest_data = sc.textFile("file:///root/tmp/data/20090121000132.394251.http.format").map(
lambda x: x.split("|")[1])
#根據取出對應的位置資訊
def get_pos(x):
city_broadcast_value = city_broadcast.value
#根據單個ip擷取對應經緯度資訊
def get_result(ip):
ip_num = ip_transform(ip)
index = binary_search(ip_num, city_broadcast_value)
#((緯度,精度),1)
return ((city_broadcast_value[index][2], city_broadcast_value[index][3]), 1)
x = map(tuple,[get_result(ip) for ip in x])
# [get_result(ip) for ip in x] => [((1x,1o), 1), ((1x,1o), 1)]
# (((1x,1o), 1), ((2x,2o), 1), ...)
# x = [((1x,1o), 1), ((2x,2o), 1), ...]
return x
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((緯度,精度),1)
result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())
sc.stop()
- 啟動資料處理
if __name__ == '__main__':
main()
總結
一. 代碼實作思路:
- 提取資料
city_id_rdd = sc.textFile("file:///root/tmp/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
- 定義資料處理的方法
def get_pos(x):
def get_result(ip):
- 将資料丢給對應的方法
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((緯度,精度),1)
- 統計資料
result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
- 輸出結果
print(result_rdd.collect())
二. 注意
- 使用廣播變量,防止記憶體溢出
city_broadcast = sc.broadcast(city_id_rdd.collect())
- 使用資料分塊處理,加快資料處理能力
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((緯度,精度),1)