Exporting Large Data Reports with Hadoop HDFS and the Open-Source Excel Library POI

Keywords: Java, PHP, hdfs, mqrocket, excel, poi, reports

Background

On the business side, almost every company has report-export requirements of some kind. For small volumes you can finish the job with an output stream or a plain string, as long as you set the appropriate Content-Type on the response. But when a large amount of data has to be exported, above all for reports with complex business logic such as orders, where all sorts of conditions and permissions have to be applied, the data processing alone is already hard work. And the export rarely covers just a day or two of records: it is usually half a month or a month of data, which even at a small company can easily exceed a hundred thousand rows.

function generateExcel($filename, $header, array &$data)
{
    generateDownHeader($filename);

    $rs = '<table><tr>';
    if (is_string($header)) {
        $header = explode(',', $header);
    }
    foreach ($header as $v) {
        $rs .= '<th>'.$v.'</th>';
    }
    $rs .= '</tr>';
    foreach ($data as $coll) {
        $rs .= '<tr>';
        foreach ($coll as $v) {
            if (AppHelper::isDouble($v)) {
                $rs .= '<td style="vnd.ms-excel.numberformat:@">'.$v.'</td>';
            } else {
                $rs .= '<td>'.$v.'</td>';
            }
        }
        $rs .= '</tr>';
    }

    $rs .= '</table>';

    echo $rs;
    exit;
}

function generateDownHeader($filename)
{
    // Several Content-Type values are sent for old-browser compatibility; the last one wins.
    header("Content-Type: application/force-download");
    header("Content-Type: application/octet-stream");
    header("Content-Type: application/download");
    header('Content-Disposition: attachment; filename="'.$filename.'"'); // attachment (not inline) forces a download
    header("Content-Transfer-Encoding: binary");
    header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT");
    header("Cache-Control: must-revalidate, post-check=0, pre-check=0");
    header("Pragma: no-cache");
}

For those hundred-odd thousand rows, the ordinary approach shown above may simply not work (I did not try every other ordinary method). On the PHP side the export goes through curl calls to an API, and both nginx and PHP's curl typically time out at 30 seconds, which means exporting 10k rows within 30 seconds. If the server is powerful and multi-core, you can use curl_multi to issue the requests in parallel; on a weaker server that approach may even take longer.

Here is the curl helper I used to call the API:

function curl($url, $option = null, $method = 'POST', $getCode = false, $header = [])
{
    $curl = curl_init();
    curl_setopt($curl, CURLOPT_URL, $url);
    curl_setopt($curl, CURLOPT_TIMEOUT, 30); // value lost in transcription; 30s assumed, matching the timeout discussed above
    if (!array_key_exists('Content-Type', $header)) {
        $header['Content-Type'] = 'application/json;charset=UTF-8';
    }
    $headers = [];
    if ($header) {
        foreach ($header as $k => $v) {
            $headers[] = $k.': '.$v;
        }
    }
    curl_setopt($curl, CURLOPT_HTTPHEADER, $headers);
    if ($option) {
        if (is_array($option)) {
            $option = json_encode($option);
        }
        curl_setopt($curl, CURLOPT_POSTFIELDS, $option);
    }
    curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $method);
    $result = curl_exec($curl);
    if ($getCode) {
        $curl_code = curl_getinfo($curl, CURLINFO_HTTP_CODE);
        $message = self::isJson($result) ? json_decode($result, true) : $result;
        $result = ['code' => $curl_code];
        if (isset($message['exception']) && count($message) == 1) { // constant lost in transcription; 1 assumed (exception-only payload)
            $result['exception'] = $message['exception'];
            $result['result'] = null;
        } else {
            $result['result'] = $message;
        }
    }
    curl_close($curl);
    return $result;
}

Because of the data volume, I later switched to parallel requests:

function curlMulti(array $urls, $options = null, $method = 'POST', $getCode = false, $header = [])
{
    $mh = curl_multi_init();
    // add each request handle to the multi session
    $handles = $contents = [];
    foreach ($urls as $key => $url) {
        $handles[$key] = curl_init($url);
        curl_setopt($handles[$key], CURLOPT_RETURNTRANSFER, true);
        curl_setopt($handles[$key], CURLOPT_TIMEOUT, 30); // value lost in transcription; 30s assumed, matching the timeout discussed above
        curl_setopt($handles[$key], CURLOPT_CUSTOMREQUEST, $method);

        if (!array_key_exists('Content-Type', $header)) {
            $header['Content-Type'] = 'application/json;charset=utf-8';
        }
        $headers = [];
        if ($header) {
            foreach ($header as $k => $val) {
                $headers[] = $k.': '.$val;
            }
        }
        curl_setopt($handles[$key], CURLOPT_HTTPHEADER, $headers);
        if ($options) {
            if (is_array($options)) {
                $options = json_encode($options);
            }
            curl_setopt($handles[$key], CURLOPT_POSTFIELDS, $options);
        }
        curl_multi_add_handle($mh, $handles[$key]);
    }
    // run the batch (first, commented-out variant; stripped numeric literals restored with typical values)
    /*$active = null;
    do {
        $mrc = curl_multi_exec($mh, $active);
    } while ($mrc == CURLM_CALL_MULTI_PERFORM);

    while ($active and $mrc == CURLM_OK) {
        if (curl_multi_select($mh) === -1) {
            usleep(100);
            do {
                $mrc = curl_multi_exec($mh, $active);
            } while ($mrc == CURLM_CALL_MULTI_PERFORM);
        }
    }
    // collect the results
    $errors = [];
    foreach ($handles as $k => $ch) {
        $errors[$k] = curl_error($ch);
        $content = curl_multi_getcontent($ch);
        if ($getCode) {
            $content = curl_errno($ch) == 0 && self::isJson($content) ? json_decode($content, true) : [];
        }
        $contents = array_merge($contents, $content);
    }
    $info = curl_multi_info_read($mh);*/
    $output = $errors = $infos = [];
    do {
        while (($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);
        if ($execrun != CURLM_OK)
            break;
        while ($done = curl_multi_info_read($mh)) {
            $info = curl_getinfo($done['handle']);
            $infos['http_code'][] = $info['http_code'];
            $result['code'] = $info['http_code'];
            $infos['url'][] = $info['url'];
            $errors[] = curl_error($done['handle']);
            $output = self::isJson(curl_multi_getcontent($done['handle'])) ?
                array_merge($output, json_decode(curl_multi_getcontent($done['handle']), true)) : $output;
            if ($running)
                curl_multi_select($mh, 1);
        }
    } while ($running);

    $result['result'] = $output;
    $result['exception'] = $errors;
    $result['info'] = $infos;
    foreach ($handles as $ch) {
        curl_multi_remove_handle($mh, $ch);
    }
    curl_multi_close($mh);
    return $result;
}
           

One chunk of the code above is commented out. In principle it should behave the same as the code below it, and in practice the two did produce the same result. By "result" I don't mean what the concurrent requests return: since the handles compete for resources, responses come back in no fixed order and the exported Excel rows end up unsorted; you simply cannot know which request finishes first. That is problem one. Second, data was lost to varying degrees during export: with, say, 500 records per request, validation sometimes found only 300-odd rows coming back, and the loss was not consistent. Third, with enough data nginx still times out and returns a 504.

PS: Why didn't I use the PHPExcel package on the PHP side? Simple: testing showed that PHPExcel eats far too much memory and the machine couldn't take it, so it was out.

Initial solution

Since the PHP multi-request approach couldn't solve the problem, I had to look elsewhere. The most reliable option, and the one everybody thinks of, is a queue: push the export request onto a queue, immediately tell the client that the job is being processed, let a consumer do the actual export, and finally report the result back to the client.

PHP has plenty of queue options, such as Swoole, Workerman and Gearman. I picked Gearman because it is convenient; Swoole used to be in our project but was dropped at some point, I don't know why.

A demo of the Gearman worker (server side):

<?php
/**
 * Created by PhpStorm.
 * User: zhoujunwen
 * Date: 16/7/12
 * Time: 4:54 PM
 */

namespace console\controllers;

use Yii;
use common\extensions\AppHelper;
use yii\console\Controller;

class ExportController extends Controller
{
    public function actionExport()
    {
        $worker = new \GearmanWorker();
        $worker->addServer(); // defaults to 127.0.0.1:4730
        $worker->addFunction('export', function (\GearmanJob $job) {
            $workload = $job->workload();

            if (($data = $this->parseJson($workload)) === false) {
                return AppHelper::encodeJson(['code' => '-1', 'result' => null, 'exception' => 'invalid parameters']);
            }
            $user = isset($data['user']) && !empty($data['user']) ? $data['user'] : 'guest';
            $path = dirname(Yii::$app->basePath) . '/backend/downloads/' . sha1($user) . '/' . date('Y-m-d') . '/';
            $filename = isset($data['filename']) && !empty($data['filename']) ? $data['filename'] : date('Y-m-d') . '-order.xls';
            $rs = $this->getData($data['type']['data'], $data['type']['count'], $data['api'], $data['params']);
            $this->writeExcel($path, $filename, $rs, $data['header']);
            return;
        });
        // Loop forever; Gearman handles this internally, so the worker will not
        // hog the CPU or die from the endless loop.
        while ($worker->work()) {
            if ($worker->returnCode() !== GEARMAN_SUCCESS) {
                echo 'error' . PHP_EOL;
            }
        }
    }

    public function parseJson($str)
    {
        $data = json_decode($str, true);
        return (json_last_error() == JSON_ERROR_NONE) ? $data : false;
    }

    public function writeExcel($path, $filename, $data, $header)
    {
        if ($this->mkDir($path)) {
            $data = $this->assembleData($data);
            $rs = $this->generateExcel($header, $data);
            file_put_contents(rtrim($path, '/') . '/' . $filename, $rs);
        } else {
            echo 'Directory does not exist; failed to write the file!';
        }
        return;
    }

    public function getData($dataApi, $countApi, $api, $params)
    {
        $start = microtime(true);
        $count = AppHelper::getData($api . $countApi . '?' . http_build_query($params));
        echo $api . $countApi . '?' . http_build_query($params) . PHP_EOL;
        echo 'total rows: ' . $count . PHP_EOL;
        $params['perpage'] = 500; // page size lost in transcription; 500 assumed (matches the batch size mentioned earlier)
        $times = ceil($count / $params['perpage']);
        $data = [];
        if ($count > 0) {
            for ($i = 0; $i < $times; $i++) {
                $params['page'] = $i + 1;
                $rs = AppHelper::getData($api . $dataApi . '?' . http_build_query($params));
                $data = array_merge($data, $rs);
            }
        }
        $end = microtime(true);
        echo 'elapsed: ' . ($end - $start) . PHP_EOL;
        return $data;
    }

    public function generateExcel($header, array &$data)
    {

        $rs = '<table><tr>';
        if (is_string($header)) {
            $header = explode(',', $header);
        }
        foreach ($header as $v) {
            $rs .= '<th>' . $v . '</th>';
        }
        $rs .= '</tr>';
        foreach ($data as $coll) {
            $rs .= '<tr>';
            foreach ($coll as $v) {
                if (AppHelper::isDouble($v)) {
                    $rs .= '<td style="vnd.ms-excel.numberformat:@">' . $v . '</td>';
                } else {
                    $rs .= '<td>' . $v . '</td>';
                }
            }
            $rs .= '</tr>';
        }

        $rs .= '</table>';

        unset($data);
        return $rs;
    }

    public function assembleData($rs)
    {
        $users = [];
        if ($rs) {
            $uids = array_column($rs, 'uid');
            $us = Yii::$app->get('db')->createCommand('select uid,gender,adminflag,mobile,type from {{%user}} where uid in (' . implode(',', $uids) . ')')->queryAll();
            if ($us && is_array($us)) {
                foreach ($us as $u) {
                    $users[$u['uid']] = $u;
                }
            }
        }
        // NOTE: the numeric status/refund/flag codes were lost when this article was
        // transcribed; the values below are placeholders -- map them to your own schema.
        $statusMap = [
            1 => '已關閉', 2 => '下單', 3 => '付款确認中', 4 => '已付款',
            5 => '已發貨', 6 => '已确認收貨', 7 => '已評價', 8 => '支付價格與訂單價格不一緻',
        ];
        $refundMap = [
            1 => '退款已到賬', 2 => '賣家已确認但需人工處理', 3 => '同意退款', 4 => '拒絕退款',
            5 => '退款申請中', 6 => '未申請', 7 => '退貨退款申請中', 8 => '同意退貨申請',
            9 => '拒絕退貨申請', 10 => '買家退貨已發出', 11 => '賣家确認收貨',
            12 => '收到貨拒絕退款', 13 => '退貨退款已到賬',
        ];
        $content = [];
        foreach ($rs as $k => $v) {
            $data = AppHelper::decodeJson($v['data'], true);
            $status = isset($statusMap[$v['status']]) ? $statusMap[$v['status']] : '已删除';
            $refund = '未申請退款';
            if (isset($v['refund']) && isset($refundMap[$v['refund']])) {
                $refund = $refundMap[$v['refund']];
            }
            $user = isset($users[$v['uid']]) ? $users[$v['uid']] : null;
            $gender = '未知';
            if ($user && $user['gender'] == 1) {        // placeholder codes, see note above
                $gender = '男';
            } elseif ($user && $user['gender'] == 2) {
                $gender = '女';
            }
            $type = '普通使用者';
            if ($user && $user['adminflag'] == 1) {     // placeholder codes, see note above
                $type = '稽核中的匠人';
            } elseif ($user && $user['adminflag'] == 2) {
                $type = '種子使用者';
            } elseif ($user && $user['adminflag'] == 3) {
                $type = '管理者';
            }
            $itype = '未設定/現貨';
            if (isset($data['type'])) {
                if ($data['type'] == 1) {               // placeholder codes, see note above
                    $itype = '現貨';
                } elseif ($data['type'] == 2) {
                    $itype = '定制';
                } else {
                    $itype = '拍賣';
                }
            }
            $utype = isset($user['type']) && $user['type'] == 1 ? '微信購買注冊' : 'APP内注冊';
            $otype = !$v['otype'] ? 'APP内購買' : '微信購買';
            $paytype = !$v['prepaytype'] ? 'APP内付款' : '微信付款';
            $snapshot = AppHelper::getData(Yii::$app->params['imageServer'] . $v['snapshot']);
            $content[] = [date('Y/m/d H:i:s', floor($v['createtm'] / 1000)), // createtm assumed to be in milliseconds
                $v['ooid'], isset($snapshot['item']['pid']) ? $snapshot['item']['pid'] : '', $v['iid'], $data['title'], $itype,
                (isset($v['parentCategory']) ? $v['parentCategory'] . '/' : '') . $v['category'],
                $v['craftsman'], $v['suid'], $v['quantity'], $v['username'], $utype, $v['uid'], $data['address'], $status, $refund,
                $data['price'], $v['realpay'], $otype, $paytype,
                isset($user['mobile']) ? $user['mobile'] : '未知', $gender, $type];
        }
        return $content;
    }

    public function mkDir($path)
    {
        if (is_dir($path)) {
            echo 'Directory ' . $path . ' already exists!';
            return true;
        } else {
            $res = mkdir($path, 0777, true); // mode lost in transcription; 0777 assumed
            echo $res ? 'Directory created.' : 'Failed to create directory.';
            return $res;
        }
    }
}
           

The Gearman client code:

<?php
...

public function exportExcel($str)
{
    $client = new \GearmanClient();
    $client->addServer('127.0.0.1', 4730); // port lost in transcription; 4730 is Gearman's default

    $client->setCompleteCallback('completeCallBack'); // pass the callback name as a string
    $result2 = $client->doBackground('export', $str); // asynchronous; returns only a job handle

//        $result1 = $client->do('export', 'do');                       // do() is synchronous: runs the job and returns its result
//        $result3 = $client->addTask('export', 'addTask');             // queue a task (synchronous?); addTask lets you bind callbacks
//        $result4 = $client->addTaskBackground('export', 'addTaskBackground'); // queue a background task (asynchronous?)

    $client->runTasks(); // run the queued tasks; the do* family does not need runTasks()
    return $result2;
}

// Callback bound above; it only fires for the addTask family
function completeCallBack($task)
{
    echo 'CompleteCallback! handle result: ' . $task->data() . '<br/>';
}
           

PS: to run the code above you need the Gearman service installed on your server or local machine, plus the php gearman extension; installation guides are easy to find.

If your business logic is uncomplicated, the setup above is already more than enough to export tens of thousands of rows. My problem, however, still wasn't settled: the boss didn't want the Gearman queue in the stack and preferred that Java handle the export. Fine by me, I enjoy hopping across technologies to solve a problem, so with that requirement unmet it was on to the next plan.

MqRocket+HDFS+POI

Note: the Java projects here are all built on spring + dubbo/dubbox. Every configuration item and annotation mentioned below falls within Spring's usual configuration and annotation conventions, except for the mapper configuration and annotations.

Three projects:

  • mq: exposes the REST service that sends messages (@rxl)
  • biz: exposes dubbo/RESTful interfaces and handles the business logic (@lee)
  • data: handles the data export

The three projects are written by different engineers; we don't care how each is implemented, we only need to be able to use what it exposes.

The RESTful interface exposed by mq:

@Path("/message")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
@Component("sendMessageService")
public class SendMessageImpl implements SendMessageService{

    @Resource
    public IProducer producer;

    @PUT
    @Path("send")
    @Consumes({MediaType.APPLICATION_JSON})
    @Override
    public void sendMessage(Message message) {
        System.out.println("message: " + message.getMessage());
        producer.send(message.getTopic(),message.getKey(),message.getMessage());
    }
}
           

Our PHP back end then calls this interface with an HTTP PUT, handing the data to be processed over to the export service. curl's custom-request support makes sending a PUT straightforward.

Suppose the mq REST endpoint is:

http://localhost:8018/mq/message/send

We pass it a JSON string whose prototype is an associative array with the keys "topic", "key" and "message". topic is the message topic, which the designated MQ consumer subscribes to; key is the message key. One topic carries many different keys, so the consumer (the export side) has to dispatch on the key. message holds the concrete parameters, such as which fields to export, the upload server address, and so on.

$message = [
    'topic' => 'order_export',
    'key' => 'order_tag_' . $orderNo,
    'message' => [
        'params' => [
            ...
        ],
        'headers' => [
            ...
        ],
        'options' => [
            ...
        ],
    ],

];
                

The complete request:

http://localhost:8018/mq/message/send?{"topic":"order_export","key":"order_tag_","message":{"params":[],"header":[],"options":[]}}
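For illustration, here is a minimal sketch of how a client could assemble that payload and send it as the body of an HTTP PUT, using only the Java standard library. The endpoint and port come from the example above; the class name `MqMessageSender` and the hand-rolled JSON builder are my own (a real project would use a JSON library such as Jackson):

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class MqMessageSender {
    // Build the JSON payload by hand; the keys mirror the $message array above.
    // topic/key are plain strings, message is already a JSON object string.
    static String buildMessage(String topic, String key, String messageJson) {
        return "{\"topic\":\"" + topic + "\",\"key\":\"" + key
                + "\",\"message\":" + messageJson + "}";
    }

    // Send the payload as a PUT body with Content-Type: application/json.
    static int send(String endpoint, String json) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        String json = buildMessage("order_export", "order_tag_1001",
                "{\"params\":{},\"headers\":{},\"options\":{}}");
        System.out.println(json);
        // send("http://localhost:8018/mq/message/send", json); // requires the mq service to be up
    }
}
```

The `order_tag_1001` key is just a sample value standing in for `'order_tag_' . $orderNo` from the PHP array.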

Wrapping POI in a utility class

Java has plenty of Excel APIs; Apache POI is simply the most convenient and flexible one to use (perhaps only because I haven't tried the others).

HSSF is the POI Project's pure Java implementation of the Excel '97(-2007) file format. XSSF is the POI Project's pure Java implementation of the Excel 2007 OOXML (.xlsx) file format.

HSSF and XSSF provides ways to read spreadsheets create, modify, read and write XLS spreadsheets. They provide:

  • low level structures for those with special needs
  • an eventmodel api for efficient read-only access
  • a full usermodel api for creating, reading and modifying XLS files

Pull the POI packages in via gradle:

// java excel api
compile 'org.apache.poi:poi:3.10.1'
compile 'org.apache.poi:poi-ooxml:3.9'
           
package cn.test.web.utils;

import cn.test.util.Utils;
import org.apache.commons.io.FilenameUtils;
import org.apache.poi.hssf.record.crypto.Biff8EncryptionKey;
import org.apache.poi.hssf.usermodel.HSSFFont;
import org.apache.poi.hssf.usermodel.HSSFFooter;
import org.apache.poi.hssf.usermodel.HSSFHeader;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.apache.poi.poifs.filesystem.POIFSFileSystem;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.Font;
import org.apache.poi.ss.usermodel.Footer;
import org.apache.poi.ss.usermodel.Header;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 5:02 PM
 */
public class POIUtils {
    private static final short HEADER_FONT_SIZE = 16;      // page-header (outline) font size
    private static final short FONT_HEIGHT_IN_POINTS = 14; // header-row font size

    public static Workbook createWorkbook(String file) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        switch (ext) {
            case "xls":
                wb = createHSSFWorkbook();
                break;
            case "xlsx":
                wb = createXSSFWorkbook();
                break;
            default:
                wb = createHSSFWorkbook();
        }
        return wb;
    }

    public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        try {
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(inputStream);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(inputStream);
                    break;
                default:
                    wb = new HSSFWorkbook(inputStream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    public static Workbook writeFile(Workbook wb, String file) {
        if (wb == null || Utils.isEmpty(file)) {
            return null;
        }
        FileOutputStream out = null;
        try {
            out = new FileOutputStream(file);
            wb.write(out);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return wb;
    }

    public static Workbook createHSSFWorkbook() {
        // create the workbook
        HSSFWorkbook wb = new HSSFWorkbook();
        // add a worksheet (an .xls file generated without any sheet fails to open)
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook createXSSFWorkbook() {
        XSSFWorkbook wb = new XSSFWorkbook();
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook openWorkbook(String file) {
        FileInputStream in = null;
        Workbook wb = null;

        try {
            in = new FileInputStream(file);
            wb = WorkbookFactory.create(in);
        } catch (InvalidFormatException | IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return wb;
    }

    public static Workbook openEncryptedWorkbook(String file, String password) {
        FileInputStream input = null;
        BufferedInputStream binput = null;
        POIFSFileSystem poifs = null;
        Workbook wb = null;
        try {
            input = new FileInputStream(file);
            binput = new BufferedInputStream(input);
            poifs = new POIFSFileSystem(binput);
            Biff8EncryptionKey.setCurrentUserPassword(password);
            String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(poifs);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(input);
                    break;
                default:
                    wb = new HSSFWorkbook(poifs);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    /**
     * Append a sheet; if wb is null and isNew is true, create a new workbook.
     *
     * @param wb
     * @param isNew
     * @param type  workbook type when isNew is true -- 1: xls, 2: xlsx
     * @return
     */
    public static Workbook appendSheet(Workbook wb, boolean isNew, int type) {
        if (wb != null) {
            Sheet sheet = wb.createSheet();
        } else if (isNew) {
            if (type == 1) {
                wb = new HSSFWorkbook();
                wb.createSheet();
            } else {
                wb = new XSSFWorkbook();
                wb.createSheet();
            }
        }
        return wb;
    }


    public static Workbook setSheetName(Workbook wb, int index, String sheetName) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.setSheetName(index, sheetName);
        }
        return wb;
    }

    public static Workbook removeSheet(Workbook wb, int index) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.removeSheetAt(index);
        }
        return wb;
    }

    public static Workbook insert(Workbook wb, String sheetName, int row, int start,
                                  List<?> columns) {
        if (row == 0 || wb == null) return wb;
        for (int i = start; i < (row + start); i++) {
            Row rows = wb.getSheet(sheetName).createRow(i);
            if (columns != null && columns.size() > 0) {
                for (int j = 0; j < columns.size(); j++) {
                    Cell ceil = rows.createCell(j);
                    ceil.setCellValue(String.valueOf(columns.get(j)));
                }
            }
        }
        return wb;
    }

    /**
     * Set the sheet's header row (column titles).
     *
     * @param wb
     * @param sheetName
     * @param columns   e.g. ["country", "event type", "year"]
     * @return
     */
    public static Workbook setHeader(Workbook wb, String sheetName, List<?> columns) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        return setHeaderStyle(insert(wb, sheetName, 1, 0, columns), sheetName);

    }

    /**
     * Insert data rows.
     *
     * @param wb        Workbook
     * @param sheetName sheet name; defaults to the first sheet
     * @param start     first row to write
     * @param data      a List of Lists, e.g. [["China","Olympics",2008],["London","Olympics",2012]]
     * @return
     */
    public static Workbook setData(Workbook wb, String sheetName, int start,
                                   List<?> data) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        if (data != null && data.size() > 0) { // was ||, which throws NPE when data is null
            if (data instanceof List) {
                int s = start;
                for (Object columns : data) {
                    // write one row per element (the original inserted the whole remaining
                    // block each pass and let later passes overwrite it: same output, O(n^2) work)
                    insert(wb, sheetName, 1, s, (List<?>) columns);
                    s++;
                }
            }
        }
        }
        return wb;
    }

    /**
     * Remove a row.
     *
     * @param wb
     * @param sheetName sheet name
     * @param row       row number
     * @return
     */
    public static Workbook delRow(Workbook wb, String sheetName, int row) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Row r = wb.getSheet(sheetName).getRow(row);
        wb.getSheet(sheetName).removeRow(r);
        return wb;
    }

    /**
     * Move rows.
     *
     * @param wb
     * @param sheetName
     * @param start     first row to move
     * @param end       last row to move
     * @param step      number of rows to shift by; negative moves the block up
     *                  moveRow(wb, null, 2, 3, 5);  moves rows 2-3 down past row 5
     *                  moveRow(wb, null, 2, 3, -1); moves rows 2-3 up one row
     * @return
     */
    public static Workbook moveRow(Workbook wb, String sheetName, int start, int end, int step) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        wb.getSheet(sheetName).shiftRows(start, end, step);
        return wb;
    }

    public static Workbook setHeaderStyle(Workbook wb, String sheetName) {
        Font font = wb.createFont();
        CellStyle style = wb.createCellStyle();
        font.setBoldweight(HSSFFont.BOLDWEIGHT_BOLD);
        font.setFontHeightInPoints(FONT_HEIGHT_IN_POINTS);
        font.setFontName("黑體");
        style.setFont(font);
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        int row = wb.getSheet(sheetName).getFirstRowNum();
        int cell = wb.getSheet(sheetName).getRow(row).getLastCellNum();
        for (int i = 0; i < cell; i++) {
            wb.getSheet(sheetName).getRow(row).getCell(i).setCellStyle(style);
        }
        return wb;
    }

    public static Workbook setHeaderOutline(Workbook wb, String sheetName, String title) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Header header = wb.getSheet(sheetName).getHeader();
        header.setLeft(HSSFHeader.startUnderline() +
                HSSFHeader.font("宋體", "Italic") +
                "打雞血的口号!" +  // your rallying slogan, e.g. "愛我中華"
                HSSFHeader.endUnderline());
        header.setCenter(HSSFHeader.fontSize(HEADER_FONT_SIZE) +
                HSSFHeader.startDoubleUnderline() +
                HSSFHeader.startBold() +
                title +
                HSSFHeader.endBold() +
                HSSFHeader.endDoubleUnderline());
        header.setRight("時間:" + HSSFHeader.date() + " " + HSSFHeader.time());
        return wb;
    }

    public static Workbook setFooter(Workbook wb, String sheetName, String copyright) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Footer footer = wb.getSheet(sheetName).getFooter();
        if (Utils.isEmpty(copyright)) {
            copyright = "中華人民共和國"; // copyright holder: your company or app name
        }
        footer.setLeft("Copyright @ " + copyright);
        footer.setCenter("Page:" + HSSFFooter.page() + " / " + HSSFFooter.numPages());
        footer.setRight("File:" + HSSFFooter.file());
        return wb;
    }

    public static String create(String sheetNm, String file, List<?> header, List<?> data, String title, String copyright) {
        Workbook wb = createWorkbook(file);
        if (Utils.isEmpty(sheetNm)) {
            sheetNm = wb.getSheetAt(0).getSheetName();
        }
        setHeaderOutline(wb, sheetNm, title);
        setHeader(wb, sheetNm, header);
        setData(wb, sheetNm, 1, data);
        setFooter(wb, sheetNm, copyright);
        writeFile(wb, file);
        if (wb != null) {
            return file;
        }
        return null;
    }

    public static String getSystemFileCharset() {
        Properties pro = System.getProperties();
        return pro.getProperty("file.encoding");
    }
    // TODO: add more settings later

}
           

Wrapping HDFS in a utility class

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It shares a lot with existing distributed file systems, yet the differences are just as clear: HDFS is highly fault-tolerant, is designed for deployment on low-cost machines, and provides high-throughput access to application data, which makes it a good fit for applications with very large data sets. It relaxes a subset of the POSIX requirements in order to allow streaming access to file-system data. HDFS was originally built as infrastructure for the Apache Nutch search-engine project and is now part of the Apache Hadoop Core project.

Pull the HDFS dependencies in via gradle:

// jersey
    compile 'com.sun.jersey:jersey-core:1.19.1'
    compile 'com.sun.jersey:jersey-server:1.19.1'
    compile 'com.sun.jersey:jersey-client:1.19.1'
    compile 'com.sun.jersey:jersey-json:1.19.1'

    // hadoop
    compile ('org.apache.hadoop:hadoop-common:2.7.2') {
        exclude(module: 'jersey')
        exclude(module: 'contribs')
    }
    compile ('org.apache.hadoop:hadoop-hdfs:2.7.2') {
        exclude(module: 'jersey')
        exclude(module: 'contribs')
    }
    compile ('org.apache.hadoop:hadoop-client:2.7.2') {
        exclude(module: 'jersey')
        exclude(module: 'contribs')
    }
           
package cn.test.web.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.poi.ss.usermodel.Workbook;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 5:41 PM
 */
public class HDFSUtils {
    private static FileSystem fs = null;

    public static FileSystem getFileSystem(Configuration conf) throws IOException,
            URISyntaxException {
        fs = FileSystem.get(conf);
        //fs = FileSystem.newInstance(conf);
        return fs;
    }

    /**
     * Check whether a path exists.
     *
     * @param conf
     * @param path
     * @return
     * @throws IOException
     */
    public static boolean exits(Configuration conf, String path) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        return fs.exists(new Path(path));
    }

    /**
     * Creates a file from a byte array.
     *
     * @param conf
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public static void createFile(Configuration conf, String filePath, byte[] contents)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.write(contents, 0, contents.length);
        outputStream.hflush();
        outputStream.close();
        fs.close();
    }

    /**
     * Creates a file from string content.
     *
     * @param conf
     * @param fileContent
     * @param filePath
     * @throws IOException
     */
    public static void createFile(Configuration conf, String fileContent, String filePath)
            throws IOException, URISyntaxException {
        createFile(conf, filePath, fileContent.getBytes());
    }

    /**
     * Uploads a local file to HDFS.
     *
     * @param conf
     * @param localFilePath
     * @param remoteFilePath
     * @throws IOException
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        fs.copyFromLocalFile(true, true, localPath, remotePath); // delSrc=true: the local source file is deleted after the copy
        fs.close();
    }

    /**
     * Deletes a file or directory.
     *
     * @param conf
     * @param remoteFilePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath, boolean recursive)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        boolean result = fs.delete(new Path(remoteFilePath), recursive);
        fs.close();
        return result;
    }

    /**
     * Deletes a file or directory (cascading into subdirectories).
     *
     * @param conf
     * @param remoteFilePath
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath)
            throws IOException, URISyntaxException {
        return deleteFile(conf, remoteFilePath, true);
    }

    /**
     * Renames a file.
     *
     * @param conf
     * @param oldFileName
     * @param newFileName
     * @return
     * @throws IOException
     */
    public static boolean renameFile(Configuration conf, String oldFileName, String newFileName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path oldPath = new Path(oldFileName);
        Path newPath = new Path(newFileName);
        boolean result = fs.rename(oldPath, newPath);
        fs.close();
        return result;
    }

    /**
     * Creates a directory.
     *
     * @param conf
     * @param dirName
     * @return
     * @throws IOException
     */
    public static boolean createDirectory(Configuration conf, String dirName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path dir = new Path(dirName);
        boolean result = fs.mkdirs(dir);
        fs.close();
        return result;
    }

    /**
     * Lists all files (excluding directories) under a path.
     *
     * @param fs
     * @param basePath
     * @param recursive
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, String basePath, boolean recursive)
            throws IOException {

        RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = fs.listFiles(new Path(basePath), recursive);

        return fileStatusRemoteIterator;
    }

    /**
     * Lists files under a path (non-recursive).
     *
     * @param conf
     * @param basePath
     * @return
     * @throws IOException
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(Configuration conf, String basePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(basePath), false);
        // do not close fs here: iterating the returned RemoteIterator still needs the open FileSystem
        return remoteIterator;
    }

    /**
     * Lists file and subdirectory status under a directory (non-recursive).
     *
     * @param conf
     * @param dirPath
     * @return
     * @throws IOException
     */
    public static FileStatus[] listStatus(Configuration conf, String dirPath) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath));
        fs.close();
        return fileStatuses;
    }


    /**
     * Reads a file and writes its content into the given output stream.
     *
     * @param conf     configuration
     * @param filePath file path
     * @param os       output stream
     * @return
     * @throws IOException
     */
    public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        try (FSDataInputStream inputStream = fs.open(path)) {
            Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream);
            wb.write(os);
            inputStream.close();
        } finally {
            fs.close();
        }
    }

    /**
     * Reads a file and returns its content as a string.
     * @param conf
     * @param filePath
     * @return
     * @throws IOException
     * @throws URISyntaxException
     */
    public static String readFile(Configuration conf, String filePath) throws IOException,
            URISyntaxException {
        String fileContent = null;
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        InputStream inputStream = null;
        ByteArrayOutputStream outputStream = null;
        try {
            inputStream = fs.open(path);
            outputStream = new ByteArrayOutputStream(inputStream.available());
            IOUtils.copyBytes(inputStream, outputStream, conf);
            byte[] lens = outputStream.toByteArray();
            fileContent = new String(lens, "UTF-8");
        } finally {
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
        return fileContent;
    }
}
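The `readFile(Configuration, String)` helper above boils down to "copy an InputStream into a ByteArrayOutputStream and decode as UTF-8". The same pattern with only the JDK, no Hadoop `IOUtils`, looks like this (a minimal sketch; the buffer size is arbitrary):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class StreamToString {
    // Drains an InputStream into a String, mirroring what HDFSUtils.readFile
    // does with org.apache.hadoop.io.IOUtils.copyBytes.
    public static String read(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // stand-in for the FSDataInputStream returned by fs.open(path)
        InputStream in = new ByteArrayInputStream("hello hdfs".getBytes(StandardCharsets.UTF_8));
        System.out.println(read(in));
    }
}
```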
                

For HDFS I wrote two additional classes, HDFSFileUploader and Configuration. As the names suggest, the former handles file uploads and the latter carries the HDFS configuration.

HDFSFileUploader

package cn.test.web.utils.hadoop;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.common.util.Utils;
import cn.test.web.utils.HDFSUtils;
import org.apache.commons.lang.NullArgumentException;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 5:42 PM
 */
public class HDFSFileUploader {
    public static final byte FROM_LOCAL_COPY = 0;    // upload by copying a local file (value assumed; original elided)
    public static final byte FROM_CONTENT_WRITE = 1; // write string/byte content to a new file (value assumed; original elided)

    private static final Log LOGGER = LogFactory.getLog(HDFSFileUploader.class);
    private static final String HDFS_SCHEMA = "hdfs://";
    private static final String SEPARATOR = "/";
    private static final String SUFFIX_PREFIX = ".";

    private static final int BUFFER_SIZE = 4096; // original value elided; 4096 is an assumption
    private static final Configuration CONF = new Configuration();


    /**
     * Uploads binary content using the default domain and a random file name.
     *
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String path, String suffix, byte[] contents) {
        return upload(null, path, suffix, contents);
    }

    /**
     * Uploads binary content to the given domain with a random file name.
     *
     * @param domain
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String domain, String path, String suffix, byte[] contents) {
        return upload(domain, path, null, suffix, contents);
    }

    /**
     * Uploads binary content with an explicit file name (written via a stream).
     *
     * @param domain
     * @param path
     * @param filename
     * @param suffix
     * @param content
     * @return
     */
    public static String upload(String domain, String path, String filename, String suffix,
                                final byte[] content) {
        // note: byte[] -> String round-tripping is only safe for text content
        return upload(domain, path, filename, suffix, new String(content), FROM_CONTENT_WRITE);
    }

    /**
     * Uploads a file using the default domain and a random file name.
     *
     * @param path
     * @param suffix
     * @param src
     * @return
     */
    public static String upload(String path, String suffix, String src, byte fromLocal) {
        return upload(null, path, suffix, src, fromLocal);
    }

    /**
     * Uploads a file to a directory on the given domain; the file name is generated randomly.
     *
     * @param domain domain, e.g. 10.25.126.28:9000
     * @param path   file path, e.g. /usr/local/com.hd.test/2016-08-08/
     * @param suffix file suffix, e.g. .xsl or xsl
     * @param src    file content as a string, or a local file path
     * @return String the full file name
     */
    public static String upload(String domain, String path, String suffix, String src, byte
            fromLocal) {
        return upload(domain, path, null, suffix, src, fromLocal);
    }

    /**
     * Uploads a file with explicit domain, path, file name and suffix.
     *
     * @param domain   domain
     * @param path     path
     * @param filename file name
     * @param suffix   suffix
     * @param src      content, or a local file path
     * @return
     */
    public static String upload(String domain, String path, String filename, String suffix, String
            src, byte fromLocal) {
        String filePath = getRealAddr(domain, path, suffix, filename);
        LOGGER.debug("filePath:{}", filePath);
        try {
            switch (fromLocal) {
                case FROM_LOCAL_COPY:
                    HDFSUtils.copyFromLocalFile(CONF, src, filePath);
                    break;
                case FROM_CONTENT_WRITE:
                    HDFSUtils.createFile(CONF, src, filePath);
                    break;
            }
            return filePath;
        } catch (IOException | URISyntaxException e) {
            LOGGER.warn("failed to upload file: {}", e.getMessage());
        }
        return null;
    }

    /**
     * Builds the full file address.
     *
     * @param domain   domain
     * @param path     directory path
     * @param suffix   suffix
     * @param filename file name
     * @return
     */
    private static String getRealAddr(String domain, String path, String suffix, String filename) {
        if (!Utils.isEmpty(domain) && !domain.startsWith(HDFS_SCHEMA)) {
            domain = HDFS_SCHEMA + domain;
        } else {
            domain = "";
        }
        path = getPath(path);
        filename = getFilename(filename, suffix);
        return String.format("%s%s%s", domain, path, filename);

    }

    /**
     * Normalizes the directory path (ensures leading and trailing separators).
     *
     * @param path
     * @return
     */
    private static String getPath(String path) {
        if (Utils.isEmpty(path)) {
            throw new NullArgumentException("path is null");
        }
        if (!path.startsWith(SEPARATOR)) {
            path = SEPARATOR + path;
        }
        if (!path.endsWith(SEPARATOR)) {
            path = path + SEPARATOR;
        }
        return path;
    }

    /**
     * Builds the file name, generating a random one if empty and appending the suffix.
     *
     * @param filename
     * @param suffix
     * @return
     */
    private static String getFilename(String filename, String suffix) {
        if (Utils.isEmpty(filename)) {
            filename = generateFilename();
        }
        if (!Utils.isEmpty(suffix) && !suffix.equals(SEPARATOR) && !filename.endsWith(suffix)) {
            // append the suffix, inserting a '.' unless one is already present
            if (filename.endsWith(SUFFIX_PREFIX) || suffix.startsWith(SUFFIX_PREFIX)) {
                filename = filename + suffix;
            } else {
                filename = filename + SUFFIX_PREFIX + suffix;
            }
        }
        return filename;
    }

    /**
     * Generates a random file name.
     *
     * @return
     */
    private static String generateFilename() {
        return getUuid(false);
    }

    /**
     * Generates a UUID string.
     *
     * @param isNeedHyphen whether to keep the hyphens
     * @return
     */
    public static String getUuid(boolean isNeedHyphen) {
        UUID uuid = UUID.randomUUID();
        String str = uuid.toString();
        if (!isNeedHyphen) { // the original condition was inverted relative to the parameter name
            str = str.replaceAll("-", "");
        }
        }
        return str;
    }

    public static void setConfResource(final Configuration config) {
        CONF.addResource(config);
    }
}
           

The overloaded methods in HDFSFileUploader upload different kinds of content, such as binary data and strings; the class also wraps HDFS's copy-from-local operation and UUID-based file name generation.
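The address-building part of the class (getRealAddr, getPath, getFilename) can be sketched without any Hadoop dependency; the following simplified version (names and sample values are illustrative, not the production code) shows how domain, directory, file name and suffix are combined:

```java
public class PathAssembly {
    static final String HDFS_SCHEMA = "hdfs://";

    // Simplified equivalent of HDFSFileUploader.getRealAddr: normalize the
    // directory to carry leading/trailing slashes and prefix the schema.
    static String realAddr(String domain, String path, String filename, String suffix) {
        if (domain != null && !domain.isEmpty() && !domain.startsWith(HDFS_SCHEMA)) {
            domain = HDFS_SCHEMA + domain;
        } else if (domain == null) {
            domain = "";
        }
        if (!path.startsWith("/")) path = "/" + path;
        if (!path.endsWith("/")) path = path + "/";
        if (suffix != null && !suffix.isEmpty() && !filename.endsWith(suffix)) {
            // insert a '.' unless the suffix already starts with one
            filename = filename + (suffix.startsWith(".") ? suffix : "." + suffix);
        }
        return domain + path + filename;
    }

    public static void main(String[] args) {
        System.out.println(realAddr("10.25.126.28:9000", "usr/local/test/2016-08-08", "report", "xls"));
    }
}
```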

Configuration

package cn.test.web.utils.hadoop;

import cn.test.web.utils.CommonUtils;
import org.apache.commons.io.FilenameUtils;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
 * Time 9:30 AM
 * Recommended usage:
 * <bean id="hadoopConfig" class="cn.test.util.hadoop.Configuration">
 * <property name="resources">
 * <list>
 * <value>classpath:/spring/core-site.xml</value>
 * </list>
 * </property>
 * </bean>
 * Then inject hadoopConfig directly where it is used:
 *
 * @Resource private Configuration hadoopConfig;
 */
public class Configuration extends org.apache.hadoop.conf.Configuration {
    private Resource[] resources;

    public void setResources(List<String> filenames) throws IOException {
        List<Resource> resources = new ArrayList<>();
        if (filenames != null && filenames.size() > 0) {
            for (String filename : filenames) {
                filename = filename.trim();
                String realName = getFileName(filename);
                String ext = FilenameUtils.getExtension(realName);
                if ("xml".equals(ext)) { // constant-first avoids an NPE when there is no extension
                    PathMatchingResourcePatternResolver pathMatchingResourcePatternResolver =
                            new PathMatchingResourcePatternResolver();
                    try {
                        Resource[] resourceList = pathMatchingResourcePatternResolver.getResources(filename);
                        Collections.addAll(resources, resourceList);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        for (Resource resource : resources) {
            this.addResource(resource.getURL());
        }
    }

    private String getFileName(String fileName) {
        return CommonUtils.getFileName(fileName);
    }
}
           

This class is simple: it extends Hadoop's org.apache.hadoop.conf.Configuration so that the Hadoop configuration files can be specified flexibly in the Spring context, relying on Configuration's addResource(String name) method. Below is the corresponding Spring XML configuration.

<!-- hadoop configuration -->
    <bean id="hadoopConfig" class="cn.test.web.utils.hadoop.Configuration">
        <property name="resources">
            <list>
                <value>classpath:META-INF/hadoop/*.xml</value>
            </list>
        </property>
    </bean>
           

Exporting orders (the MQ consumer)

package cn.test.web.mq.consumer;
... // many imports omitted

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
 * Time 2:14 PM
 */
public class OrderExportHandler implements IMessageHandler<String, String> {

    private static final Log LOGGER = LogFactory.getLog(OrderExportHandler.class);
    private static final int MUL_SEC = 1000; // original value elided; 1000 is an assumption
    private static final Gson GSON = new Gson();
    
    @Value("${image_server}") 
    private String imageServer;
    @Autowired
    private DataManager manager;
 
    @Override
    public void handle(final String key, final String message) {
        LOGGER.debug("message:{}", message);
        Pattern p = Pattern.compile("-");
        String[] skey = p.split(key);
        if (skey.length < 3) { // the key is expected to carry three '-'-separated parts
            return;
        }
        int res = insert(skey[0], skey[1], skey[2]); // part order restored; the original indices were elided
        LOGGER.debug("primary key: {}", res);
        if (res > 0) {
            // row inserted successfully; run the export logic
            Map data = manager.parseData(message);
            List<?> header = null;
            List<?> content = null;
            List<Order> orders = null;

            DataExportLog log = new DataExportLog();
            log.setDelid(res);
            log.setUid(Integer.valueOf(skey[1])); // which key part holds the uid was elided; index 1 is an assumption

            if (data.containsKey("params")) {
                LOGGER.debug("params:{}", data.get("params"));
                orders = manager.getOrders(data.get("params"));
                LOGGER.debug("number of rows to export: {}", orders.size());
            }
            if (orders == null || orders.size() == 0) {
                log.setStatus((byte) 2); // "no data" status code assumed; original value elided
            } else if (data.containsKey("header") && (data.get("header") instanceof Map)) {
                Object obj = data.get("header");
                Map<String, List> map = (obj instanceof Map) ?
                        manager.parseHeader((Map<String, String>) obj) : null;

                if (map != null && map.size() > 0) {
                    if (map.containsKey("header")) {
                        header = getHeader(map.get("header"));
                    }
                    if (map.containsKey("key")) {
                        content = getContent(orders, map.get("key"));
                    }
                }
                // call the HDFS interface and upload the file
                if (!Utils.isEmpty(header) || !Utils.isEmpty(content)) {
                    // generate the Excel file
                    String fName = getFilename(data);
                    String localFile = manager.writeExecelFile(fName, header, content, null, null);
                    String file = manager.copyFileFromLocal(skey[0], localFile);

                    if (Utils.isEmpty(localFile) || Utils.isEmpty(file)) {
                        log.setStatus((byte) 2); // failure status code assumed; original value elided
                    } else {
                        log.setStatus((byte) 1); // success status code assumed; original value elided
                        log.setLink(file);
                    }
                    LOGGER.info("local temp file: {}", localFile);
                    LOGGER.info("file uploaded to the Hadoop server: {}", file);
                }

            }
            update(log);
        }
    }
    
    // TODO
    // data processing calls the biz project's dubbo interfaces;
    // the concrete operations are not written here
    
}
           

The order-export logic lives in the class above and in DataManager; the data-access interfaces are provided by the biz project's dubbo services, whose business logic is out of scope here.
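The consumer recovers its parameters by splitting the MQ message key on `-`, as the handler above does with `Pattern.split`. A self-contained sketch of that step (the key layout `<type>-<uid>-<timestamp>` is an assumption for illustration; the original indices were elided):

```java
import java.util.regex.Pattern;

public class KeySplit {
    public static void main(String[] args) {
        // hypothetical key: "<type>-<uid>-<timestamp>"
        String key = "order-1024-20160816";
        String[] skey = Pattern.compile("-").split(key);
        if (skey.length < 3) {
            // mirror the handler's early return on a malformed key
            System.out.println("malformed key");
            return;
        }
        System.out.println(skey[0] + "|" + skey[1] + "|" + skey[2]);
    }
}
```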

Below is the code for manager.writeExecelFile(fName, header, content, null, null) and manager.copyFileFromLocal(skey[0], localFile):

public String writeExecelFile(String filename, List<?> header, List<?> datas, String title, String copyright) {
    SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd");
    String date = sd.format(new Date());
    if (Utils.isEmpty(filename)) {
        filename = HDFSFileUploader.getUuid(true) + this.ext;
    }
    String filePath = this.tmpDir + "/" + date + "/" + filename;
    filePath = filePath.replaceAll("//", "/");
    File f = new File(CommonUtils.getFilePath(filePath));
    if (!f.exists() && !f.isDirectory()) {
        f.mkdirs(); // mkdirs, so the date directory is created even if parents are missing
    }
    if (Utils.isEmpty(title)) {
        title = DEFAULT_TITLE;
    }
    if (Utils.isEmpty(copyright)) {
        copyright = this.copyright;
    }
    return POIUtils.create(null, filePath, header, datas, title, copyright);
}
           

writeExecelFile delegates to POI's create method; at this point the temporary file has been generated.

One more note: values such as the temporary directory, the HDFS upload path and the copyright text are best kept configurable. Spring's org.springframework.beans.factory.config.PropertyPlaceholderConfigurer does exactly that; we only need to annotate the fields as below and add the matching entries to a properties file:

    @Value("${hdfs_upload_dir}")
    private String uploadDir;

    @Value("${file_tmp_dir}")
    private String tmpDir;

    @Value("${copyright}")
    private String copyright;

    @Value("${default_file_ext}")
    private String ext;
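The matching properties file might look like the following (the paths and values here are illustrative placeholders, not the original configuration):

```properties
# hypothetical values for the placeholders used above
hdfs_upload_dir=/cn/test/order/
file_tmp_dir=/tmp/excel/
copyright=test.cn
default_file_ext=.xls
```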
           

Now look at the copyFileFromLocal method:

/**
     * Copies a local file into HDFS.
     *
     * @param type
     * @param file
     * @return
     */
    public String copyFileFromLocal(String type, String file) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String date = format.format(new Date());
        String path = this.uploadDir + type + '/' + date + '/';
        HDFSFileUploader.setConfResource(hadoopConfig);
        return HDFSFileUploader.upload(path, this.ext, file, HDFSFileUploader.FROM_LOCAL_COPY);
    }
           

This method calls HDFSFileUploader.upload, shown in the wrapper class above. Note that the Hadoop configuration is injected here via HDFSFileUploader.setConfResource(hadoopConfig), and the Hadoop Configuration itself is wired into the DataManager class like this:

@Resource
private Configuration hadoopConfig;
           

At this point the generated Excel file has been uploaded to the configured HDFS path, which can be checked with the Hadoop client:

hadoop fs -ls /cn/test/order/    (the upload path)
           

Exporting orders (download)

The download is served directly by a Java REST endpoint. The third-party PHP HDFS package phdfs (on GitHub) turned out not to be smooth to use and failed to compile, so PHP was ruled out here.

Here is how the endpoint is written:

package cn.test.web.impl;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.util.Utils;
import cn.test.web.manager.DataManager;
import cn.test.web.service.DownloadService;
import cn.test.web.utils.CommonUtils;
import com.alibaba.dubbo.rpc.protocol.rest.support.ContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/16
 * Time 5:21 PM
 */
@Path("download")
@Component("downloads")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
public class DownloadServiceImpl implements DownloadService {
    private static final Log LOGGER = LogFactory.getLog(DownloadServiceImpl.class);
    @Autowired
    private DataManager manager;
    @Override
    @GET
    @Path("order")
    public void down(@Context HttpServletResponse response, @QueryParam("url") String url,
                             @QueryParam("uid") Integer uid) {
        LOGGER.debug("download url: {}", url);
        if (Utils.isEmpty(url)) {
            return;
        }
        String filename = CommonUtils.getFileName(url);
        // set the response headers; only the last setContentType call takes effect,
        // so set the Excel content type directly
        response.setContentType("application/vnd.ms-excel;charset=gb2312");
        response.setHeader("Content-Disposition", "attachment;filename=" + filename);
        try {
            // read the file and stream it into the response
            manager.readFile(url, response.getOutputStream());
            response.flushBuffer();
        } catch (IOException | URISyntaxException e) {
            LOGGER.error(e.getMessage());
        }
    }
}
           

On the PHP side a plain hyperlink is enough. As an optimization, the production APIs are internal-only, so the endpoint's IP must not be exposed in the a tag; instead an nginx proxy maps a request to downloads/order?url=xxx&uid=xxx onto the internal service:

location /downloads/ {
    proxy_pass http://.:/presentation/download/;
}
           

Pitfalls encountered

Fetching data from the biz dubbo interface with multiple threads

public List<Order> getOrders(Object params) {
        OrderSearch search = null;
        if (params != null && (params instanceof Map)) {
            LOGGER.debug("params:{}", params);
            search = GSON.fromJson(GSON.toJson(params), OrderSearch.class);
            LOGGER.debug("title:{}", search.getTitle());
        } else {
            search = new OrderSearch();
        }
        int count = orderService.searchCount(search);
        // * 1.0 forces floating-point division before Math.ceil
        int cycleTimes = (int) Math.ceil(count * 1.0 / TIMES_IN_SIGNEL_PROCESSOR);
        LOGGER.debug("total rows: {}, outer passes: {}", count, cycleTimes);
        // collect the results of all concurrent tasks
        List<Order> orders = new ArrayList<>();
        int page = 0;
        for (int j = 0; j < cycleTimes; j++) {
            int signel = (count > TIMES_IN_SIGNEL_PROCESSOR) ? TIMES_IN_SIGNEL_PROCESSOR : count;
            count = count - signel;
            int poolNum = (int) Math.ceil(signel * 1.0 / LIMIT);
            LOGGER.debug("thread pool size: {}", poolNum);
            // create a thread pool for this pass
            ExecutorService pool = Executors.newFixedThreadPool(poolNum);
            // submit one result-bearing task per page
            List<Future<List<Order>>> list = new ArrayList<>();
            for (int i = 0; i < poolNum; i++) {
                Callable<List<Order>> c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search);
                // submit the task and keep its Future
                Future<List<Order>> f = pool.submit(c);
                list.add(f);
            }
            // stop accepting new tasks
            pool.shutdown();
            try {
                Thread.sleep(THREAD_SLEEP);
            } catch (InterruptedException e) {
                LOGGER.debug("interrupted while sleeping: {}", e.getMessage());
            }
            for (Future<List<Order>> f : list) {
                // read each task's return value, in submission order
                try {
                    orders.addAll(f.get());
                    LOGGER.debug(">>> thread {} returned {} rows", f.toString(), f.get().size());
                } catch (InterruptedException | ExecutionException e) {
                    LOGGER.warn("calling OrderService.search failed: {}", e.getMessage());
                    return null;
                }
            }

        }

        return orders;
    }
           

This method calls the dubbo interface from multiple threads and returns the order data. After invoking Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search); I noticed that every task returned data matching the filter conditions set last: no matter whether the page passed in was 1 or 2, if the last value passed was 5, then 5 was the one that took effect. Where was the problem? Logging showed that when OrderExportCallable receives an object parameter, it is passed by reference, so whichever thread modifies it mutates the fields of the one shared object. With the cause found, the fix followed:

/**
 * Fetches one page of orders on a worker thread.
 */
class OrderExportCallable implements Callable<List<Order>> {
    private static final int THREAD_SLEEP = 100; // ms; original value elided, 100 is an assumption

    private String taskNum;
    private int page;
    private int limit;
    private OrderService orderService;
    private OrderSearch orderSearch;

    OrderExportCallable(String taskNum, int page, int limit, OrderService orderService,
                        OrderSearch orderSearch) {
        this.taskNum = taskNum;
        this.page = page;
        this.limit = limit;
        this.orderService = orderService;
        this.orderSearch = orderSearch;
    }


    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    @Override
    public List<Order> call() throws Exception {
        System.out.println(">>> task " + taskNum + " started");
        Thread.sleep(THREAD_SLEEP);
        // defensive copy: each task mutates its own OrderSearch, never the shared one
        OrderSearch os = new OrderSearch();
        BeanUtils.copyProperties(orderSearch, os);
        os.setPage(this.page);
        os.setLimit(this.limit);
        return orderService.search(os);
    }
}
           

OrderExportCallable now creates a new OrderSearch and copies the incoming object's properties into it. Future was chosen because a Future carries the task's return value once it completes; to keep the data ordered, the Future objects are added to a list and their results are consumed in submission order.

If the data volume exceeds what a single round of concurrent requests should handle, we wait for the results to come back and then create new threads. At 500 rows per request, 10,000 rows would otherwise mean 20 threads at once, which is not worthwhile, so the 10,000 rows are processed in two passes of 10 concurrent threads each; that is why thread creation uses two nested for loops.
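The two nested loops amount to simple batch arithmetic: the outer loop caps how many requests run concurrently, the inner loop sizes the pool for the current batch. A self-contained sketch of that arithmetic, using the article's example figures (500 rows per request, 5,000 rows per outer pass; the constant value 5000 is an assumption consistent with "two passes of 10 threads"):

```java
public class BatchMath {
    static final int LIMIT = 500;                       // rows fetched per dubbo request
    static final int TIMES_IN_SIGNEL_PROCESSOR = 5000;  // rows handled per outer pass (assumed)

    public static void main(String[] args) {
        int count = 10000; // total rows, the article's example
        int cycleTimes = (int) Math.ceil(count * 1.0 / TIMES_IN_SIGNEL_PROCESSOR);
        System.out.println("outer passes: " + cycleTimes);
        for (int j = 0; j < cycleTimes; j++) {
            int single = Math.min(count, TIMES_IN_SIGNEL_PROCESSOR); // rows in this pass
            count -= single;
            int poolNum = (int) Math.ceil(single * 1.0 / LIMIT);     // threads for this pass
            System.out.println("pass " + j + ": " + poolNum + " threads");
        }
    }
}
```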

PS: an optimization still to be made is keeping worker threads alive after they return a result, letting them pick up the next request and shutting down only when no work remains, which would cut the cost of repeatedly creating threads.
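One way to get that effect is to create a single fixed-size ExecutorService up front and submit every page to it, letting the pool's queue throttle the work instead of building and shutting down a pool per batch. A minimal sketch (page fetching is simulated; the real code would call orderService.search):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ReusablePool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10); // created once, reused for all pages
        List<Future<List<Integer>>> futures = new ArrayList<>();
        for (int page = 1; page <= 20; page++) {              // 20 pages of 500 rows
            final int p = page;
            futures.add(pool.submit(() -> fetchPage(p)));     // queued tasks wait for a free thread
        }
        int total = 0;
        for (Future<List<Integer>> f : futures) {
            total += f.get().size();                          // consume results in submission order
        }
        pool.shutdown();
        System.out.println("rows: " + total);
    }

    // stand-in for orderService.search(os)
    static List<Integer> fetchPage(int page) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 500; i++) rows.add(page * 1000 + i);
        return rows;
    }
}
```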

Excel data-stream problems on export

My first idea was to read the Excel file into a stream, convert it to a string and return it from the REST endpoint directly, but that failed: no matter what, the exported Excel came out garbled.

Problem: garbled content when reading the Excel file from HDFS

The code above went through all the common stream types, and every attempt failed; just as I was losing hope, my lead @hsj solved it. See this line of code in the REST endpoint?

Right: pass in the response's output stream. But that line alone does not solve the problem; read on:

public void readFile(String filename, OutputStream os) throws IOException, URISyntaxException {
    if (HDFSUtils.exits(hadoopConfig, filename)) {
       HDFSUtils.readFile(hadoopConfig, filename, os);
    }
}
           

It is the following method from the HDFS utility class wrapped earlier:

/**
     * Reads a file and writes its content into the given output stream.
     *
     * @param conf     configuration
     * @param filePath file path
     * @param os       output stream
     * @return
     * @throws IOException
     */
    public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        try (FSDataInputStream inputStream = fs.open(path)) {
            Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream);
            wb.write(os);
            inputStream.close();
        } finally {
            fs.close();
        }
    }
           

which then calls the POI utility method:

public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
    String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
    Workbook wb = null;
    try {
        switch (ext) {
            case "xls":
                wb = new HSSFWorkbook(inputStream);
                break;
            case "xlsx":
                wb = new XSSFWorkbook(inputStream);
                break;
            default:
                wb = new HSSFWorkbook(inputStream);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return wb;
}
           

As we can see, the call ends in POI's createWorkbookByIS method, which does only one thing: based on the file extension, it creates a Workbook over the given input stream. readFile then calls the Workbook's write method, writing the content into the output stream and hence into the response of the request. The REST endpoint also sets the content type of the response:

response.setContentType("application/vnd.ms-excel;charset=gb2312");

POI freezing when exporting large data sets (caused by running out of memory)

public static Workbook createWorkbook(String file) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = createSXSSFWorkbook(MEM_ROW); // MEM_ROW = number of rows SXSSF keeps in memory (e.g. 100)
        /*switch (ext) {
            case "xls":
                wb = createHSSFWorkbook();
                break;
            case "xlsx":
                wb = createXSSFWorkbook();
                break;
            default:
                wb = createHSSFWorkbook();
        }*/
        return wb;
    }
    
    public static Workbook createSXSSFWorkbook(int memRow) {
        Workbook wb = new SXSSFWorkbook(memRow);
        Sheet sheet = wb.createSheet();
        return wb;
    }                

Create the Workbook with SXSSFWorkbook: it keeps only a sliding window of rows in memory and flushes the rest to disk, which avoids the out-of-memory errors below.

java.lang.OutOfMemoryError: GC overhead limit exceeded

Notes on fixing java.lang.OutOfMemoryError: GC overhead limit exceeded

Solutions for java.lang.OutOfMemoryError: Java heap space

org.apache.poi.poifs.filesystem.OfficeXmlFileException: The supplied data appears to be in the Office 2007+ XML. You are calling the part of POI that deals with OLE2 Office Documents. You need to call a different part of POI to process this data (eg XSSF instead of HSSF)

Parsing Excel data in Java, compatible with both the 2003 and 2007 formats:

public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
        Workbook wb = null;
        // buffer the stream so it can be rewound when the XSSF attempt fails;
        // otherwise the HSSF fallback would read a half-consumed stream
        InputStream is = inputStream.markSupported()
                ? inputStream : new BufferedInputStream(inputStream);
        is.mark(Integer.MAX_VALUE);
        try {
            wb = new XSSFWorkbook(is);
        } catch (Exception e) {
            try {
                is.reset();
                wb = new HSSFWorkbook(is);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
        return wb;
    }
           

Problems left unsolved

PS: what this article describes still does not fully solve the big-data export problem. Once the export reaches tens of thousands of rows, the CPU is pegged and memory fills up (4 cores, 32 GB). Another headache: exporting 40k+ rows takes 3 hours, and even that is not the worst part; the download is painfully slow, since those 40k rows produce more than 1 GB of output.
