天天看點

Mysql 流增量寫入 Hdfs(二) --Storm + hdfs 的流式處理

一. 概述

上一篇我們介紹了如何将資料從 mysql 抛到 kafka,這次我們就專注于利用 storm 将資料寫入到 hdfs 的過程,由于 storm 寫入 hdfs 的可定制東西有些多,我們先不從 kafka 讀取,而先自己定義一個 Spout 資料充當資料源,下章再進行整合。這裡預設你是擁有一定的 storm 知識的基礎,起碼知道 Spout 和 bolt 是什麼。

寫入 hdfs 可以有以下的定制政策:

  1. 自定義寫入檔案的名字
  2. 定義寫入内容格式
  3. 滿足給定條件後更改寫入的檔案
  4. 更改寫入檔案時觸發的 Action

本篇會先說明如何用 storm 寫入 HDFS,寫入過程一些 API 的描述,以及最後給定一個例子:

storm 每接收到 10 個 Tuple 後就會改變 hdfs 寫入檔案,新檔案的名字就是第幾次改變。

ps:storm 版本:1.1.1 。Hadoop 版本:2.7.4 。

接下來我們首先看看 Storm 如何寫入 HDFS 。

二. Storm 寫入 HDFS

Storm 官方有提供了相應的 API 讓我們可以使用。可以通過建立 HdfsBolt 以及定義相應的規則,即可寫入 HDFS 。

首先通過 maven 配置依賴以及插件。

<properties>
        <storm.version>1.1.1</storm.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>

        <!--hadoop子產品-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.1.1</version>
            <!--<scope>test</scope>-->
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
                </configuration>
            </plugin>
   
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.7</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
           

這裡要提一下,如果要打包部署到叢集上的話,打包的插件需要使用 maven-shade-plugin 這個插件,然後使用 maven Lifecycle 中的 package 打包。而不是用 Maven-assembly-plugin 插件進行打包。

因為使用 Maven-assembly-plugin 的時候,會将所有依賴的包unpack,然後在pack,這樣就會出現,同樣的檔案被覆寫的情況。釋出到叢集上的時候就會報 No FileSystem for scheme: hdfs 的錯 。

然後是使用 HdfsBolt 寫入 Hdfs。這裡來看看官方文檔中的例子吧。

// 使用 "|" 來替代 ",",來進行字元分割
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// 每輸入 1k 後将内容同步到 Hdfs 中
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// 當檔案大小達到 5MB ,轉換寫入檔案,即寫入到一個新的檔案中
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

//當轉換寫入檔案時,生成新檔案的名字并使用
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:9000")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);

//生成該 bolt
topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
                   

到這裡就結束了。可以将 HdfsBolt 當作一個 Storm 中特殊一些的 bolt 即可。這個 bolt 的作用即使根據接收資訊寫入 Hdfs。

而在建立 HdfsBolt 中,Storm 為我們提供了相當強的靈活性,我們可以定義一些政策,比如當達成某個條件的時候轉換寫入檔案,新寫入檔案的名字,寫入時候的分隔符等等。

如果選擇使用的話,Storm 有提供部分接口供我們使用,但如果我們覺得不夠豐富也可以自定義相應的類。下面我們看看如何控制這些政策吧。

RecordFormat

這是一個接口,允許你自由定義接收到内容的格式。

public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}           

Storm 提供了 DelimitedRecordFormat ,使用方法在上面已經有了。這個類預設的分割符是逗号",",而你可以通過 withFieldDelimiter 方法改變分隔符。

如果你的初始分隔符不是逗号的話,那麼也可以重寫寫一個類實作 RecordFormat 接口即可。

FileNameFormat

同樣是一個接口。

public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}           

Storm 所提供的預設的是 org.apache.storm.hdfs.format.DefaultFileNameFormat 。預設人使用的轉換檔案名有點長,格式是這樣的:

{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

例如:

MyBolt-5-7-1390579837830.txt

預設情況下,字首是空的,擴充辨別是".txt"。

SyncPolicy

同步政策允許你将 buffered data 緩沖到 Hdfs 檔案中(進而client可以讀取資料),通過實作org.apache.storm.hdfs.sync.SyncPolicy 接口:

public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}           

FileRotationPolicy

這個接口允許你控制什麼情況下轉換寫入檔案。

public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}           

Storm 有提供三個實作該接口的類:

  • 最簡單的就是不進行轉換的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy ,就是什麼也不幹。
  • 通過檔案大小觸發轉換的 org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。
  • 通過時間條件來觸發轉換的 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。

如果有更加複雜的需求也可以自己定義。

RotationAction

這個主要是提供一個或多個 hook ,可加可不加。主要是在觸發寫入檔案轉換的時候會啟動。

public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}           

三.實作一個例子

了解了上面的情況後,我們會實作一個例子,根據寫入記錄的多少來控制寫入轉換(改變寫入的檔案),并且轉換後檔案的名字表示目前是第幾次轉換。

首先來看看 HdfsBolt 的内容:

RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
//        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
        /** rotate file with Date,every month create a new file
         * format:yyyymm.txt
         */
        FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
        FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
        RotationAction action = new NewFileAction();
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://127.0.0.1:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .addRotationAction(action);           

然後分别來看各個政策的類。

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 計數以改變Hdfs寫入檔案的位置,當寫入10次的時候,則更改寫入檔案,更改名字取決于 “TimesFileNameFormat”
 * 這個類是線程安全
 */

public class CountStrRotationPolicy implements FileRotationPolicy {


    private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");

    private String date =  null;

    private int count = 0;

    public CountStrRotationPolicy(){
        this.date =  df.format(new Date());
//        this.date = df.format(new Date());
    }


    /**
     * Called for every tuple the HdfsBolt executes.
     *
     * @param tuple  The tuple executed.
     * @param offset current offset of file being written
     * @return true if a file rotation should be performed
     */
    @Override
    public boolean mark(Tuple tuple, long offset) {
        count ++;
        if(count == 10) {
            System.out.print("num :" +count + "   ");
            count = 0;
            return true;

        }
        else {
            return false;
        }
    }

    /**
     * Called after the HdfsBolt rotates a file.
     */
    @Override
    public void reset() {

    }

    @Override
    public FileRotationPolicy copy() {
        return new CountStrRotationPolicy();
    }


}           

import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

import java.util.Map;

/**
 * 決定重新寫入檔案時候的名字
 * 這裡會傳回是第幾次轉換寫入檔案,将這個第幾次做為檔案名
 */
public class TimesFileNameFormat implements FileNameFormat {
    //預設路徑
    private String path = "/storm";
    //預設字尾
    private String extension = ".txt";
    private Long times = new Long(0);

    public TimesFileNameFormat withPath(String path){
        this.path = path;
        return this;
    }

    @Override
    public void prepare(Map conf, TopologyContext topologyContext) {
    }


    @Override
    public String getName(long rotation, long timeStamp) {
        times ++ ;
        //傳回檔案名,檔案名為更換寫入檔案次數
        return times.toString() + this.extension;
    }

    public String getPath(){
        return this.path;
    }
}           

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
/**
    當轉換寫入檔案時候調用的 hook ,這裡僅寫入日志。
 */
public class NewFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);



    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        LOG.info("Hdfs change the written file!!");

        return;
    }
}           

OK,這樣就大功告成了。通過上面的代碼,每接收到 10 個 Tuple 後就會轉換寫入檔案,新檔案的名字就是第幾次轉換。

完整代碼包括一個随機生成字元串的 Spout ,可以到我的 github 上檢視。

StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo

更多幹貨,歡迎關注公衆号,哈爾的資料城堡。