天天看點

MaxCompute安裝Eclipse開發插件

為了友善使用者使用 MapReduce及UDF的Java SDK進行開發工作,MaxCompute 提供了Eclipse開發插件。該插件能夠模拟MapReduce及UDF的運作過程,為使用者提供本地調試手段,并提供了簡單的模闆生成功能。

一、安裝

與MapReduce提供的本地運作模式不同,Eclipse插件不能夠與ODPS同步資料。使用者使用的資料需要手動拷貝到Eclipse插件的warehouse目錄下。

下載下傳Eclipse插件後,将軟體包解壓,會看到如下jar内容:

odps-eclipse-plugin-bundle-X.X.X.jar

将插件放置在Eclipse安裝目錄的plugins子目錄下。打開Eclipse,點選右上角的打開透視圖(Open Perspective)。

MaxCompute安裝Eclipse開發插件
點選後出現透視圖清單。
MaxCompute安裝Eclipse開發插件
擇ODPS,随後點選OK鍵。同樣在右上角會出現ODPS圖示,表示插件生效。
MaxCompute安裝Eclipse開發插件

1、 建立ODPS工程

建立ODPS工程有兩種方式。

方式一:

在左上角選擇檔案(File) -> 建立(New)->Project->ODPS->ODPS Project,建立工程(示例中使用ODPS作為工程名)。

MaxCompute安裝Eclipse開發插件
建立ODPS工程後會出現如下對話框。輸入Project name,選擇ODPS用戶端路徑(用戶端需要提前下載下傳),并确認(點選Finish)。
MaxCompute安裝Eclipse開發插件
建立好工程後,在左側包資料總管(Package Explorer)中可以看到如下目錄結構。
MaxCompute安裝Eclipse開發插件

方式二:

直接點選左上角的"建立"。

MaxCompute安裝Eclipse開發插件
彈出對話框後,選擇"ODPS Project",點選"下一步"。
MaxCompute安裝Eclipse開發插件

後續操作同方式一。

2、MapReduce開發插件介紹

(1). 快速運作WordCount示例

選擇ODPS項目中的WordCount示例。

MaxCompute安裝Eclipse開發插件
右鍵"WordCount.java",依次點選"Run As","ODPS MapReduce"。
MaxCompute安裝Eclipse開發插件
彈出對話框後,選擇"example_project",點選确認。
MaxCompute安裝Eclipse開發插件
運作成功後,會出現以下結果提示。
MaxCompute安裝Eclipse開發插件

(2). 運作自定義MapReduce程式

右鍵選擇src目錄,選擇建立(New) -> Mapper。

MaxCompute安裝Eclipse開發插件
選擇Mapper後出現下面的對話框。輸入Mapper類的名字,并确認。
MaxCompute安裝Eclipse開發插件

會看到在左側包資料總管(Package Explorer)中,src目錄下生成檔案UserMapper.java。該檔案的内容即是一個Mapper類的模闆。

package odps;

import java.io.IOException;

import com.aliyun.odps.data.Record;

import com.aliyun.odps.mapred.MapperBase;

public class UserMapper extends MapperBase {

@Override
public void setup(TaskContext context) throws IOException {
}

@Override
public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
}

@Override
public void cleanup(TaskContext context) throws IOException {
}
           

}

模闆中,将package名稱預設配置為"odps",使用者可以根據自己的需求進行修改。編寫模闆内容。

import com.aliyun.odps.counter.Counter;

Record word;
Record one;
Counter gCnt;

@Override
public void setup(TaskContext context) throws IOException {
      word = context.createMapOutputKeyRecord();
      one = context.createMapOutputValueRecord();
      one.set(new Object[] { 1L });
      gCnt = context.getCounter("MyCounters", "global_counts");
}

@Override
public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      for (int i = 0; i < record.getColumnCount(); i++) {
          String[] words = record.get(i).toString().split("\\s+");
          for (String w : words) {
            word.set(new Object[] { w });
            Counter cnt = context.getCounter("MyCounters", "map_outputs");
            cnt.increment(1);
            gCnt.increment(1);
            context.write(word, one);
          }
        }
      }

@Override
public void cleanup(TaskContext context) throws IOException {
}
           

同理,右鍵選擇src目錄,選擇建立(New)->Reduce。

MaxCompute安裝Eclipse開發插件

輸入Reduce類的名字(本示例使用UserReduce)。同樣在包資料總管(Package Explorer)中,src目錄下生成檔案UserReduce.java。該檔案的内容即是一個Reduce類的模闆。

import java.util.Iterator;

import com.aliyun.odps.mapred.ReducerBase;

public class UserReduce extends ReducerBase {

private Record result;
Counter gCnt;

@Override
public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();
      gCnt = context.getCounter("MyCounters", "global_counts");
}

@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {

      long count = 0;
      while (values.hasNext()) {
        Record val = values.next();
        count += (Long) val.get(0);
      }
      result.set(0, key.get(0));
      result.set(1, count);
      Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
      cnt.increment(1);
      gCnt.increment(1);

      context.write(result);
    }

@Override
public void cleanup(TaskContext context) throws IOException {
}
           

建立main函數,右鍵選擇src目錄,選擇建立(New) -> MapReduce Driver。填寫Driver Name(示例中是UserDriver), Mapper及Recduce類(示例中是UserMapper及UserReduce),并确認。同樣會在src目錄下看到MyDriver.java檔案。

MaxCompute安裝Eclipse開發插件

編輯driver内容。

import com.aliyun.odps.OdpsException;

import com.aliyun.odps.data.TableInfo;

import com.aliyun.odps.examples.mr.WordCount.SumCombiner;

import com.aliyun.odps.examples.mr.WordCount.SumReducer;

import com.aliyun.odps.examples.mr.WordCount.TokenizerMapper;

import com.aliyun.odps.mapred.JobClient;

import com.aliyun.odps.mapred.RunningJob;

import com.aliyun.odps.mapred.conf.JobConf;

import com.aliyun.odps.mapred.utils.InputUtils;

import com.aliyun.odps.mapred.utils.OutputUtils;

import com.aliyun.odps.mapred.utils.SchemaUtils;

public class UserDriver {

public static void main(String[] args) throws OdpsException {
    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class);
    job.setReducerClass(SumReducer.class);

    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));

    InputUtils.addTable(
        TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
    InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
    OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);

    RunningJob rj = JobClient.runJob(job);
    rj.waitForCompletion();
}
           

運作MapReduce程式,選中UserDriver.java,右鍵選擇Run As -> ODPS MapReduce,點選确認。出現如下對話框。

MaxCompute安裝Eclipse開發插件

選擇ODPS Project為example_project,點選Finish按鈕開始本地運作MapReduce程式。

MaxCompute安裝Eclipse開發插件

有如上輸出資訊,說明本地運作成功。運作的輸出結果在warehouse目錄下。

MaxCompute安裝Eclipse開發插件

wc_out即是輸出目錄,R_000000即是結果檔案。通過本地調試,确定輸出結果正确後,可以通過Eclipse導出(Export)功能将MapReduce打包。打包後将jar包上傳到ODPS中。

本地調試通過後,使用者可以通過Eclipse的Export功能将代碼打成jar包,供後續分布式環境使用。在本示例中,我們将程式包命名為mr-examples.jar。選擇src目錄,點選Export。

MaxCompute安裝Eclipse開發插件

選擇導出模式為Jar File。

MaxCompute安裝Eclipse開發插件

僅需要導出src目錄下package(com.aliyun.odps.mapred.open.example),Jar File名稱指定為"mr-examples.jar"。

MaxCompute安裝Eclipse開發插件

确認後,導出成功。

如果使用者想在本地模拟建立Project,可以在warehouse下面,建立一個新的子目錄(與example_project平級的目錄)。

|____my_project (項目空間目錄)

|____ <__tables__>

| |__table_name1(非分區表)

| | |____ data(檔案)

| | |

| | |____ <__schema__> (檔案)

| |

| |__table_name2(分區表)

| |_____partition_name=partition_value(分區目錄)

| | |____ data(檔案)

| |

| |____ <__schema__> (檔案)

|

|____ <__resources__>

|
      |___table_resource_name (表資源)
      |     |____<__ref__>
      |
      |___ file_resource_name(檔案資源)           

schema檔案示例:

非分區表:

project=project_name

table=table_name columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING

分區表:

table=table_name columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING

data檔案示例:

1,1.1,true,2015-06-04 11:22:42 896,hello world

N,N,N,N,N

8.4 UDF開發插件介紹

(1). Local Debug UDF程式

在本章節我們将介紹如何使用Eclipse插件開發并在本地運作UDF。UDAF和UDTF的編寫執行過程與UDF類似,均可參考UDF的示例介紹完成。ODPS Eclipse插件提供兩種運作UDF的方式,菜單欄和右鍵單擊快速運作方式。

菜單欄運作

從菜單欄選擇Run-->Run Configurations...彈出如下對話框。

MaxCompute安裝Eclipse開發插件

使用者可以建立一個Run Configuration,選擇運作的UDF類及類型、選擇ODPS Project、填寫輸入表資訊。

MaxCompute安裝Eclipse開發插件

上述配置中,"Table"表示UDF的輸入表,"Partitions"表示讀取某個分區下的資料,分區由逗号分隔,"Columns"表示列,将依次作為UDF函數的參數被傳入,列名由逗号分隔。

點選"Run"運作,運作結果将顯示在控制台中。

MaxCompute安裝Eclipse開發插件

右鍵單擊快速運作

選中一個udf.java檔案(比如:UDFExample.java)并單擊滑鼠右鍵,選擇"Run As" -> "Run UDF|UDAF|UDTF"。

MaxCompute安裝Eclipse開發插件
MaxCompute安裝Eclipse開發插件

填入配置資訊。

MaxCompute安裝Eclipse開發插件

點選"Finish"後,運作UDF,獲得輸出結果。

(2). 運作使用者自定義UDF程式

右擊一個工程并選擇"New-->UDF"(或者選擇菜單欄File-->New-->UDF)。

填寫UDF類名然後點選"Finish"。在對應的src目錄下生成與UDF類名同名的Java檔案,編輯該java檔案内容。

import com.aliyun.odps.udf.UDF;

public class UserUDF extends UDF {

/**
   * project: example_project 
   * table: wc_in1 
   * columns: col1,col2
   * 
   */
  public String evaluate(String a, String b) {
    return "ss2s:" + a + "," + b;
  }
           

右擊該java檔案(如UserUDF.java),選擇"Run As",再選擇"ODPS UDF|UDTF|UDAF"。

配置如下對話框。

點選"finish",得出結果。

ss2s:A1,A2

ss2s:A1,A2

繼續閱讀