天天看點

Java 調用Kafka

  1. 建立maven工程,pom.xml中增加如下:  
  2.  <dependency>  
  3.         <groupId>org.apache.kafka</groupId>  
  4.         <artifactId>kafka_2.10</artifactId>  
  5.         <version>0.8.2.0</version>  
  6.     </dependency>  
  7. 2 java代碼:  向主題test内寫入資料  
  8. import java.util.Properties;  
  9. import java.util.concurrent.TimeUnit;  
  10. import kafka.javaapi.producer.Producer;  
  11. import kafka.producer.KeyedMessage;  
  12. import kafka.producer.ProducerConfig;  
  13. import kafka.serializer.StringEncoder;  
  14. public class kafkaProducer extends Thread{  
  15.     private String topic;  
  16.     public kafkaProducer(String topic){  
  17.         super();  
  18.         this.topic = topic;  
  19.     }  
  20.     @Override  
  21.     public void run() {  
  22.         Producer producer = createProducer();  
  23.         int i=0;  
  24.         while(true){  
  25.             producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));  
  26.             try {  
  27.                 TimeUnit.SECONDS.sleep(1);  
  28.             } catch (InterruptedException e) {  
  29.                 e.printStackTrace();  
  30.             }  
  31.         }  
  32.     }  
  33.     private Producer createProducer() {  
  34.         Properties properties = new Properties();  
  35.         properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//聲明zk  
  36.         properties.put("serializer.class", StringEncoder.class.getName());  
  37.         properties.put("metadata.broker.list", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 聲明kafka broker  
  38.         return new Producer<Integer, String>(new ProducerConfig(properties));  
  39.      }  
  40.     public static void main(String[] args) {  
  41.         new kafkaProducer("test").start();// 使用kafka叢集中建立好的主題 test   
  42.     }  
  43. }  
  44. 3  kafka叢集中消費主題test的資料:  
  45. [root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin  
  46. 4   啟動java代碼,然後在看叢集消費的資料如下:  
  47. message: 0  
  48. message: 1  
  49. message: 2  
  50. message: 3  
  51. message: 4  
  52. message: 5  
  53. message: 6  
  54. message: 7  
  55. message: 8  
  56. message: 9  
  57. message: 10  
  58. message: 11  
  59. message: 12  
  60. message: 13  
  61. message: 14  
  62. message: 15  
  63. message: 16  
  64. message: 17  
  65. message: 18  
  66. message: 19  
  67. message: 20  
  68. message: 21  

 3 kafka 使用Java寫消費者,這樣 先運作kafkaProducer ,在運作kafkaConsumer,即可得到生産者的資料:

Java代碼  

Java 調用Kafka
  1. import java.util.HashMap;  
  2. import java.util.List;  
  3. import java.util.Map;  
  4. import java.util.Properties;  
  5. import kafka.consumer.Consumer;  
  6. import kafka.consumer.ConsumerConfig;  
  7. import kafka.consumer.ConsumerIterator;  
  8. import kafka.consumer.KafkaStream;  
  9. import kafka.javaapi.consumer.ConsumerConnector;  
  10. public class kafkaConsumer extends Thread{  
  11.     private String topic;  
  12.     public kafkaConsumer(String topic){  
  13.         super();  
  14.         this.topic = topic;  
  15.     }  
  16.     @Override  
  17.     public void run() {  
  18.         ConsumerConnector consumer = createConsumer();  
  19.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  20.         topicCountMap.put(topic, 1); // 一次從主題中擷取一個資料  
  21.          Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
  22.          KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 擷取每次接收到的這個資料  
  23.          ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
  24.          while(iterator.hasNext()){  
  25.              String message = new String(iterator.next().message());  
  26.              System.out.println("接收到: " + message);  
  27.          }  
  28.     }  
  29.     private ConsumerConnector createConsumer() {  
  30.         Properties properties = new Properties();  
  31.         properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//聲明zk  
  32.         properties.put("group.id", "group1");// 必須要使用别的組名稱, 如果生産者和消費者都在同一組,則不能通路同一組内的topic資料  
  33.         return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
  34.      }  
  35.     public static void main(String[] args) {  
  36.         new kafkaConsumer("test").start();// 使用kafka叢集中建立好的主題 test   
  37.     }  
  38. }