一、UDF&UDAF
public class JavaExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("Sam", "Tom", "Jetty", "Tom", "Jetty"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
/**
* 动态创建Schema方式加载DF
*/
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = sqlContext.createDataFrame(rowRDD, schema);
dataFrame.registerTempTable("user");
/**
* 根据UDF函数参数的个数决定是实现哪个UDF UDF1,UDF2...
* UDF1表示传一个参数,StrLen(name)
*/
sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
public Integer call(String s) throws Exception {
return s.length();
}
}, DataTypes.IntegerType);
sqlContext.sql("select name,StrLen(name) as length from user").show();
/**
* 注册一个UDAF函数,实现统计相同的值得个数
*/
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
/**
* 指定输入字段的字段及类型
*/
@Override
public StructType inputSchema() {
return DataTypes.createStructType(
Arrays.asList(DataTypes.createStructField("name",
DataTypes.StringType, true)));
}
/**
* 指定UDAF函数计算后返回的结果类型
*/
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}
/**
* 确保一致性一般用true,用以标记针对给定的一组输入,UDAF总是生成相同的结果
*/
@Override
public boolean deterministic() {
return true;
}
/**
* 可以认为一个一个的将组内的字段值传递出来显现拼接的逻辑
* buffer.update(0)获取的是上一次聚合的值
* 相当于map的combiner,combiner就是对每一个map task的处理结果进行一次小聚合
* 大聚合发生在reduce端
* 这里既是:在进行聚合的时候,每当有新的值进来,对分组后的聚合
*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
buffer.update(0,buffer.getInt(0)+1);
}
/**
* 在进行聚合操作的时候所要处理的数据的结果的类型
*/
@Override
public StructType bufferSchema() {
return DataTypes.createStructType(
Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));
}
/**
* 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,
* 会分布在多个节点上处理
* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
* buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值
* buffer2.getInt(0) : 这次计算传入进来的update的结果
* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
}
/**
* 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0);
}
/**
* 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果
*/
@Override
public Object evaluate(Row buffer) {
return buffer.getInt(0);
}
});
sqlContext.sql("select name ,StringCount(name) as number from user group by name").show();
sc.stop();
}
}
二、开窗函数
例:驾驶第一列为日期,第二类为类别,第三类为价格。统计每个类别赚的最多的三次。
数据:https://download.csdn.net/download/qq_33283652/10904792
/**
* row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于 分组取topN
* 如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用HiveContext来执行,HiveContext默认情况下在本地无法创建。
* 开窗函数格式:
* row_number() over (partitin by XXX order by XXX desc)
*/
public class JavaExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("windowfun");
conf.set("spark.sql.shuffle.partitions","1");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("use spark");
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
/**
* 开窗函数格式:
* 【 rou_number() over (partitin by XXX order by XXX) 】
*/
Dataset<Row> result = hiveContext.sql("select riqi,leibie,jine "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
sc.stop();
}
}