天天看點

使用Java編寫Spark Streaming來做大資料處理(一)

寫在前面:一般來說spark都是由Scala來實作,但Java一手遮天,非要以一己之力實作(不是)

一、使用JAVA編寫有哪幾部組成?

1.擷取資料

//往往檔案都是根據配置檔案修改的時間,動态擷取對應的檔案來處理
 String filePath = "/租戶目錄/名額/名額分類/名額分類子分支" + startTime.substring(0, 6) + ".csv";
 //通過調用JavaSparkContext的擷取文本的方法擷取資料
 jsc.textFile(名額路徑)      

一般來說,想要使用JavaSparkContext,就要先定義一個JavaSparkContext的執行個體化對象

JavaSparkContext jsc = SparkConfig.Instance("自定義的名字");      

2.處理資料

(1)擷取資料時一般會根據分隔符(比如逗号)将元素劃分命名,如下:

使用mapToPair方法操作資料,先将資料按照逗号分割成兩部分,給到二進制組Tuple2

jsc.textFile(filePath).mapToPair(new PairFunction<String, String, String>() {
                        @Override
                        public Tuple2<String, String> call(String t) throws Exception {
                            String[] split = t.split(",", -1);
                            return new Tuple2<>(split[0],split[1]);
                        }
                    })      

(2)具體實際使用中,我們将結果傳回一個JavaPairRDD<String, String>類型的自定義資料變量,并且做一個持久化操作

JavaPairRDD<String, String> 自定義資料名稱= jsc.textFile(filePath).mapToPair(new PairFunction<String, String, String>() {
                        @Override
                        public Tuple2<String, String> call(String t) throws Exception {
                            String[] split = t.split(",", -1);
                            return new Tuple2<>(split[0],split[1]);
                        }
                    }).persist(StorageLevel.MEMORY_AND_DISK());      

以此類推,可以定義好幾個這樣的​

​JavaPairRDD<String,String>​

​​來暫時存儲資料,有了這些資料,就可以進行Join等操作

除了直接使用,還可以把擷取字段定義成一種方法,例如:

public static JavaPairRDD<String, String> get字段1字段2(JavaSparkContext jsc, String 字段1字段2) {
    JavaPairRDD<String, String> imsiAttr = jsc.textFile(filePath).mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String s) throws Exception {
            String 字段1= s.split(",", -1)[0];
            String 字段2= s.split(",", -1)[1];
            return new Tuple2<>(字段1, 字段2);
        }
    });
    return 字段1字段2;
}      

這樣使用的時候就可以直接調用 get字段1字段2方法擷取 字段1字段2

JavaPairRDD<String, String> 字段1字段2= get1_2(jsc, filePath);      

注:在命名和使用中,往往會将一些字段的csv檔案路徑存為公參,比如

在公參檔案中寫

public static final String 字段1字段2= 租戶目錄 + "配置目錄/字段1_字段2.csv";      

然後再擷取字段1字段2所對應的檔案路徑

String imsiattr = GetFilePathUtils.字段1字段2;      

這樣一來,實際使用中的代碼就是:

String 字段1字段2= GetFilePathUtils.字段1字段2;
JavaPairRDD<String, String> 字段1字段2= get1_2(jsc, 字段1字段2).persist(StorageLevel.MEMORY_AND_DISK());      

二、操作資料常用的方法有哪些?

一般為了提高代碼的複用性,會将對資料的常用操作寫成一個函數方法來調用,其中包含了很多的函數,具體常用的如下:

groupByKey

leftOuterJoin

mapToPair

flatMapToPair

reduceByKey