Requirement: aggregate the rows of a table by name and accumulate count.
name | count |
---|---|
Jan | 1 |
2 | |
Feb | 3 |
Mar | |
5 |
Expected result:
7 | |
13 |
Create a new UDAF with MaxCompute Studio in IntelliJ IDEA.

Then let the IDE generate the unimplemented methods. My name field is a string and count is a bigint, so the signature is @Resolve("string,bigint->bigint").
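Add a class to hold the fields:
private static class MyBuffer implements Writable {
    // Defaults keep write() from failing on a freshly created buffer.
    private String name = "";
    private long count = 0L;
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readLong();
    }
}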
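The write/readFields pair has to be symmetric: fields must be read back in the same order they were written. The following is a minimal sketch of that round trip in plain Java. BufferRoundTrip and copyViaBytes are hypothetical names used only for illustration, and the class deliberately does not implement the ODPS Writable interface so that it runs without the SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for MyBuffer; it does not implement the ODPS Writable interface. */
final class BufferRoundTrip {
    String name = "";
    long count = 0L;

    void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(count);
    }

    void readFields(DataInput in) throws IOException {
        // Same order as write(): name first, then count.
        name = in.readUTF();
        count = in.readLong();
    }

    /** Serializes the given buffer to bytes and reads it back into a fresh instance. */
    static BufferRoundTrip copyViaBytes(BufferRoundTrip source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                source.write(out);
            }
            BufferRoundTrip copy = new BufferRoundTrip();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BufferRoundTrip original = new BufferRoundTrip();
        original.name = "Jan";
        original.count = 3L;
        BufferRoundTrip copy = copyViaBytes(original);
        System.out.println(copy.name + " " + copy.count); // prints "Jan 3"
    }
}
```

Swapping the two reads in readFields would make readUTF interpret the bytes of the long as a string length, which is why the order matters.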
With that in place, newBuffer can be written like this:
@Override
public Writable newBuffer() {
    return new MyBuffer();
}
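We also need a Map to store each key (name) and value (count), plus a long field to hold the accumulated value:
Map<String, Long> map = new LinkedHashMap<>(); // name -> accumulated count
Long old_count = 0L; // holds the accumulated value
private LongWritable ret = new LongWritable(); // holds the output value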
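To make the bookkeeping concrete, here is a minimal sketch in plain Java, with no ODPS types. It assumes the intent is that a name seen before continues from its own total, while a new name continues from the latest accumulated value, and that rows arrive grouped by name. RunningTotal, add, and snapshot are hypothetical names, and the sample rows are illustrative rather than the article's table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical plain-Java model of the Map plus accumulated-value bookkeeping. */
final class RunningTotal {
    // name -> accumulated count, kept in first-seen order
    private final Map<String, Long> totals = new LinkedHashMap<>();
    // latest accumulated value; a new name starts from it
    private long latest = 0L;

    /** Folds one row into the totals and returns the accumulated value for that name. */
    long add(String name, long cnt) {
        long base = totals.containsKey(name) ? totals.get(name) : latest;
        latest = base + cnt;
        totals.put(name, latest);
        return latest;
    }

    /** Returns a copy of the current totals. */
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(totals);
    }

    public static void main(String[] args) {
        RunningTotal rt = new RunningTotal();
        rt.add("Jan", 1);
        rt.add("Jan", 2);
        rt.add("Feb", 3);
        System.out.println(rt.snapshot()); // prints {Jan=3, Feb=6}
    }
}
```

LinkedHashMap is used so the totals come back in the order the names were first seen.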
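Complete code for reference:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;

@Resolve("string,bigint->bigint")
public class UDAFTest extends Aggregator {
    // Per-group buffer: the group name and its accumulated count.
    private static class MyBuffer implements Writable {
        // Defaults keep write() from failing on a freshly created buffer.
        private String name = "";
        private long count = 0L;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }
        @Override
        public void readFields(DataInput in) throws IOException {
            name = in.readUTF();
            count = in.readLong();
        }
    }

    // name -> accumulated count, kept in first-seen order
    private final Map<String, Long> map = new LinkedHashMap<>();
    // latest accumulated value; a new name starts from it
    private long old_count = 0L;
    // reused output value
    private final LongWritable ret = new LongWritable();

    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }

    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        // Skip rows whose name or count is NULL.
        if (args[0] == null || args[1] == null) {
            return;
        }
        String arg = args[0].toString();
        long cnt = ((LongWritable) args[1]).get();
        MyBuffer buf = (MyBuffer) buffer;
        // A known name continues from its own total; a new name continues
        // from the latest accumulated value.
        long base = map.containsKey(arg) ? map.get(arg) : old_count;
        old_count = base + cnt;
        map.put(arg, old_count);
        buf.name = arg;
        buf.count = old_count;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        // Takes the partial result as-is, which assumes one partial buffer per name.
        buf.name = p.name;
        buf.count = p.count;
    }

    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }
}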
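The merge above copies the partial result into the buffer, which only holds up when each name reaches merge through a single partial buffer. As a point of comparison, and not the method used in this article, here is a minimal sketch of an additive merge for a plain per-name sum, written in plain Java without the ODPS SDK. MergeSketch and Partial are hypothetical names.

```java
/** Hypothetical plain-Java model of iterate / merge / terminate for a simple sum. */
final class MergeSketch {
    /** Stand-in for a UDAF buffer holding one partial sum. */
    static final class Partial {
        long count;
    }

    /** Counterpart of iterate(): folds one input row into the buffer. */
    static void iterate(Partial buffer, long cnt) {
        buffer.count += cnt;
    }

    /** Counterpart of merge(): adds the partial sum instead of overwriting the buffer. */
    static void merge(Partial buffer, Partial partial) {
        buffer.count += partial.count;
    }

    /** Counterpart of terminate(): returns the final value. */
    static long terminate(Partial buffer) {
        return buffer.count;
    }

    public static void main(String[] args) {
        // Two partial buffers for the same name, as if built on different workers.
        Partial a = new Partial();
        iterate(a, 1);
        iterate(a, 2);
        Partial b = new Partial();
        iterate(b, 4);

        Partial result = new Partial();
        merge(result, a);
        merge(result, b);
        System.out.println(terminate(result)); // prints 7
    }
}
```

With an overwriting merge, the same input would end at 4, because the second partial replaces the first.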
Then publish it through MaxCompute Studio.
I published it under the name test20191119, so it can now be called in DataWorks.
The original table data: