天天看點

大資料元件-Kafka的javaAPI操作,Kafka StreamingAPI開發,1.KafkaJavaApi操作

1.KafkaJavaApi操作

1.添加maven依賴

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.0.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- java編譯插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
           

2.生産者代碼

kafkaproducerAPI文檔

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    /**
     * 實作生産資料到kafka test這個topic裡面去
     * @param args
     */

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("acks", "all"); //消息确認機制
        props.put("retries", 0); //消息發送失敗後重試次數
        props.put("batch.size", 16384); //處理一批資料大小
        props.put("linger.ms", 1); //消息每天都進行确認
        props.put("buffer.memory", 33554432); //緩沖區的大小
        //指定k和v序列化類StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //擷取kafkaProduce這個類
        Producer<String,String> kafkaProducer = new KafkaProducer<>(props);

        //使用循環發送消失
        for (int i = 0; i < 100; i++) {
            Thread.sleep(1200);
            kafkaProducer.send(new ProducerRecord<String, String>("test","mymessage"+i));//向test這個topic發送messagei這這個資訊
        }
        //關閉資源
        kafkaProducer.close();
    }
}

           

3.1生産者分區政策

  • 如果指定分區号,那麼資料直接産生到對應的分區裡面去
  • 如果沒有指定分區号,通過資料的key取其hashCode來計算資料落到那個分區
  • 如果沒有分區号,資料也不存在key,那麼使用round-robin輪詢來實作
package it.yuge;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionProducer {
    /**
     * kafka生成資料
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("acks", "all"); //消息确認機制
        props.put("retries", 0); //消息發送失敗後重試次數
        props.put("batch.size", 16384); //處理一批資料大小
        props.put("linger.ms", 1); //消息每天都進行确認
        props.put("buffer.memory", 33554432); //緩沖區的大小
        //指定k和v序列化類StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        //比對自定義分區類
        props.put("partitioner.class","it.yuge.MyPartition")
        
        //擷取kafkaProduce這個類
        Producer<String,String> kafkaProducer = new KafkaProducer<>(props);

        //使用循環發送消失
        for (int i = 0; i < 100; i++) {
            
            //第一種分區政策:即沒有指定分區号,又沒有指定資料的key,那麼使用輪詢的方式将資料均勻的發送到不同的分區裡面去
            ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("mypartition", "message" + i);
            //第二種分區政策:沒有指定分區号,指定了資料的key,通過key.hashCode % numPartition來計算資料會落到那個分區
            ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("mypartition", "mykey", "mymessage" + i);
            //第三種分區政策:如果指定了分區号,那麼就會将資料直接寫入到對應的分區裡面去
            ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);
            
            //自定義分區
            ProducerRecord<String, String> producerRecord4 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);

            kafkaProducer.send(producerRecord1);//向test這個topic發送messagei這這個資訊
        }
        //關閉資源
        kafkaProducer.close();
    }
}
           

自定義分區類

package it.yuge;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartition implements Partitioner {
    //這個方法就是确定分區資料到哪一個分區裡面去
    //直接return 2 表示将資料寫入到2号分區裡面去
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
           

3.消費者代碼

cunsumerAPI文檔

  • offsit:記錄了消息消費到了那一條,下一次來的時候,我們繼續從上一次的記錄接着消費
  • 自動送出
  • 手動送出

(1)自動送出offset

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    /**
     * 自動送出offset
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group"); //消費組
        props.put("enable.auto.commit", "true"); //允許自動送出
        props.put("auto.commit.interval.ms", "1000"); //自動送出的間隔時間
        props.put("session.timeout.ms", "30000"); //逾時時間
        //指定k和v的反序列化類StringDeserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //指定消費那個topic裡面的資料
        consumer.subscribe(Arrays.asList("test"));
        //使用死循環來消費test這個topic裡面的資料
        while (true) {
            //records是所有拉取到的資料
            ConsumerRecords<String, String> records = consumer.poll(1000); //1000毫秒沒拉到資料就認為逾時
            for (ConsumerRecord<String, String> record : records) {
                long offset = record.offset();
                String value = record.value();
                System.out.println("消息的offset值為:"+offset+"消息的内容是:"+value);
            }
        }
    }
}

           

(2)手動送出offset

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class manualConsumer {
    /**
     * 實作手動送出offset
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "false"); //禁用自動送出offset,後期我們手動送出offset
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        //消費者訂閱test這個topic
        consumer.subscribe(Arrays.asList("test"));
        
        final int minBatchSize = 100;//達到100條進行批次處理,處理完成後送出offset
        //定義一個集合,用于存儲我們的ConsumerRecord(拉取的資料對象)
        List<ConsumerRecord<String, String>> consumerRecordList = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> consumerRecords1 = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords1) {
                consumerRecordList.add(consumerRecord); //拉取的一批批資料往集合中存儲
                if (consumerRecordList.size() >= minBatchSize) {
                    //如果集合當中的資料大于等于200條,我們批量進行一個處理
                    //将這一批次的資料儲存到資料庫裡面
                    //insertTODb(consumerRecordList);//jdbc-僞代碼

                    //送出offset,表示這一批次的資料全部都處理完了
                    //consumer.commitAsync(); //異步送出offset值,異步送出效率更高,不會阻塞代碼的執行.

                    //同步送出offset值,同步是一個進入送出就上鎖,其他等待,以保障線程安全,但是判斷鎖,釋放鎖線程效率低下
                    consumer.commitSync();
                    System.out.println("送出完成");
                    //清空集合資料
                    consumerRecordList.clear();
                }
            }
        }
    }
}
           

(3)處理完每個分區裡面的資料之後,然後就進行一次送出(相比上面兩種方式資料更安全)

package it.yuge;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class ConmsumerPartition {
    /**
     * 處理完每一個分區裡面資料,就馬上送出這個分區裡面的資料
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "false"); //禁用自動送出offset,後期我們手動送出offset
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        kafkaConsumer.subscribe(Arrays.asList("mypartition"));
        while (true){
            //通過while true消費資料
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //擷取mypartition這個topic裡面所有的分區
            Set<TopicPartition> partitions = consumerRecords.partitions();

            //循環周遊每一個分區裡面資料,然後将每一個分區裡面的資料進行處理,處理完成後再進行送出
            for (TopicPartition partition : partitions) {
                //擷取每一個分區裡面的資料
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value()+"==="+record.offset());
                }
                //擷取我們的分區裡面最後一條資料的offset,表示我們已經消費到了這個offset了
                long offset = records.get(records.size() - 1).offset();

                //送出offset,使用Collection建立一個線程安全的map集合
                //送出我們offset,并且給offset值加1,表示我們從下沉沒有消費的那一條資料開始消費
                kafkaConsumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(offset+1)));
            }
        }
    }
}
           

(4)指定消費topic當中某些分區的資料

package it.yuge;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSomePartition {
    //實作消費一個topic裡面某些分區的資料
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true"); //禁用自動送出offset,後期我們手動送出offset
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //擷取kafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        
        //通過consumer訂閱某一個topic,進行消費,會消費topic裡面所有的分區的資料
        //kafkaConsumer.subscribe();
        
        //通過調用assian發法實作消費mypartition這個topic裡面0号和1号分區裡面的資料
        TopicPartition topicPartition1 = new TopicPartition("mypartition", 0);
        TopicPartition topicPartition2 = new TopicPartition("mypartition", 1);
        kafkaConsumer.assign(Arrays.asList(topicPartition1,topicPartition2));
        
        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            //得到一條條的資料redcord
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("資料值為"+record.value()+"偏移量為:"+record.offset());
            }
        }
    }
}

           

5.kafka Streams API開發

使用場景:

解決這樣的需求:使用StreamAPI擷取test這個topic當中的資料,然後将資料全部轉為大寫,寫入到test2這個topic當中去

大資料元件-Kafka的javaAPI操作,Kafka StreamingAPI開發,1.KafkaJavaApi操作

(1)建立一個topic

cd /export/servers/kafka_2.11-0.10.0.0/
bin/kafka-topics.sh --create  --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
           

–create表示建立

–partition 3 表示有三個分區

–replication-factor 2 表示有兩個副本

–topic test2 表示topic名字叫test2

–zookeeper 指定我們zookeeper的連接配接位址

(2)開發StreamAPI

public class StreamAPI {
	//通過StreamAPI實作将資料從test裡面讀取出來,寫入到test2裡面去
    public static void main(String[] args) {
    	//封裝配置資訊的方法
        Properties props = new Properties();
        //put一些參數
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");//應用id名稱
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");//指定kafka連接配接位址
        //資料序列化反序列化
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

		//擷取核心類KStreamBuilder
        KStreamBuilder builder = new KStreamBuilder();
        //通過KStreamBuilder調用stream方法,表示從那個topic當中擷取資料
        //調用maoValues方法,表示将每一行value都給取出來,做map映射
        //.to("test2")将轉成大寫的資料寫到test2這個topic當中去
        builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
        //通過KStreamBuilder和Properties(所有配置檔案),來建立KafkaStreams,通過KafkaStreams來實作流式程式設計的啟動
        KafkaStreams streams = new KafkaStreams(builder, props);
        //調用start啟動kafka的流API
        streams.start();
    }
}
           

(3)生産資料

//node01執行以下指令,向test這個topic當中生産資料
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
           

(4)消費資料

//node02執行一下指令消費test2這個topic當中的資料
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-consumer.sh --from-beginning  --topic test2 --zookeeper node01:2181,node02:2181,node03:2181