天天看点

20200625大数据spark学习笔记

启动

src/redis-server redis.conf 

src/redis-cli -h hadoop000 -p 6379

 keys * 查询里面的key

 HBase特点

 大:

 面向列:列族(可以存放很多列),列族/列独立索引

 稀疏:对于空的列,不会占用存储空间

 数据类型单一:btye/string

 无模式:每一行的数据所对应的列不一定相同,每行的列是可以动态添加的

 数据多版本:比如company可以存放不同的版本的值

     默认情况下版本号是自动分配的,是列的值插入时的时间戳

 启动hbase

 cd /bin

 ./hbase shell

 create 'member','member_id','address','info'    

 插入数据 put 表明, rowkey, cf :column,value

 put 'member','pk','info:age','28'

 put 'member','pk','info:birthday','1990-05-05'

 put 'member','pk','info:company','imooc'

get 'member','pk','info'

put 'member','pk','info:age','18'  //改年纪

 scan 'member'

HBase 的api操作

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.*;

import org.apache.hadoop.hbase.client.*;

import org.apache.hadoop.hbase.filter.*;

import org.apache.hadoop.hbase.util.Bytes;

import org.junit.After;

import org.junit.Assert;

import org.junit.Before;

import org.junit.Test;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

public class HBaseApp {

    Connection connection = null;

    Table table = null;

    Admin admin = null;

    String tableName = "pk_hbase_java_api";

    @Before

    public void setUp() {

        Configuration configuration = new Configuration();

        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");

        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");

        try {

            connection = ConnectionFactory.createConnection(configuration);

            admin = connection.getAdmin();

            Assert.assertNotNull(connection);

            Assert.assertNotNull(admin);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    @Test

    public void getConnection() {

    }

    @Test

    public void createTable() throws Exception {

        TableName table = TableName.valueOf(tableName);

        if (admin.tableExists(table)) {

            System.out.println(tableName + " 已经存在...");

        } else {

            HTableDescriptor descriptor = new HTableDescriptor(table);

            descriptor.addFamily(new HColumnDescriptor("info"));

            descriptor.addFamily(new HColumnDescriptor("address"));

            admin.createTable(descriptor);

            System.out.println(tableName + " 创建成功...");

        }

    }

    @Test

    public void queryTableInfos() throws Exception {

        HTableDescriptor[] tables = admin.listTables();

        if (tables.length > 0) {

            for (HTableDescriptor table : tables) {

                System.out.println(table.getNameAsString());

                HColumnDescriptor[] columnDescriptors = table.getColumnFamilies();

                for (HColumnDescriptor hColumnDescriptor : columnDescriptors) {

                    System.out.println("\t" + hColumnDescriptor.getNameAsString());

                }

            }

        }

    }

    @Test

    public void testPut() throws Exception {

        table = connection.getTable(TableName.valueOf(tableName));

//        Put put = new Put(Bytes.toBytes("pk"));

//

//        // 通过PUT设置要添加数据的cf、qualifier、value

//        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("28"));

//        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("xxxx"));

//        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("imooc"));

//

//        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN"));

//        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("province"), Bytes.toBytes("GUANGDONG"));

//        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("shenzhen"));

//

//

//        // 将数据put到HBase中去

//        table.put(put);

        List<Put> puts = new ArrayList<>();

        Put put1 = new Put(Bytes.toBytes("jepson"));

        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));

        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("xxxx"));

        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("apple"));

        put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN"));

        put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("province"), Bytes.toBytes("SHANGHAI"));

        put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("SHANGHAI"));

        Put put2 = new Put(Bytes.toBytes("xingxing"));

        put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("19"));

        put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("xxxx"));

        put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("PDD"));

        put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN"));

        put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("province"), Bytes.toBytes("SHANGHAI"));

        put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("SHANGHAI"));

        puts.add(put1);

        puts.add(put2);

        table.put(puts);

    }

    @Test

    public void testUpdate() throws Exception {

        table = connection.getTable(TableName.valueOf(tableName));

        Put put = new Put(Bytes.toBytes("xingxing"));

        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"));

        table.put(put);

    }

    @Test

    public void testGet01() throws Exception {

        table = connection.getTable(TableName.valueOf(tableName));

//        table = connection.getTable(TableName.valueOf("access_20190130"));

//

//        Get get = new Get("20190130_1433814004".getBytes());

//        //get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));

//

//        Result result = table.get(get);

//        printResult(result);

        Get get = new Get("pk".getBytes());

        get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));

        Result result = table.get(get);

        printResult(result);

    }

    @Test

    public void testScan01() throws Exception {

        table = connection.getTable(TableName.valueOf(tableName));

        Scan scan = new Scan();

//        scan.addFamily(Bytes.toBytes("info"));

        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"));

        //Scan scan = new Scan(Bytes.toBytes("jepson")); // >=

        //Scan scan = new Scan(new Get(Bytes.toBytes("jepson")));

        //Scan scan = new Scan(Bytes.toBytes("jepson"),Bytes.toBytes("xingxing")); // [)

        ResultScanner rs = table.getScanner(scan);

//        ResultScanner rs = table.getScanner(Bytes.toBytes("info"), Bytes.toBytes("company"));

        for(Result result : rs){

            printResult(result);

            System.out.println("~~~~~~~~~~~~~");

        }

    }

    @Test

    public void testFilter() throws Exception {

        table = connection.getTable(TableName.valueOf(tableName));

        Scan scan = new Scan();

//        String reg = "^*ing";

//        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(reg));

//        scan.setFilter(filter);

//        Filter filter = new PrefixFilter(Bytes.toBytes("p"));

//        scan.setFilter(filter);

        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ONE);

        Filter filter1 = new PrefixFilter("p".getBytes());

        Filter filter2 = new PrefixFilter("j".getBytes());

        filters.addFilter(filter1);

        filters.addFilter(filter2);

        scan.setFilter(filters);

        ResultScanner rs = table.getScanner(scan);

        for(Result result : rs){

            printResult(result);

            System.out.println("~~~~~~~~~~~~~");

        }

    }

    private void printResult(Result result) {

        for (Cell cell : result.rawCells()) {

            System.out.println(Bytes.toString(result.getRow()) + "\t "

                    + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"

                    + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"

                    + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"

                    + cell.getTimestamp());

        }

    }

    @After

    public void tearDown() {

        try {

            connection.close();

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

}

项目背景

    离线+实时  基于Spark(RDD/SQL/Streaming)

    基于慕课网的访问日志进行统计分析

        访问日志:离线  HBase

            点击流日志:

            搜索:关键字

        订单数据日志:实时  Redis

        统计:不同需求/业务+根据不同的维度进行统计

            今天:新增了多少注册会员、订单量多少、订单金额多少?

            今天和昨天对比:增长?减少?百分比

            会员

            订单

            运营商/地市

离线项目的架构/处理流程

    数据采集:落地到HDFS  外部将数据采集到内部

        SDK数据==>日志==>Hadoop

        Server日志:Flume、Logstash

        数据库:Sqoop

        提供给你们采集过来的数据,直接存放在HDFS上即可,后续的所有操作都是基于这份数据进行的

    (*****)数据预处理/数据清洗: 脏/乱数据 ==> 数据规整化(RDD/DF/DS)

        [30/Jan/2019:00:00:21 +0800] ==> 时间解析

        按照指定的分隔符进行拆分

        加字段

            ip==>城市、运营商、经纬度

        减字段

        使用技术:Spark

        HDFS ==> Spark ==> HBase

    (*****)数据入库:把规整化的数据写入到存储(HBase)

        Hive、HBase、Redis......

        使用技术:HBase

        rowkey设计

        cf

        column

    (*****)数据分析

        出报表的核心所在

        统计分析结果可以找个地方存储起来

        使用技术:Spark

        HBase ==> MapReduce/Spark ==> 业务逻辑分析(代码)  ==> DB

        HBase ==> Hive/Spark SQL  ==> SQL ==> DB

    数据展示:将分析所得到的数据进行可视化显示

        使用技术:HUE、Zeppelin、Echarts、自研

离线项目中要统计的指标/需求

    1)区域统计:国家、省份

    2)终端统计:浏览器、版本号

    ==> Spark + HBase综合运用  *****

    两个版本:

        业务逻辑实现

        各种性能的优化

    两种实现

        Spark Core

        Spark SQL

Spark:

    日志按照统计需求清洗到HBase表中

        log ==> df

        DF  ==> put

        Spark把put写入到HBase中

    对HBase表中的数据进行维度指标的统计分析操作

        Spark把HBase中的Result读取出来

            使用RDD进行统计分析

            使用DataFrame API进行统计分析

            使用Spark SQL API进行统计分析

next:对这章的内容进行优化和重构

UserAgent进行处理和统计分析

    如何解析UserAgent的信息呢?

        自己开发:麻烦

        首先想到的是Github,是不是已经有开源的解析处理的工程呢?

        https://github.com/chetan/UASparser

    操作系统信息、浏览器信息

统计各个省份、地市的访问信息

    需要根据IP进行解析

        开源:纯真(有兴趣的,自己去尝试使用纯真进行解析)

        生产:收费 ,会定时更新IP库、直接调用人家公司提供的IP解析API就可以的

    调用Spark的方法,内部已经给你们集成好的

Spark+HBase+Redis综合使用,pom.xml中需要添加一些框架的依赖

基于Spark的流处理框架

项目背景:

    离线处理/批处理:慕课网的访问日志:点击、搜索

    实时处理:订单日志

        谁、什么时候、下单了什么课程、支付、IP(运营商、地市)、UA

流处理系统

    Spark Streaming

    Structured Streaming  *****

    Flink

    Storm

    Kafka Stream

项目架构及处理流程

    log==>Flume==>Kafka==>SparkStreaming(Direct)==>Redis

    实时:代码来生成订单日志==>Kafka==>SparkStreaming(Direct)==>Redis

    离线:HDFS==>Spark==>HBase

公司大数据团队的分工:采集、批处理、实时处理、API、前端

项目需求

1)统计每天付费成功的总订单数、订单总金额

2)统计每个小时付费成功的总订单数、订单金额

==>统计每分钟付费成功的总订单数、订单金额

==>统计基于Window付费成功的总订单数、订单金额

==>付费订单占到总下单的占比:天、小时、分钟

不能拘泥于某个具体的需求,而因为从一类场景中进行拓展/扩展,进而举一反三,才能达到更好的学习效果

Spark Streaming来进行统计分析,分析结果我们需要写入到Redis(数据类型的合适选择)

Spark Streaming&Kafka&Redis整合

离线项目:访问日志

实时项目:付费日志

    下单,但是没付钱

    下单,付钱

    time,userid,courseid,orderid,fee

    json格式提供

SparkStreaming读取Kafka的数据,通过fastjson的方式把我们所需要的字段解析出来

根据我们的业务逻辑实现功能:代码的重构,好好理解下

根据不同的业务选择合适的Redis的数据类型进行存储即可

我们的职责是把数据存储到Redis就行了,对于后续还有展示的功能,我们不考虑这部分的实现

我就不打包了到服务器上运行了,

作业:自己根据离线项目的讲解,把实时项目打包到服务器上运行

如果有疑问的,到时候加入到我们课程的QQ群里,我们一起来交流和讨论

彩蛋:这们课程我们会定时在课程群里,安排直播答疑

"auto.offset.reset" -> "latest"

如果Spark挂了,Kafka还在运行的话,可能会有数据的丢失

Kafka offset管理起来

StreamingContext

从Kafka中获取要处理的数据

根据业务来处理数据

处理结果入库

启动程序,等待程序终止

挂了:kafka的数据到底是从头开始还是从最新数据开始

正确的做法:

    第一次应用程序启动的时候,要从某个地方获取到已经消费过的offset

    业务逻辑处理完之后,应该要把已经处理完的offset给它保存到某个地方去

offset存储的地方

    Checkpoint

    Kafka

    ZK/MySQL/HBase/Redis

作业:把offset管理起来

------------------------------------------------------------------------

有点坑,把kafka版本换到2.11-0.10.0.0

配置下tmp目录跟localhost->hadoop000

cd /bin 启动

./kafka-server-start.sh -daemon ~/app/kafka_2.11-0.10.0.0/config/server.properties 

./kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic pkkafka

./kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic pkkafkatest

//测试

./kafka-console-producer.sh --broker-list hadoop000:9092 --topic pkkafkatest

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic pkkafkatest

继续阅读