天天看点

DataX学习笔记-Writer插件开发

本文主要介绍如何开发一个将数据写入ElasticSearch的DataX Writer插件

1、检出DataX源码(svn checkout http://code.taobao.org/svn/datax/trunk)

2、在com.taobao.datax.plugins.writer包下面创建一个eswriter包,新建ESWriter.java,ParamKey.java

package com.taobao.datax.plugins.writer.eswriter;

import org.apache.log4j.Logger;

import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.taobao.datax.plugins.writer.eswriter.ParamKey;

/**
 * DataX writer plugin that pushes every record to ElasticSearch by running a
 * curl command built from a configurable template.
 *
 * <p>The template (parameter {@code single_curl}) contains two placeholders:
 * <code>{id}</code>, replaced by field 0 of the line, and <code>{data}</code>,
 * replaced by a JSON object built from the remaining fields. Field {@code i}
 * (i &gt;= 1) is stored under the configured column name {@code i - 1}.
 *
 * <p>The command is executed through {@code /bin/sh -c}, so the template is
 * expected to wrap both placeholders in single quotes, as the default template
 * does. Substituted values are JSON-escaped and have embedded single quotes
 * escaped for that quoting context.
 */
public class ESWriter extends Writer {

    private static final String ID_PLACEHOLDER = "{id}";

    private static final String DATA_PLACEHOLDER = "{data}";

    /** Command template; never modified after {@link #init()}. */
    private String singleCurl = "";

    /** Text written in place of a null field value. */
    private String nullString = "";

    private String columnNameString = "";

    /** Separator for the column list; passed to String.split, so it is treated as a regex. */
    private String columnNameSplit = "";

    private String[] columnNames = null;

    private final Logger logger = Logger.getLogger(ESWriter.class.getCanonicalName());

    /**
     * Reads the plugin parameters, falling back to the built-in defaults.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int init() {
        this.singleCurl = param.getValue(ParamKey.singleCurl, 
                "curl -XPOST 'http://192.168.0.108:9200/user/student/{id}' -d '{data}'");
        this.nullString = param.getValue(ParamKey.nullChar, this.nullString);
        this.columnNameString = param.getValue(ParamKey.columnNames, "userid,name,phone");
        this.columnNameSplit = param.getValue(ParamKey.columnNameSplit, ",");
        this.columnNames = columnNameString.split(columnNameSplit);
        return PluginStatus.SUCCESS.value();
    }

    /** No connection is kept open; each record spawns its own curl process. */
    @Override
    public int connect() {
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Builds the shell command for one record.
     *
     * <p>The template is substituted into a local copy, so the id of one record
     * never leaks into the commands built for later records.
     *
     * @param line record to convert; field 0 is the document id
     * @return the command to execute, or {@code null} when the line is null or
     *         has no fields (there is nothing to write for such a line)
     * @throws IllegalStateException if the id is null or the line carries more
     *         data fields than there are configured column names
     */
    private String makeCurl(Line line) {
        if (line == null || line.getFieldNum() == 0) {
            return null;
        }
        int num = line.getFieldNum();
        if (num - 1 > columnNames.length) {
            throw new IllegalStateException("line has " + (num - 1)
                    + " data fields but only " + columnNames.length
                    + " column names are configured");
        }
        String id = line.getField(0);
        if (id == null) {
            throw new IllegalStateException("field 0 (document id) is null");
        }

        StringBuilder json = new StringBuilder("{");
        for (int i = 1; i < num; i++) {
            if (i > 1) {
                // Separator goes between members, so a line with no data fields yields "{}".
                json.append(',');
            }
            String item = line.getField(i);
            json.append('"').append(escapeJson(columnNames[i - 1])).append("\":\"")
                    .append(escapeJson(item == null ? nullString : item))
                    .append('"');
        }
        json.append('}');

        return singleCurl
                .replace(ID_PLACEHOLDER, escapeForSingleQuotes(id))
                .replace(DATA_PLACEHOLDER, escapeForSingleQuotes(json.toString()));
    }

    /** Escapes a value so it can be placed between double quotes in a JSON document. */
    private static String escapeJson(String raw) {
        StringBuilder out = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
            case '"':
                out.append("\\\"");
                break;
            case '\\':
                out.append("\\\\");
                break;
            case '\n':
                out.append("\\n");
                break;
            case '\r':
                out.append("\\r");
                break;
            case '\t':
                out.append("\\t");
                break;
            default:
                if (c < 0x20) {
                    out.append(String.format("\\u%04x", (int) c));
                } else {
                    out.append(c);
                }
                break;
            }
        }
        return out.toString();
    }

    /**
     * Escapes single quotes so the value cannot terminate a single-quoted
     * shell word: each {@code '} becomes {@code '\''}.
     */
    private static String escapeForSingleQuotes(String raw) {
        return raw.replace("'", "'\\''");
    }

    /**
     * Runs one command through {@code /bin/sh -c}, waits for it and checks the
     * exit status.
     *
     * @throws java.io.IOException if the process cannot be started or exits
     *         with a non-zero status
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void runCurl(String curlCommand) throws java.io.IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder("/bin/sh", "-c", curlCommand);
        // Merge stderr into stdout so a single drain loop keeps the child from
        // blocking on a full pipe.
        builder.redirectErrorStream(true);
        Process process = builder.start();
        try {
            java.io.InputStream output = process.getInputStream();
            try {
                byte[] buffer = new byte[4096];
                while (output.read(buffer) != -1) {
                    // Output is discarded; only the exit status is checked.
                }
            } finally {
                output.close();
            }
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new java.io.IOException("curl command exited with status " + exitCode);
            }
        } finally {
            process.destroy();
        }
    }

    /**
     * Consumes all lines from the receiver and writes each one with its own
     * curl invocation. Lines without fields are skipped.
     *
     * @param receiver source of the lines produced by the reader
     * @return the SUCCESS plugin status value
     * @throws DataExchangeException wrapping any failure while building or
     *         running a command
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Line line;
        try {
            while ((line = receiver.getFromReader()) != null) {
                String curlCommand = makeCurl(line);
                if (curlCommand == null) {
                    continue;
                }
                runCurl(curlCommand);
            }
            return PluginStatus.SUCCESS.value();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            logger.error(ExceptionTracker.trace(e));
            throw new DataExchangeException(e);
        } catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            // Wrap the exception itself: e.getCause() is usually null and would
            // discard the real failure.
            throw new DataExchangeException(e);
        }
    }

    /** Nothing to commit; every record is sent as soon as it is read. */
    @Override
    public int commit() {
        return 0;
    }

    /** No resources are held between records, so there is nothing to release. */
    @Override
    public int finish() {
        return 0;
    }

}
           
package com.taobao.datax.plugins.writer.eswriter;

// Names of the job parameters understood by the eswriter plugin. Each constant
// holds the key that ESWriter passes to param.getValue(...).
//
// NOTE(review): the article copies ParamKey.java itself into the plugin
// directory, presumably because a DataX tool reads the tag comments below
// (@name, @description, @range, @mandatory, @default). Confirm before
// reformatting them; they are kept in their original form here.
//
// NOTE(review): the @default listed for columnNameSplit is \t, while
// ESWriter.init() falls back to "," when the parameter is absent. The
// concurrency key is not read by ESWriter.
public final class ParamKey {

    /*
     * @name: singleCurl 
     * @description:  single curl
     * @range: 
     * @mandatory: false
     * @default:
     */
    public final static String singleCurl = "single_curl";

    /*
     * @name: nullChar
     * @description:  replace null with the nullchar
     * @range: 
     * @mandatory: false
     * @default: 
     */
    public final static String nullChar = "null_char";

    /*
     * @name: columnNames
     * @description:  column name list 
     * @range: 
     * @mandatory: false
     * @default: 
     */
    public final static String columnNames = "column_names";

    /*
     * @name: columnNameSplit
     * @description: separator to split column names
     * @range:
     * @mandatory: false
     * @default:\t
     */
    public final static String columnNameSplit = "column_name_split";

     /*
       * @name:concurrency
       * @description:concurrency of the job
       * @range:1
       * @mandatory: false
       * @default:1
       */
    public final static String concurrency = "concurrency";

}
           

3、在DataX安装目录的conf目录下的plugins.xml文件中添加以下内容

<plugin>

        <!-- the version of this plugin -->

        <version>1</version>

        <!-- the name of this plugin, it must be unique in this file -->

        <name>eswriter</name>

        <!-- reader | writer | spliter -->

        <type>writer</type>

        <!-- mysql | oracle | hdfs | feitian -->

        <target>es</target>

        <!-- the jar's filename -->

        <jar>eswriter-1.0.0.jar</jar>

        <!-- plugin's class name, must include package path -->

        <class>com.taobao.datax.plugins.writer.eswriter.ESWriter</class>

        <!-- max concurrency the plugin can support -->

        <maxthreadnum>10</maxthreadnum>

 </plugin>

4、修改build.xml文件中部分内容

<target name="plugindist" depends="clean,compile">

        <foreach target="eachplugindist" param="var">

            <path id="plugins">

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/hdfswriter/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/oraclewriter/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/mysqlwriter/1.0.0" />           

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/hdfsreader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/oraclereader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/mysqlreader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/httpreader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/hbasewriter/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/hbasereader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/streamwriter/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/streamreader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/fakereader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/sqlserverreader/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/eswriter/1.0.0" />

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/esbulkwriter/1.0.0" />

            </path>

        </foreach>

</target>

<target name="eswriter" depends="clean,compile">

        <foreach target="eachplugindist" param="var">

            <path id="plugins">

                <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/eswriter/1.0.0" />

            </path>

        </foreach>

</target>

5、在项目目录下执行ant命令

6、在DataX安装目录的plugins/writer目录下新建eswriter目录,并将build/plugins目录下的 eswriter-1.0.0.jar,plugins-common-1.0.0.jar 和 ParamKey.java 放到新建的eswriter目录下

7、在DataX安装目录下执行bin/datax.py -e 按步骤选择生成一个mysqlreader_to_eswriter_1464169417703.xml

8、编辑DataX安装目录下的jobs/mysqlreader_to_eswriter_1464169417703.xml文件,补全其中欠缺的内容

9、在DataX安装目录下执行bin/datax.py jobs/mysqlreader_to_eswriter_1464169417703.xml

上面的步骤是基于ElasticSearch的CURL方式操作的,下面的步骤是基于ElasticSearch的JavaAPI方式操作的

代码如下:

package com.taobao.datax.plugins.writer.esbulkwriter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.google.gson.Gson;
import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;

/**
 * DataX writer plugin that converts every line into a JSON document and sends
 * all buffered documents to ElasticSearch in one bulk call on {@link #commit()}.
 *
 * <p>Expected field layout of a line: 0 = id, 1 = userid, 2 = name, 3 = phone.
 * Blank or missing fields leave the corresponding property unset.
 *
 * <p>NOTE(review): ESUtils, EsConfig and EsServerAddress are used without an
 * import, so they must live in this package; the EsServerAddress listing in
 * the same article declares a different package. Verify before building.
 */
public class ESBulkWriter extends Writer {

    private ESUtils esUtils = null;

    /** JSON documents collected by startWrite and flushed by commit. */
    private List<String> pojos = null;

    private final Logger logger = Logger.getLogger(ESBulkWriter.class.getCanonicalName());

    /**
     * Creates the empty document buffer.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int init() {
        this.pojos = new ArrayList<String>();
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Creates the ElasticSearch helper for the hard-coded configuration
     * ("test", two server addresses on port 9300).
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int connect() {
        List<EsServerAddress> serverAddress = new ArrayList<EsServerAddress>();
        serverAddress.add(new EsServerAddress("192.168.0.1", 9300));
        serverAddress.add(new EsServerAddress("192.168.0.2", 9300));
        this.esUtils = new ESUtils(new EsConfig("test", serverAddress));
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Reads all lines from the receiver and buffers one JSON document per
     * non-empty line. Nothing is sent to ElasticSearch here.
     *
     * @param receiver source of the lines produced by the reader
     * @return the SUCCESS plugin status value
     * @throws DataExchangeException wrapping any failure, including a
     *         non-numeric id or userid
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Line line = null;
        Gson gson = new Gson();
        try {
            while ((line = receiver.getFromReader()) != null) {
                if (line.getFieldNum() > 0) {
                    pojos.add(gson.toJson(toStudent(line)));
                }
            }
            return PluginStatus.SUCCESS.value();
        }  catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            // Wrap the exception itself: e.getCause() is usually null and would
            // discard the real failure.
            throw new DataExchangeException(e);
        }
    }

    /** Maps the first four fields of a line onto a Student; blank fields are left unset. */
    private static Student toStudent(Line line) {
        Student student = new Student();
        String idText = fieldAt(line, 0);
        if (StringUtils.isNotBlank(idText)) {
            student.setId(Integer.parseInt(idText));
        }
        String useridText = fieldAt(line, 1);
        if (StringUtils.isNotBlank(useridText)) {
            student.setUserid(Long.parseLong(useridText));
        }
        String name = fieldAt(line, 2);
        if (StringUtils.isNotBlank(name)) {
            student.setName(name);
        }
        String phone = fieldAt(line, 3);
        if (StringUtils.isNotBlank(phone)) {
            student.setPhone(phone);
        }
        return student;
    }

    /** Returns the field at the given index, or null when the line has fewer fields. */
    private static String fieldAt(Line line, int index) {
        return index < line.getFieldNum() ? line.getField(index) : null;
    }

    /**
     * Sends the buffered documents to index "user", type "student" in a single
     * bulk call. An empty buffer is skipped. After a successful call the buffer
     * is replaced so a repeated commit does not resend the same documents.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int commit() {
        if (pojos != null && !pojos.isEmpty()) {
            esUtils.bulkSaveOrUpdate(pojos, "user", "student");
            // Replace rather than clear: the helper may still hold the old list.
            pojos = new ArrayList<String>();
        }
        return PluginStatus.SUCCESS.value();
    }

    /** Nothing is released here. */
    @Override
    public int finish() {
        return 0;
    }

    /**
     * Document shape written to ElasticSearch. Declared static so instances do
     * not keep a hidden reference to the enclosing writer.
     */
    static class Student implements Serializable {

        private static final long serialVersionUID = 1L;

        private Integer id;

        private Long userid;

        private String name;

        private String phone;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Long getUserid() {
            return userid;
        }

        public void setUserid(Long userid) {
            this.userid = userid;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getPhone() {
            return phone;
        }

        public void setPhone(String phone) {
            this.phone = phone;
        }

    }
}
           
package com.alibaba.datax.plugin.writer.eswriter;

import java.io.Serializable;

public class EsServerAddress implements Serializable {

	private static final long serialVersionUID = 1L;

	private String host;
	private Integer port = 9300;

	public EsServerAddress() {
		super();
	}

	public EsServerAddress(String host) {
		super();
		this.host = host;
	}

	public EsServerAddress(String host, Integer port) {
		super();
		this.host = host;
		this.port = port;
	}

	public String getHost() {
		return host;
	}

	public void setHost(String host) {
		this.host = host;
	}

	public Integer getPort() {
		return port;
	}

	public void setPort(Integer port) {
		this.port = port;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((host == null) ? 0 : host.hashCode());
		result = prime * result + ((port == null) ? 0 : port.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		EsServerAddress other = (EsServerAddress) obj;
		if (host == null) {
			if (other.host != null)
				return false;
		} else if (!host.equals(other.host))
			return false;
		if (port == null) {
			if (other.port != null)
				return false;
		} else if (!port.equals(other.port))
			return false;
		return true;
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append("EsServerAddress [host=");
		builder.append(host);
		builder.append(", port=");
		builder.append(port);
		builder.append("]");
		return builder.toString();
	}

}
           

其他步骤与上面相同,但是需要将相关依赖的Jar包放到DataX安装目录的plugins/writer/eswriter目录下。

继续阅读