天天看点

pyspark中ip地理位置统计案例代码实现

ip地理位置统计案例代码实现

案例分析:

一、 ip地理位置统计案例思路

  1. 加载城市ip段信息,获取ip起始数字和结束数字,经度,纬度
  2. 加载日志数据,获取ip信息,然后转换为数字,和ip段比较
  3. 比较的时候采用二分法查找,找到对应的经度和纬度
  4. 对相同的经度和纬度做累计求和

数据形式

  1. 日志访问信息,对应:去敏感用户ID,IP地址
    pyspark中ip地理位置统计案例代码实现
  2. IP收录信息,对应IP起始和结束范围、IP经纬坐标
    pyspark中ip地理位置统计案例代码实现

代码实现

  1. 环境变量及pyspark的导入
import os
from pyspark import SparkContext
import time
from pyspark.sql import SparkSession
JAVA_HOME='/root/bigdata/jdk'
#向系统环境变量中添加 JAVA_HOME路径
os.environ['JAVA_HOME']=JAVA_HOME
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
# PYSPARK使用Python位置
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# PYSPARK驱动使用Python的位置
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
           
  1. 定义一个能将用户访问的IP地址转换为对应10进制的方法
# 255.255.255.255 0~255 256  2^8 8位2进制数 32位2进制数
#将ip转换为特殊的数字形式  223.243.0.0|223.243.191.255|  255 2^8
#‭11011111‬
#00000000
#1101111100000000
#‭        11110011‬
#11011111111100110000000000000000
def ip_transform(ip):     
        ips = ip.split(".")#[223,243,0,0] 32位二进制数
        ip_num = 0
        for i in ips:
                ip_num = int(i) | ip_num << 8
        return ip_num
           
  1. 定义一个能进行二分查找的方法,查找对应用户访问IP地址对应经纬度的范围
#二分法查找ip对应的行的索引
def binary_search(ip_num, broadcast_value):
        start = 0
        end = len(broadcast_value) - 1
        while (start <= end):
                mid = int((start + end) / 2)
                if ip_num >= int(broadcast_value[mid][0]) and ip_num <= int(broadcast_value[mid][1]):
                        return mid
                if ip_num < int(broadcast_value[mid][0]):
                        end = mid
                if ip_num > int(broadcast_value[mid][1]):
                        start = mid

           
  1. 定义一个主方法,负责启动数据处理
def main():
        spark = SparkSession.builder.appName("test").getOrCreate()
        sc = spark.sparkContext
        city_id_rdd = sc.textFile("file:///root/tmp/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
        #创建一个广播变量
        city_broadcast = sc.broadcast(city_id_rdd.collect())
        dest_data = sc.textFile("file:///root/tmp/data/20090121000132.394251.http.format").map(
        lambda x: x.split("|")[1])
        #根据取出对应的位置信息
        def get_pos(x):
                city_broadcast_value = city_broadcast.value
                #根据单个ip获取对应经纬度信息
                def get_result(ip):
                        ip_num = ip_transform(ip)
                        index = binary_search(ip_num, city_broadcast_value)
                        #((纬度,精度),1)
                        return ((city_broadcast_value[index][2], city_broadcast_value[index][3]), 1)

                x = map(tuple,[get_result(ip) for ip in x])
                # [get_result(ip) for ip in x] => [((1x,1o), 1), ((1x,1o), 1)]
                # (((1x,1o), 1), ((2x,2o), 1), ...)
                # x = [((1x,1o), 1), ((2x,2o), 1), ...]
                return x

        dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((纬度,精度),1)
        result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
        print(result_rdd.collect())
        sc.stop()

           
  1. 启动数据处理
if __name__ == '__main__':
        main()
           

总结

一. 代码实现思路:

  1. 提取数据
city_id_rdd = sc.textFile("file:///root/tmp/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
           
  1. 定义数据处理的方法
def get_pos(x):
 def get_result(ip):
           
  1. 将数据丢给对应的方法
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((纬度,精度),1)
           
  1. 统计数据
result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
           
  1. 输出结果
print(result_rdd.collect())
           

二. 注意

  1. 使用广播变量,防止内存溢出
city_broadcast = sc.broadcast(city_id_rdd.collect())
           
  1. 使用数据分块处理,加快数据处理能力
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((纬度,精度),1)
           

继续阅读