天天看點

C#RDKAFKA消費者的兩種工作模式:消息模式,線程模式

最近開發中,用到了這2種模式。線程模式處理資料的速度和記憶體可控,資料處理不及的話,都存儲在kafka種;消息模式并行處理,壓力過大時記憶體可能不友善控制。

消息模式:

private EventConsumer CreateKafkaConsumer()

        {

            string strKafkaBrboker = "127.0.0.1";

            int intKafkaPort = 9092;

            string strKafkaGroupId = "kafka_test_group";

            string strKafkaConsumeTopic = "mytopic";

            var config = new Config() { GroupId = strKafkaGroupId, EnableAutoCommit = false };

            EventConsumer objKafkaConsumer = null;

            try

            {

                System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象...", strKafkaBrboker, intKafkaPort));

                objKafkaConsumer = new EventConsumer(config, strKafkaBrboker + ":" + intKafkaPort.ToString());

                objKafkaConsumer.OnMessage += ObjKafkaConsumer_OnMessage;

                objKafkaConsumer.OnConsumerError += ObjKafkaConsumer_OnConsumerError;

                objKafkaConsumer.OnError += ObjKafkaConsumer_OnError;

                System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象成功!", strKafkaBrboker, intKafkaPort));

            }

            catch (Exception e)

            {

                System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象錯誤:{2}", strKafkaBrboker, intKafkaPort, e.Message));

                objKafkaConsumer = null;

                return null;

            }

            try

            {

                List<string> listTopic = new List<string>();

                listTopic.Add(strKafkaConsumeTopic);

                objKafkaConsumer.Subscribe(listTopic);

                objKafkaConsumer.Start();

                System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}成功!", strKafkaConsumeTopic, strKafkaGroupId));

            }

            catch (Exception e)

            {

                System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}錯誤:{2}", strKafkaConsumeTopic, strKafkaGroupId, e.Message));

                objKafkaConsumer = null;

            }

            return objKafkaConsumer;

        }

        private void ObjKafkaConsumer_OnError(object sender, Handle.ErrorArgs e)

        {

            //寫錯誤資訊

        }

        private void ObjKafkaConsumer_OnConsumerError(object sender, ErrorCode e)

        {

            //寫錯誤資訊

        }

        private void ObjKafkaConsumer_OnMessage(object sender, Message e)

        {

            EventConsumer client = (EventConsumer)sender;

            ProcessData(client, Encoding.UTF8.GetString(e.Payload));

        }

線程模式:

 private void ThreadProcessData()

        {

            iLog.Debug("資料處理線程啟動...");

            EventConsumer consumer = CreateKafkaConsumer();

            if (consumer == null)

            {

                return;

            }

            MessageAndError? msg = null;

            DateTime dtErrorLogtime = default(DateTime);

            while (bRunning)

            {

                msg = consumer.Consume(new TimeSpan(0, 0, 1));

                if (msg == null)

                {                    

                    continue;

                }

                if (msg?.Error != ErrorCode.NO_ERROR)

                {                    

                    continue;

                }              

                string strData = Encoding.UTF8.GetString(msg?.Message.Payload));                

                ProcessData(strData.ToString());

            }

            if (consumer != null)

            {

                consumer.Unsubscribe();

                consumer.Stop();

                consumer = null;

            }

            iLog.Debug("資料處理線程退出!");

            GC.Collect();

        }

private  EventConsumer CreateKafkaConsumer()

        {

            string strKafkaBrboker = "127.0.0.1";

            int intKafkaPort = 9092;

            string strKafkaGroupId = "kafka_test_group";

            string strKafkaConsumeTopic = "mytopic";

            var config = new Config() { GroupId = strKafkaGroupId, EnableAutoCommit = false };

            EventConsumer objKafkaConsumer = null;

            try

            {

                System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象...", strKafkaBrboker, intKafkaPort));

                objKafkaConsumer = new EventConsumer(config, strKafkaBrboker + ":" + intKafkaPort.ToString());                

                System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象成功!", strKafkaBrboker, intKafkaPort));

            }

            catch (Exception e)

            {

                System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象錯誤:{2}", strKafkaBrboker, intKafkaPort, e.Message));

                objKafkaConsumer = null;

                return null;

            }

            try

            {

                List<string> listTopic = new List<string>();

                listTopic.Add(strKafkaConsumeTopic);

                objKafkaConsumer.Subscribe(listTopic);

                objKafkaConsumer.Start();

                System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}成功!", strKafkaConsumeTopic, strKafkaGroupId));

            }

            catch (Exception e)

            {

                System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}錯誤:{2}", strKafkaConsumeTopic, strKafkaGroupId, e.Message));

                objKafkaConsumer = null;

            }

            return objKafkaConsumer;

        }