最近開發中,用到了這2種模式。線程模式處理資料的速度和記憶體可控,資料處理不及的話,都存儲在kafka種;消息模式并行處理,壓力過大時記憶體可能不友善控制。
消息模式:
private EventConsumer CreateKafkaConsumer()
{
string strKafkaBrboker = "127.0.0.1";
int intKafkaPort = 9092;
string strKafkaGroupId = "kafka_test_group";
string strKafkaConsumeTopic = "mytopic";
var config = new Config() { GroupId = strKafkaGroupId, EnableAutoCommit = false };
EventConsumer objKafkaConsumer = null;
try
{
System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象...", strKafkaBrboker, intKafkaPort));
objKafkaConsumer = new EventConsumer(config, strKafkaBrboker + ":" + intKafkaPort.ToString());
objKafkaConsumer.OnMessage += ObjKafkaConsumer_OnMessage;
objKafkaConsumer.OnConsumerError += ObjKafkaConsumer_OnConsumerError;
objKafkaConsumer.OnError += ObjKafkaConsumer_OnError;
System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象成功!", strKafkaBrboker, intKafkaPort));
}
catch (Exception e)
{
System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象錯誤:{2}", strKafkaBrboker, intKafkaPort, e.Message));
objKafkaConsumer = null;
return null;
}
try
{
List<string> listTopic = new List<string>();
listTopic.Add(strKafkaConsumeTopic);
objKafkaConsumer.Subscribe(listTopic);
objKafkaConsumer.Start();
System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}成功!", strKafkaConsumeTopic, strKafkaGroupId));
}
catch (Exception e)
{
System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}錯誤:{2}", strKafkaConsumeTopic, strKafkaGroupId, e.Message));
objKafkaConsumer = null;
}
return objKafkaConsumer;
}
private void ObjKafkaConsumer_OnError(object sender, Handle.ErrorArgs e)
{
//寫錯誤資訊
}
private void ObjKafkaConsumer_OnConsumerError(object sender, ErrorCode e)
{
//寫錯誤資訊
}
private void ObjKafkaConsumer_OnMessage(object sender, Message e)
{
EventConsumer client = (EventConsumer)sender;
ProcessData(client, Encoding.UTF8.GetString(e.Payload));
}
線程模式:
private void ThreadProcessData()
{
iLog.Debug("資料處理線程啟動...");
EventConsumer consumer = CreateKafkaConsumer();
if (consumer == null)
{
return;
}
MessageAndError? msg = null;
DateTime dtErrorLogtime = default(DateTime);
while (bRunning)
{
msg = consumer.Consume(new TimeSpan(0, 0, 1));
if (msg == null)
{
continue;
}
if (msg?.Error != ErrorCode.NO_ERROR)
{
continue;
}
string strData = Encoding.UTF8.GetString(msg?.Message.Payload));
ProcessData(strData.ToString());
}
if (consumer != null)
{
consumer.Unsubscribe();
consumer.Stop();
consumer = null;
}
iLog.Debug("資料處理線程退出!");
GC.Collect();
}
private EventConsumer CreateKafkaConsumer()
{
string strKafkaBrboker = "127.0.0.1";
int intKafkaPort = 9092;
string strKafkaGroupId = "kafka_test_group";
string strKafkaConsumeTopic = "mytopic";
var config = new Config() { GroupId = strKafkaGroupId, EnableAutoCommit = false };
EventConsumer objKafkaConsumer = null;
try
{
System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象...", strKafkaBrboker, intKafkaPort));
objKafkaConsumer = new EventConsumer(config, strKafkaBrboker + ":" + intKafkaPort.ToString());
System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象成功!", strKafkaBrboker, intKafkaPort));
}
catch (Exception e)
{
System.Diagnostics.Debug.WriteLine(string.Format("建立kafka伺服器:{0}:{1}消費者對象錯誤:{2}", strKafkaBrboker, intKafkaPort, e.Message));
objKafkaConsumer = null;
return null;
}
try
{
List<string> listTopic = new List<string>();
listTopic.Add(strKafkaConsumeTopic);
objKafkaConsumer.Subscribe(listTopic);
objKafkaConsumer.Start();
System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}成功!", strKafkaConsumeTopic, strKafkaGroupId));
}
catch (Exception e)
{
System.Diagnostics.Debug.WriteLine(string.Format("訂閱kafka伺服器消息主題:{0},分組ID:{1}錯誤:{2}", strKafkaConsumeTopic, strKafkaGroupId, e.Message));
objKafkaConsumer = null;
}
return objKafkaConsumer;
}