https://github.com/MarcoGhise/SpringKafka.git
1 package it.demo.kafka.springkafka.listener;
2
3 import org.springframework.beans.BeansException;
4 import org.springframework.context.ApplicationContext;
5 import org.springframework.context.ApplicationContextAware;
6 import org.springframework.integration.endpoint.EventDrivenConsumer;
7 import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
8 import org.springframework.integration.kafka.support.ConsumerConfiguration;
9 import org.springframework.integration.kafka.support.KafkaConsumerContext;
10
11 import com.yammer.metrics.Metrics;
12
13 public class KafkaConsumerStarter implements ApplicationContextAware
14 {
15 private ApplicationContext appContext;
16
17 private SourcePollingChannelAdapter kafkaInboundChannelAdapter;
18
19 private KafkaConsumerContext kafkaConsumerContext;
20
21 public void initIt() throws Exception
22 {
23 kafkaInboundChannelAdapter = appContext.getBean("kafka-inbound-channel-adapter", SourcePollingChannelAdapter.class);
24 kafkaInboundChannelAdapter.start();
25
26 kafkaConsumerContext = appContext.getBean("consumerContext", KafkaConsumerContext.class);
27 }
28
29 public void cleanUp() throws Exception
30 {
31 if (kafkaInboundChannelAdapter != null)
32 {
33 kafkaInboundChannelAdapter.stop();
34 }
35
36 Thread.sleep(1000);
37
38 Metrics.defaultRegistry().shutdown();
39 }
40
41 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
42 {
43 this.appContext = applicationContext;
44 }
45
46