天天看點

maxCompute的UDAF demo,實作累加功能

需求:将表中資料按照name聚合,并且count進行累加

name count
Jan 1
2
Feb 3
Mar
5

預期結果:

7
13

使用idea的maxCompute studio新增UDAF

maxCompute的UDAF demo,實作累加功能
maxCompute的UDAF demo,實作累加功能

然後自動生成未實作的方法,我的字段name是string,count是bigint

是以@Resolve("string,bigint->bigint")

新增一個class用來存儲字段

private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }           

這樣newBuffer就可以這樣寫了

public Writable newBuffer() {
        return new MyBuffer();
    }           

還需要一個Map來存儲key(name)和value(count),一個long類型的參數存儲累加的值

Long old_count = 0L;//存儲累加值
    private LongWritable ret = new LongWritable();//存儲輸出值           

完整代碼參考:

public class UDAFTest extends Aggregator {
    private static class MyBuffer implements Writable {
        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }



    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }
    Map<String,Long> map = new LinkedHashMap<>();
    Long old_count = 0L;
    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        String arg = String.valueOf(args[0]);
        Long cnt = Long.parseLong(String.valueOf(args[1]));
        MyBuffer buf = (MyBuffer) buffer;

        if (arg != null) {
            if(map.containsKey(arg)){
                Long newcnt = map.get(arg);
                old_count = cnt+newcnt;
                map.put(arg,old_count);
            }else {
                map.put(arg,old_count+cnt);
            }
        }
        buf.name = arg;
        buf.count = map.get(arg);

    }
    private LongWritable ret = new LongWritable();
    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        buf.name = p.name;
        buf.count = p.count;
    }


}           

然後通過maxCompute studio釋出下

maxCompute的UDAF demo,實作累加功能

釋出名為test20191119,這樣就可以在Dataworks中調用了。

其中原表資料:

maxCompute的UDAF demo,實作累加功能
maxCompute的UDAF demo,實作累加功能