天天看點

C#碼農的大資料之路 - 使用C#編寫MR作業

系列目錄

寫在前面

從Hadoop出現至今,大資料幾乎就是Java平台專屬一般。雖然Hadoop或Spark也提供了接口可以與其他語言一起使用,但作為基于JVM運作的架構,Java系語言有着天生優勢。而且能找到的與大資料架構如Hadoop等使用介紹的文章也都以Java語言作為示例居多。許多C#er為了轉投大資料懷抱也開始學習Java。微軟為了擁抱大資料在這方面也做了許多,提供了一些工具及庫使C#可以更好的與Hadoop等協同工作。本系列中我們一同學習如何以我們熟悉語言來使用Hadoop等大資料平台,畢竟大資料的思想是一緻的,算法是固定的,語言隻是一個工具。做好準備,出發。

本文先來介紹下微軟這些年來在大資料平台上的工作。從中可以了解我們有哪些工具可用,方向是什麼。

微軟的大資料政策

本部分内容參考了各種官方文檔,新聞報道總結而成,雖然已經努力確定正确,但難免出現疏漏,如果有錯誤請各位指出。

微軟的大資料政策大概分了兩個階段。

早期

微軟在Hadoop出現不久就已經開始關注,在Azure雲服務出現後,微軟實作一個Windows平台的Hadoop發行版名為HDInsight,并且在Azure中提供了名為HDInsight的大資料服務。在應用開發中,微軟提供了名為

Microsoft HDInsight Emulator for Windows Azure

的工具來支援開發過程中的調試。這個工具已經過時,不過現在仍然可以使用Web Platform Installer 5.0來安裝。實際上這個時期的微軟版Hadoop - HDInsight是基于Hortonworks Data Platform for Windows開發的,HDInsight在這個時期也是Windows和Azure平台專屬。

這個時期微軟還提供了一系列的Framework用于簡化使用C#開發Hadoop(準确的說是微軟版Hadoop - HDInsight),最初放在Codeplex上,稱為

Microsoft .NET SDK For Hadoop

,現在裡面有部分API已經過時了,像是

Microsoft.Hadoop.Client

等。這些API提供使用C#編寫MapReduce的功能,但僅能用于基于Windows的HDInsight。

當下

後來微軟擁抱開源,擁抱Linux,微軟的産品也不再單一的限于Windows平台或着以.NET作為開發架構。對于HDInsight的開發微軟依然繼續與Hortonworks合作,但這時的HDInsight(3.4版以後)隻有Linux版本。(微軟擁抱Linux,Hortonworks的HDP發行版都沒有Windows版了)運作在Azure上的HDInsight服務也都是跑在Linux伺服器上。對于本地開發可以使用Hortonworks提供的HDP(Hortonworks Data Platform) SandBox作為虛拟環境。HDP SandBox提供了3種不同運作環境的鏡像,VirtualBox、VMware及Docker。樓主比較喜歡Docker,有關Docker版HDP SandBox的配置及使用VS連接配接這個虛拟環境的方法詳見此文。

關于HDP

HDP是一種大資料棧的發行版,包含了如Hadoop,Spark,Storm等元件。同類發行版中,可能更為人所知的是Cloudera所出的發行版CDH。下面放一張Hortonworks官網的圖,2.5版本的HDP包含的元件一目了然:

C#碼農的大資料之路 - 使用C#編寫MR作業

MapReduce

微軟最早開始支援使用自己的技術棧如C#和.NET開發大資料程式就是從MapReduce開始的。時至今日,微軟一共提供了兩種不同的方式讓C#編寫的MapReduce任務可以在Hadoop叢集上執行,當然這些API也都是基于Hadoop Streaming,因為不管是基于Windows的大資料叢集,還是基于Linux的大資料叢集,它們都是運作于JVM之上。至少在很長一段時間内.NET CLR都不能和JVM共同工作。(當下,微軟文檔中也明确提到使用Hadoop Streaming由于資料在JVM和其他運作環境如.NET CLR之間傳輸會導緻性能損失,微軟也建議使用Java來編寫MR程式,文檔中的也有Java編寫MR的示例)

随着微軟擁抱Linux,基于Windows的大資料叢集不再被支援(HDP for Windows也不再有後續版本了),微軟也全面轉向基于Linux的大資料叢集(包括部署在Azure中的HDInsights也都是運作在Linux系統之上),這些C#寫的API都不再被支援(主要原因還是這些基于.NET Framework的程式無法運作在Linux上,隻能等未來.NET Core普及了)。

雖然這些API都已過時,但為了讓大家了解C#技術棧這麼多年來掙紮在大資料邊緣的過程。下面對它們都進行了簡單的介紹。

Hadoop API for .NET

以下代碼示例主要來自CodePlex上那篇多年沒有更新的文章。

Hadoop API for .NET是微軟推出的第一套用于Hadoop的.NET庫。Hadoop元件中Hadoop Streaming用來支援與其它語言協同工作完成MapReduce(按國際慣例下文縮寫為MR)任務的編寫。Hadoop API for .NET包裝了Hadoop Streaming友善使用.NET平台語言來編寫MR任務。

使用Hadoop API for .NET:

  • 可以更友善的送出MR任務,而不用通過Hadoop Streaming指令行
  • 提供了

    Mapper

    Reducer

    Combiner

    更好的包裝類作為基類,友善MR的編寫
  • 自動包含依賴的.NET程式集一起作為streaming任務送出
  • 提供

    StreamingUnit

    進行對

    Mapper

    Reducer

    Combiner

    的本地單元測試
  • 通過JSON序列化及反序列化,支援輸入輸出的強類型

Hadoop API for .NET支援的存儲包括HDFS和Azure Blob Storage。輸入内容的格式就是慣常的\n\r分割行(記錄),\r分割列。

如果既有Mapper又有Reducer則所需要的Key Value中的Key為純文字,以便通過

StringComparison.Ordinal

進行排序并存儲。

其它情況下記錄可以是任意結構化資料的文本表示,如字元串表示的JSON資料,如果是二進制資料(如docx檔案),則記錄将是檔案存儲的路徑。

時至今日,這其中大部分API都已被廢棄。

MR程式組成

一個.NET編寫的MR程式包含如下幾部分:

  • 任務定義,包括

    MapperType

    ReducerType

    CombinerType

    和其它設定
  • Mapper

    Reducer

    Combiner

  • 存儲于HDFS或Azure Blob Storage的輸入資料
  • 任務執行器。可以通過

    MRRunner.exe

    運作.NET MR任務,也可以在Main函數中調用

    HadoopJobExecutor

編寫的.NET MR任務(本示例僅有Mapper)

  1. 建立一個名為

    HadoopNET

    的C#控制台應用程式,查找名為

    Microsoft.Hadoop.MapReduce

    的Nuget包并安裝。這将向項目中添加

    Microsoft.Hadoop.MapReduce.dll

    MRRunner.exe

    等工具。
  2. 開始編寫Mapper。建立一個名為FirstMapper的類并實作MapperBase這個基類。
    public class FirstMapper : MapperBase
     {
         public override void Map(string inputLine, MapperContext context)
         {
             // 輸入
             int inputValue = int.Parse(inputLine);
             // 任務
             var sqrt = Math.Sqrt(inputValue);
             // 寫入輸出
             context.EmitKeyValue(inputValue.ToString(), sqrt.ToString());
         }
     }
               
VS或Resharper的重構功能會自動添加

using Microsoft.Hadoop.MapReduce;

這個引用
  1. 建立任務類

    FirstJob

    ,該類實作

    HadoopJob<FirstMapper>

    public class FirstJob : HadoopJob<FirstMapper>
     {
         public override HadoopJobConfiguration Configure(ExecutorContext context)
         {
             HadoopJobConfiguration config = new HadoopJobConfiguration();
             config.InputPath = "input/SqrtJob";
             config.OutputFolder = "output/SqrtJob";
             return config;
         }
     }
               

配置測試運作環境

要運作這個示例,需要在Windows上安裝前面提到的

Microsoft HDInsight Emulator for Windows Azure

。這個工具需要通過Microsoft Web Platform Installer來安裝,搜尋hdinsight,結果第一項就是我們要安裝的工具:

C#碼農的大資料之路 - 使用C#編寫MR作業

點選添加 - 安裝,耐心等候個半小時(需要下載下傳大約1.2G的檔案)

C#碼農的大資料之路 - 使用C#編寫MR作業

這其中最重要的部分就是用于Windows平台的HDP(Hortonworks Data Platform for Windows),其所使用的是OpenJDK1.7的一個分支 - AZUL公司的Zulu。

安裝完成後,桌面會多出3個快捷方式:

  • Hadoop Command Line
  • Hadoop Name Node Status
  • Hadoop YARN Status

Hadoop Command Line是一個指令行的快捷方式,在這個控制台中我們可以直接使用如

hadoop fs

這樣的指令。預設安裝下控制台進入後目錄為

C:\hdp\hadoop-2.4.0.2.1.3.0-1981

hadoop

等應用程式就在這個目錄下的

bin

目錄中。

這個目錄的上級目錄中(

C:\hdp

)包含了一些批處理及PowerShell腳本,我們就使用其中的

start_local_hdp_services.cmd

stop_local_hdp_services.cmd

來起停這個Windows上的大資料叢集。

運作

start_local_hdp_services.cmd

輸入下面的資訊表示已經成功啟動。

starting zkServer
starting namenode
starting secondarynamenode
starting datanode
starting resourcemanager
starting timelineserver
starting nodemanager
starting jobhistoryserver
starting hiveserver2
starting metastore
starting derbyserver
starting templeton
starting oozieservice
Sent all start commands.
total services
13
running services
13
           

運作另外兩個快捷方式分别可以看到HDFS和YARN的狀态。

為了運作後面的程式,還需要給HDFS建立一個主目錄

hadoop fs -mkdir -p /user/username #username是目前登陸的Windows使用者名
           

建立測試資料并運作MR任務

為了運作程式,首先向HDFS中添加測試輸入資料。建立一個文本檔案

input.txt

,每行一個數字,類似:

9
16
25
36
81
           

将該檔案上傳到

FisrtJob

中配置的那個目錄:

hadoop fs -mkdir input
hadoop fs -mkdir input/SqrtJob #如果目錄不存在,先建立
hadoop fs -put input.txt input/SqrtJob/input.txt
           

剩下的就是編譯這個程式并通過MRRunner執行,我們測試項目的名字為HadoopNet,編譯後得到HadoopNet.exe。

通過代碼可以看到,我們沒有在Main函數中實作任何代碼,我們要介紹的第一種送出作業的方式是使用MRRunner來送出,将輸出的exe作為dll使用,運作下面的指令:

MRRunner.exe -dll HadoopNET.exe
           

不出意外的話,MR會開始執行。輸出中的前半部分部分内容是MR執行過程中的日志。最後會列印實際送出的Hadoop平台的指令,MRRunner的作用也就在于生成這樣的一條指令并将其送出到Hadoop Streaming來處理:

>>CMD: C:\hdp\hadoop-2.4.0.2.1.3.0-1981\bin\hadoop.cmd jar C:\hdp\hadoop-2.4.0.2.1.3.0-1981\share\hadoop\tools\lib\hadoop-streaming-2.4.0.2.1.3.0-1981.jar -D "mapred.reduce.tasks=0" -D "mapred.map.max.attempts=1" -D "mapred.reduce.max.attempts=1" -files "hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/HadoopNET.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.MapReduce.dll,hdfs:///username/hystar/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.WebClient.dll,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Newtonsoft.Json.dll,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.MapDriver.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.ReduceDriver.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.Hadoop.CombineDriver.exe,hdfs:///user/username/dotnetcli/bbc29713-7dfb-4e33-944e-68a42230383c/app/Microsoft.WindowsAzure.Management.Framework.Threading.dll" -input hdfs:///user/hystar/input/SqrtJob -output hdfs:///user/hystar/output/SqrtJob -mapper Microsoft.Hadoop.MapDriver.exe -reducer NONE -cmdenv "MSFT_HADOOP_MAPPER_DLL=HadoopNET.exe" -cmdenv "MSFT_HADOOP_MAPPER_TYPE=HadoopNET.FirstMapper"

使用代碼來送出MR作業

另一個運作MR程式的方式就是使用代碼,我們在

Main

函數中添加如下代碼:

var hadoop = Hadoop.Connect();
hadoop.MapReduceJob.ExecuteJob<FirstJob>();
           

編譯生成HadoopNET.exe。這次我們不需要MRRunner可以直接運作Hadoop.exe。

執行過程和執行結果都與使用MRRunner一緻。

注意這個測試用途的大資料平台,在MR任務輸出目錄已存在時會自動将其删除,而不是像原生Hadoop平台那樣遇到輸出目錄存在時報錯處理。

Microsoft.Azure.Management.HDInsight.Job

後來,微軟技術路線改變,廢棄了Azure Service Manager (ASM)-based tools中一列些工具,包括指令行CLI,PowerShell 和HDInsight .NET SDK(上一小節介紹的Microsoft.WindowsAzure.Management.HDInsight和Microsoft.Hadoop.Client都被廢棄了)。全面轉向Azure Resource Manager所包含的一些列庫與工具。

雖然微軟總是變來變去,但這也是為了迎合開源社群變化,以及讓自身産品更有體系,更有可擴充性的無奈之舉。

新的工具中與MapReduce相關的庫為

Microsoft.Azure.Management.HDInsight.Job

下到一系列類。不同于之前的庫,新的庫隻是提供一種通過.NET平台技術(包括C#與PowerShell)來送出MR任務的方法。而沒有再提供一些輔助的類用于編寫MR任務,新的MR任務編寫應該是借鑒了Python等利用Hadoop Streaming進行大資料處理的方法,将Map和Reduce分别編寫成可以獨立執行的程式,然後送出到Hadoop Streaming去執行。下文會給出一個示例。

使用.NET C#編寫的MR任務,需要基于Windows的Azure HDInsight環境來運作。這個目前沒有模拟器,是以我們使用真實環境來測試。

這裡沒有使用世紀互聯營運的Azure,而是使用了Azure全球服務,地區選擇香港,可以使用大陸的手機号和大陸發行的Visa卡進行驗證即可順利激活1600HKD的試用額度。

登陸Azure控制台後,在左側導航欄選擇大象圖示的HDInsight叢集菜單(預設情況下,需要點選更多,在彈出的菜單中才能看到此項,可以點選後面的星按鈕添加收藏,以使此菜單出現在第一屏中)。

建立叢集,類型選擇Windows,可以自定義叢集,在測試目的中選擇最低配置的叢集結點(節省銀子,雖然是免費額度),如下圖。

C#碼農的大資料之路 - 使用C#編寫MR作業
按下圖所示選擇結點的配置
C#碼農的大資料之路 - 使用C#編寫MR作業

第一次使用Azure一般都需要建立新的存儲賬戶,這裡的容器就相當于我們這個叢集存儲的根目錄。在Azure中,使用Azure Storage Blob作為類似HDFS的存在。

C#碼農的大資料之路 - 使用C#編寫MR作業

最後的摘要頁面也有明确提示,從叢集建立到被删除這個過程中将會一直按照右下角顯示價格進行計費,無論是否運作任務。是以對于Azure HDInsight的新使用者記得用後要删除是很重要的。另外Azure PowerShell和Azure .NET SDK都提供了使用代碼建立與删除HDInsight叢集的方法,友善将叢集的建立,任務部署與叢集删除作為一系列自動化任務來完成。

C#碼農的大資料之路 - 使用C#編寫MR作業

點選建立按鈕,叢集建立工作随之開始進行。

建立前,可以點選下載下傳模闆連結,将叢集建立參數作為模闆儲存,由于建立過程還是稍顯複雜,而有了這個模闆就可以在本地通過Azure CLI,PowerShell或C#代碼來完成叢集的建立。詳見此博文
C#碼農的大資料之路 - 使用C#編寫MR作業

這個過程大約持續20分鐘,直到儀表盤中的正在建立變成正在運作。

叢集建立好後,可以進入叢集的控制台看看:

C#碼農的大資料之路 - 使用C#編寫MR作業

除了首頁為Windows平台Azure HDInsights專有,其他頁面都是到YARN,HDFS(Azure Blob Storage)及Job History原有管理頁面的連結。通過各自的管理頁面,可以了解到MR任務的工作情況,資料存儲等。

有了運作環境,我們來實作兩個簡單的基于.NET的MR任務,分别建立名為NetMapper和NetReducer的兩個控制台應用程式。然後添加如下代碼:

這些代碼來自微軟官方示例
// Mapper
class NetMapper
{
    static void Main(string[] args)
    {
        if (args.Length > 0)
        {
            Console.SetIn(new StreamReader(args[0]));
        }

        string line;
        while ((line = Console.ReadLine()) != null)
        {
            Console.WriteLine(line);
        }
    }
}

// Reducer
class NetReducer
{
    static void Main(string[] args)
    {
        string line;
        var count = 0;

        if (args.Length > 0)
        {
            Console.SetIn(new StreamReader(args[0]));
        }

        while ((line = Console.ReadLine()) != null)
        {
            count += line.Count(cr => (cr == ' ' || cr == '\n'));
        }
        Console.WriteLine(count);
    }
}
           

将這兩個項目分别生成,得到NetMapper.exe與NetReducer.exe。

接着在解決方案中建立一個項目SubmitNet用于送出MR任務到Azure中,送出代碼如下:

class Program
{
    private static HDInsightJobManagementClient _hdiJobManagementClient;

    private const string ExistingClusterUri = "test-netcore.azurehdinsight.net";
    private const string ExistingClusterUsername = "admin"; //HDInsight叢集預設使用者名就是admin
    private const string ExistingClusterPassword = "建立叢集時設定的密碼";

    private const string DefaultStorageAccountName = "hdinsighthystar"; //存儲賬戶名
    private const string DefaultStorageAccountKey = "存儲賬戶Key";
    private const string DefaultStorageContainerName = "與HDInsight叢集關聯的容器的名稱";

    static void Main(string[] args)
    {
        Console.WriteLine("The application is running ...");

        var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = ExistingClusterUsername, Password = ExistingClusterPassword };
        _hdiJobManagementClient = new HDInsightJobManagementClient(ExistingClusterUri, clusterCredentials);

        SubmitMRJob();

        Console.WriteLine("Press ENTER to continue ...");
        Console.ReadLine();
    }

    private static void SubmitMRJob()
    {
        var paras = new MapReduceStreamingJobSubmissionParameters
        {
            Files = new List<string>() { "/example/app/NetMapper.exe", "/example/app/NetReducer.exe" },
            Mapper = "NetMapper.exe",
            Reducer = "NetReducer.exe",
            Input= "/example/data/gutenberg/davinci.txt",
            Output = "/example/data/StreamingOutput/wc.txt"
        };

        Console.WriteLine("Submitting the MR job to the cluster...");
        var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras);
        var jobId = jobResponse.JobSubmissionJsonResponse.Id;
        Console.WriteLine("Response status code is " + jobResponse.StatusCode);
        Console.WriteLine("JobId is " + jobId);

        Console.WriteLine("Waiting for the job completion ...");

        // Wait for job completion
        var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
        while (!jobDetail.Status.JobComplete)
        {
            Thread.Sleep(1000);
            jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
        }

        // Get job output
        var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
            DefaultStorageContainerName);
        var output = (jobDetail.ExitValue == 0)
            ? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success
            : _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure

        Console.WriteLine("Job output is: ");

        using (var reader = new StreamReader(output, Encoding.UTF8))
        {
            string value = reader.ReadToEnd();
            Console.WriteLine(value);
        }
    }
}
           

代碼改自文檔中送出Job的代碼,原文檔中使用

MapReduceJobSubmissionParameters

送出JVM平台語言編寫的jar檔案。上面的例子使用

MapReduceStreamingJobSubmissionParameters

作為替換來送出Hadoop Streaming任務。

代碼中的存儲賬戶關聯的容器可以使用Azure PowerShell快速檢視。使用Azure PowerShell需要首先安裝。在管理者提升的PowerShell視窗中執行:

Install-Module AzureRM
           

第一次使用Install-Module可能會提示需要安裝NuGet提供程式,按Y繼續就可以了。(PowerShell基于.NET,是以其擴充元件也被托管在NuGet上)

如果提示不受信任的存儲庫,也按Y繼續即可。

成功安裝基于PowerShell的Azure Resource Management工具後,在使用與賬戶相關的指令前都要先登入:

Login-AzureRmAccount
           

按提示輸入相關資訊,登陸成功後就可以執行查詢指令了。必須擷取HDInsight叢集關聯的存儲賬戶。

Get-AzureRmHDInsightCluster -ClusterName "testmr"
           

傳回資訊中會包含如下内容,這就是我們想要的。

DefaultStorageAccount     : hdinsighthystar.blob.core.windows.net
DefaultStorageContainer   : testmr-2017-03-26t03-57-40-342z
           

程式準備完後,在送出任務前,我們還需要将兩個exe檔案與輸入上傳到Azure Blob存儲中。

向Azure Blob存儲中拷貝檔案有n多種方式,詳見此文檔。最直覺的方式是使用宇宙第一IDE - VS中的Azure插件。Azure插件更新較頻繁,請確定使用最新版的軟體。本文編寫時最新版本為Azure SDK for .NET 2.9.6。

安裝插件後,在Cloud Explorer中可以看到容器(在伺服器資料總管中Azure結點也能看到存儲賬戶的容器),右鍵快捷菜單中選擇檢視容器。

C#碼農的大資料之路 - 使用C#編寫MR作業

打開容器浏覽界面,點選上傳按鈕打開上傳對話框,選擇要上傳的檔案,并輸入路徑。

注意輸入路徑會自動建立檔案夾,這也是使用此插件來建立檔案夾的唯一方式。

C#碼農的大資料之路 - 使用C#編寫MR作業

點選确定上傳即可。

其他向Blob上傳檔案的方式還包括使用Azure CLI,AzCopy應用程式及Azure PowerShell。

另外由于Azure Blob存儲與HDFS相容,也可以直接使用hadoop fs指令來操作Azure Blob存儲。

比如:

hadoop fs -put /home/testfile.txt wasbs://[email protected]/testpath/
           

其中的容器名與賬戶名可以通過前文介紹的Azure PowerShell指令來擷取。

如果本地Windows中沒有安裝hadoop,可以通過遠端桌面連接配接到HDInsight叢集,将檔案複制到遠端機器,并在遠端桌面中使用hadoop導入檔案到Azure Blob存儲。(遠端桌面環境下導入可以省略wasbs://[email protected],直接使用相對路徑)

一切準備妥當後,運作SubmitNet項目,就可以送出任務:

C#碼農的大資料之路 - 使用C#編寫MR作業

測試結束後要記得删除叢集:

C#碼農的大資料之路 - 使用C#編寫MR作業

删除HDInsight叢集,不會同時删除存儲賬戶,存儲賬戶隻要存在也會按照使用量計費,區域香港的話,每個容器每天要0.01港币。

是以,存儲賬戶幾乎不占成本,可以保留,以後建立新叢集時直接選擇現有存儲賬戶的容器,可以直接執行其中的任務,檢視其中的檔案。

基于Windows的Azure HDInsight将于未來幾個月内停止服務。但我們上面介紹的内容隻要做一定就該就可以遷移到基于Linux的HDInsight中。首先,送出任務部分的代碼是幾乎不需要變(需要變的是指定Map和Reduce任務的參數,後文有說明),隻要我們還在Windows系統中送出任務即可。使用.NET編寫的Map和Reduce程式需要進行一定修改以便可以在.NET Core下運作。這個在下一小節有詳細介紹。

.NET Core

随着.NET Core的日益完善,基于.NET Core開發可以讓C# MapReduce程式運作在基于Linux的大資料叢集中的程式已經成為可能。在Azure HDInsight(基于Linux)中(Ubuntu16.04)甚至都已經預裝了.NET Core。參照上一小節的方法,我們可以讓.NET Core編寫的MR任務運作在基于Linux的HDInsight中。

首先我們在Azure中建立一個叢集基于Linux的HDInsight叢集。Linux版的HDInsight使用Ambari作為門戶頁面(關于Ambari的介紹,官方文檔在此),而沒有了像Windows版中那樣一個頁面,同時為了安全考慮YARN,JobHistory及存儲的Web管理界面也沒有直接暴露出來。通路這些頁面需要使用SSH與Linux叢集建立隧道,這個道理和我們平時用SSH搭梯子FQ是一樣的(官方文檔在此)。

首先做一點準備工作,使用

https://[cluster-name].azurehdinsight.net

這個位址(或直接在叢集首頁點選儀表盤或Ambari視圖)打開Ambari,登陸後,查找并記錄下headnode0主機的位址。

C#碼農的大資料之路 - 使用C#編寫MR作業

如圖,Hosts标簽頁中名稱hn0開頭的就是我們要找的結點。點開檢視詳情:

C#碼農的大資料之路 - 使用C#編寫MR作業

記錄下這個ip,後面可以直接使用ip來登陸這個結點。

樓主一般比較喜歡用SecureCRT作為終端,使用BitviseSSH來搭梯子。(雖然它們各自都可以完成另一方的工作,但個人還是喜歡這樣搭配使用)

使用BitviseSSH建立與HDInsight Linux叢集的通道設定方式見下圖:

C#碼農的大資料之路 - 使用C#編寫MR作業

幾個注意的地方,SSH Host的位址不同于叢集門戶頁面位址,其中多了個

-ssh

,預設ssh使用者使用者名為sshuser(在建立叢集時可以更改)。第二圖可以不設定,這樣設定可以讓我們的BitviseSSH功能保持單一,就是進行轉發。第三圖最終要的就是端口号。連結成功後,我們可以在Firefox中使用FoxProxy等類似代理軟體配置轉發。

配置好浏覽器轉發後,我們打開之前記錄的ip的8080端口。由于我們通過SSH建立了隧道,是以這個ip會被正确的路由。

注意:某些情況下,使用headnode0無法通路,可以嘗試使用headnode1。

如果一切正常,會再次彈出Ambari的登陸框,登陸後,在這個新的Ambari登陸中的HDFS或YARN界面中的Quick Links菜單中就可以找到我們需要的那些管理網站入口。如圖:

C#碼農的大資料之路 - 使用C#編寫MR作業
C#碼農的大資料之路 - 使用C#編寫MR作業
HDP2.7版本之後的HDInsight叢集,ResourceMananger等管理頁面貌似可以直接通路而不再需要上面的限制。

下面大緻來介紹如果在基于Linux的叢集上運作基于.NET Core的App。我們還是使用上一部分的Map和Reduce程式的代碼。為了使用.NET Core的Framework,需要進行一些小小的改造。

建立兩個.NET Core控制台應用程式,分别命名為

NetCoreMapper

NetCoreReducer

。其各自的

Main

方法如下:

namespace NetCoreMapper
{
    public class Program
    {
        public static void Main(string[] args)
        {
            if (args.Length > 0)
            {
                Stream stream = new FileStream(args[0], FileMode.Open, FileAccess.Read, FileShare.Read);
                Console.SetIn(new StreamReader(stream));
            }

            string line;
            while ((line = Console.ReadLine()) != null)
            {
                Console.WriteLine(line);
            }

        }
    }
}
           
namespace NetCoreReducer
{
    public class Program
    {
        static void Main(string[] args)
        {
            ILoggerFactory loggerFactory = new LoggerFactory()
                .AddConsole()
                .AddDebug();
            ILogger logger = loggerFactory.CreateLogger<Program>();
            logger.LogInformation(
                "This is a test of the emergency broadcast system.");

            string line;
            var count = 0;

            if (args.Length > 0)
            {
                Console.SetIn(new StreamReader(new FileStream(args[0], FileMode.Open, FileAccess.Read)));
            }

            while ((line = Console.ReadLine()) != null)
            {
                count += line.Count(cr => (cr == ' ' || cr == '\n'));
            }
            Console.WriteLine(count);
        }
    }
}
           

使用指令分别生成項目:

dotnet publish --framework netcoreapp1.0 --configuration release --output publish
           

把兩個項目的輸出放到一個檔案夾中,按上面的配置,應該最終得到8個檔案,其中兩個pdb檔案可以安全删除。

C#碼農的大資料之路 - 使用C#編寫MR作業

然後,我們将這些檔案上傳到Azure Blob Storage與HDInsight叢集對應的容器中的

/example/coreapp

目錄下(如果沒有這個目錄請先建立,如果不使用這個目錄,在下文的送出任務的代碼中要把程式路徑換成相應的路徑)

接下來需要修改下送出任務的代碼:

private static void SubmitMRJob()
{
    var paras = new MapReduceStreamingJobSubmissionParameters
    {
        Files = new List<string>()
        {
            "/example/coreapp",
        },
        Mapper = "dotnet coreapp/NetCoreMapper.dll",
        Reducer = "dotnet coreapp/NetCoreReducer.dll",
        Input = "/example/data/gutenberg/davinci.txt",
        Output = "/example/data/StreamingOutput/wc.txt"

    };

    Console.WriteLine("Submitting the MR job to the cluster...");
    var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras);
    var jobId = jobResponse.JobSubmissionJsonResponse.Id;
    Console.WriteLine("Response status code is " + jobResponse.StatusCode);
    Console.WriteLine("JobId is " + jobId);

    Console.WriteLine("Waiting for the job completion ...");

    // Wait for job completion
    var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
    while (!jobDetail.Status.JobComplete)
    {
        Thread.Sleep(1000);
        jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
    }

    // Get job output
    var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
        DefaultStorageContainerName);
    var output = (jobDetail.ExitValue == 0)
        ? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success
        : _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure

    Console.WriteLine("Job output is: ");

    using (var reader = new StreamReader(output, Encoding.UTF8))
    {
        string value = reader.ReadToEnd();
        Console.WriteLine(value);
    }
}
           

尤其需要注意的是

Files

Mapper

Reducer

這三個屬性中路徑的寫法,隻有這樣MR任務才能成功執行。不要問我怎麼知道的,說多了都是淚。

運作任務送出程式,當看到控制台如下輸出時表示任務已經成功送出并執行完畢。

C#碼農的大資料之路 - 使用C#編寫MR作業

否則,控制台會輸出錯誤。

下面總結一下樓主遇到的錯誤以及MR任務查找錯誤原因的方法。

查找錯誤原因,可以通路ResourceManager管理界面(位址一般為headnode:8088),在界面中找到最近出錯的任務,如下圖:

C#碼農的大資料之路 - 使用C#編寫MR作業

點選進入任務詳情界面:

C#碼農的大資料之路 - 使用C#編寫MR作業

點選“任務曆史”連結進入HistoryServer的UI界面:

C#碼農的大資料之路 - 使用C#編寫MR作業

在任務曆史中,點選圖檔中标出的Url,可以進入失敗的Map或Reduce任務的清單界面:

C#碼農的大資料之路 - 使用C#編寫MR作業

點選Logs連結可以看到任務的詳情,從中一定可以找到任務失敗的原因。

下面兩圖就是樓主因為在

SubmitMRJob

方法中沒有以正确的方式填充Files屬性所報的錯。

C#碼農的大資料之路 - 使用C#編寫MR作業
C#碼農的大資料之路 - 使用C#編寫MR作業

當時,樓主将.NET Core程式所輸出的每個檔案作為Files屬性數組的一條進行送出,導緻如圖,程式檔案被傳到Blob後被放在不同的臨時目錄中,進而導緻運作程式失敗。

另外樓主遇到的問題還包括,目前Azure HDInsight預裝的.NET Core版本較低(1.0.1),而使用VS2017來生成.NETCore App,即使所選的Target Framework為netcoreapp1.0,生成出來的程式也是1.0.4版,直接放到叢集的主機上執行會報如下錯誤。

在使用Hadoop執行前,先直接執行下程式看看是否可以正确啟動運作是一種很好的排除錯誤保證成功率的方法。
The specified framework 'Microsoft.NETCore.App', version '1.0.4' was not found.
  • Check application dependencies and target a framework version installed at:

    /usr/share/dotnet/shared/Microsoft.NETCore.App

  • The following versions are installed:

    1.0.1

  • Alternatively, install the framework version '1.0.4'.

上面的報錯可以清楚的了解到,HDInsight叢集的主機安裝了.NETCore但是版本隻有1.0.1,而VS2017生成的.NETCore App至少需要1.0.4版本的運作時。

按照官方文檔的說明,解除安裝舊版并安裝新版。安裝後繼續測試程式是否可以在shell裡直接運作知道成功。

特别注意,我們需要在實際幹活的結點,即NodeManager運作的結點,安裝适當的.NET Core,因為這些結點是實際運作.NET Core程式的結點。參考前文圖檔,這些結點以

wn

開頭,我們可以在建立隧道後通過IP ssh到這些主機并安裝.NET Core以及測試。

如果不友善使用Azure HDInsight,來進行測試,使用HDP沙盒也同樣可以。

唯一不便的就是不能使用Microsoft.Azure.Management.HDInsight.Job來友善的送出任務,而需要自行使用hadoop CLI來完成,但這對于我們測試.NET Core編寫的業務邏輯沒有影響。

關于這個話題詳見這篇博文。

微軟4月份更新版的Azure線上文檔中介紹了使用Mono在基于Linux的HDP叢集上運作C#編寫的MapReduce的方法,具體文章連結(這個連接配接可能在将來随時發生變化)。

最後

本文及後續文章都是對于将微軟技術用于大資料可能性的一種探讨,歡迎大家一起讨論。

本文示例相關示例在此