天天看點

kafka的api操作(釋出和訂閱)

消息釋出api

pom.xml

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
</dependency>
           

producer.properties

生産資料的配置資訊,放在resources下面

# Kafka服務端的主機名和端口号
bootstrap.servers=mini1:9092,mini2:9092,mini3:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# 等待所有副本節點的應答 (follower角色的分區從leader角色的分區中同步完畢消息後,給leader回報資訊)
acks=all

#消息發送失敗後的重試次數(總發送次數最多為 retries + 1)
retries=1

#發往同一分區的一批消息的最大位元組數
batch.size=16384

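# name of the partitioner class (must implement org.apache.kafka.clients.producer.Partitioner and be on the classpath); the default partitioner hashes the record key and spreads keyless records across partitions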
partitioner.class=Partition.Mypartition

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
# 發送前等待更多消息湊成一批的時間(毫秒)
linger.ms=1

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#發送緩存區記憶體大小
buffer.memory=33554432

#消息的key對應的序列化類(這裡是 Integer key;若生產者使用 String key,需改為 StringSerializer)
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer

#消息的value對應的序列化類
value.serializer=org.apache.kafka.common.serialization.StringSerializer

           

consumer.properties

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
#定義kafka 服務的位址
bootstrap.servers=mini1:9092,mini2:9092,mini3:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
# 是否自動送出(commit)offset
enable.auto.commit=true
#  自動送出offset的時間間隔(毫秒)
auto.commit.interval.ms=500
# key的反序列化類
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value的反序列化類
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
           

第一個Demo

package api;
import org.apache.kafka.clients.producer.*;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {

    /** Classpath resource holding the shared producer settings. */
    private static final String CONFIG_RESOURCE = "producer.properties";

    /**
     * Sends 100 records (key {@code "i"}, value {@code "i!"}) to the topic "first".
     *
     * <p>The shared producer.properties configures an IntegerSerializer for keys (matching
     * the Integer keys of CustomProducer1). This demo sends String keys, so the key
     * serializer is overridden below. Without the override, every send fails because a
     * String cannot be serialized by the IntegerSerializer.
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties prop = loadProperties(CONFIG_RESOURCE);
        // String keys require a String serializer, regardless of what the file configures.
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if a send throws.
        try (Producer<String, String> producer = new KafkaProducer<String, String>(prop)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("first", i + "", i + "!"));
            }
        }
    }

    /**
     * Loads a properties file from the classpath.
     *
     * @param resource classpath-relative resource name
     * @return the loaded properties
     * @throws IllegalStateException if the resource is missing or cannot be read
     */
    private static Properties loadProperties(String resource) {
        Properties prop = new Properties();
        try (java.io.InputStream in =
                CustomProducer.class.getClassLoader().getResourceAsStream(resource)) {
            // getResourceAsStream returns null for a missing resource; fail with a clear message.
            if (in == null) {
                throw new IllegalStateException("Classpath resource not found: " + resource);
            }
            prop.load(in);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read classpath resource: " + resource, e);
        }
        return prop;
    }
}

           

帶回調函數的Demo

package api;
import org.apache.kafka.clients.producer.*;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer1 {
    /**
     * 回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,
     * 分别是RecordMetadata和Exception,如果Exception為null,說明消息發送成功,
     * 如果Exception不為null,說明消息發送失敗。
     * 注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties prop = new Properties();
        try {
			prop.load(CustomProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(prop);
        for (int i = 0; i < 100; i++) {
            final ProducerRecord<Integer, String> record
                    = new ProducerRecord<Integer, String>("second", i, i + "!");
            producer.send(record, new Callback() {
                //回調函數,該方法會在Producer收到ack時調用,為異步調用
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic->" + metadata.topic()+
                                ",partition->"+metadata.partition()+
                                ",key->"+record.key());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}
           

消息訂閱api

package api;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    /** Classpath resource holding the shared consumer settings. */
    private static final String CONFIG_RESOURCE = "consumer.properties";

    /**
     * Subscribes to topic "first" and prints every record it receives, indefinitely.
     * After each poll it commits the offsets synchronously.
     *
     * <p>Offsets can be committed manually in two ways, commitSync and commitAsync.
     * Both commit the highest offset of the batch returned by the last poll. They differ
     * in failure handling:
     * <ul>
     *   <li>commitSync retries until it succeeds, unless it hits an unrecoverable error;</li>
     *   <li>commitAsync does not retry, so a commit may be lost.</li>
     * </ul>
     *
     * <p>NOTE(review): consumer.properties also sets enable.auto.commit=true, so offsets
     * are committed automatically as well. Set it to false if only manual commits are
     * intended.
     */
    public static void main(String[] args) {
        Properties prop = loadProperties(CONFIG_RESOURCE);
        // try-with-resources closes the consumer if polling or committing throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop)) {
            consumer.subscribe(Arrays.asList("first"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s, partition = %s%n",
                            record.offset(),
                            record.key(),
                            record.value(),
                            record.partition()
                    );
                }
                consumer.commitSync();
            }
        }
    }

    /**
     * Loads a properties file from the classpath.
     *
     * @param resource classpath-relative resource name
     * @return the loaded properties
     * @throws IllegalStateException if the resource is missing or cannot be read
     */
    private static Properties loadProperties(String resource) {
        Properties prop = new Properties();
        try (java.io.InputStream in =
                CustomConsumer.class.getClassLoader().getResourceAsStream(resource)) {
            // getResourceAsStream returns null for a missing resource; fail with a clear message.
            if (in == null) {
                throw new IllegalStateException("Classpath resource not found: " + resource);
            }
            prop.load(in);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read classpath resource: " + resource, e);
        }
        return prop;
    }
}
           

從各個分區的開始進行消費

subscribe的第二個參數

package api;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class CustomConsumer1 {

    public static void main(String[] args) {
        Properties prop = new Properties();
        try {
            prop.load(CustomProducer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList("first","second"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }
            //從各個分區的開始位置開始訂閱
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, offset = %d, key = %s, value = %s, partition = %s%n",
                        record.topic(),
                        record.offset(),
                        record.key(),
                        record.value(),
                        record.partition()
                );
            }
            //所有消息訂閱完畢就退出
            if(records.isEmpty()){
                break;
            }
            consumer.commitSync();
            /**
             * 手動送出offset的方法有兩種:
             * 分别是commitSync(同步送出)和commitAsync(異步送出)。
             * 兩者的相同點是,都會将本次poll的一批資料最高的偏移量送出;
             * 不同點是,commitSync會失敗重試,一直到送出成功(如果由于不可恢複原因導緻,也會送出失敗);
             * 而commitAsync則沒有失敗重試機制,故有可能送出失敗。
             */
        }
    }
}
           
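也可以改這個參數(只在該消費者組沒有已送出的 offset 時生效):
prop.put("auto.offset.reset", "earliest")  從最早的消息開始消費(以前的也消費);設為 "latest" 則只從新產生的消息開始消費
注意:"smallest"/"largest" 是舊版 Scala 消費者的取值,kafka-clients 的 KafkaConsumer 只接受 earliest、latest、none,其他值會拋出 ConfigException。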