天天看點

基于commons-pool2實作KafkaProducer池來提升kafka發送消息性能

業務場景

Spark用fileStream實時從NFS擷取一批檔案,将檔案中JSON結構裡面的大小圖二進制資料上傳雲存儲擷取url然後再将url以string回寫到json中發送kafka,最早使用多線程并行發送内個線程建立一個KafkaProducer速度慢的驚人無法滿足現場,接着線上程中共用一個KafkaProducer再測試發現沒有改觀。

問題分析

分析kafka日志發現,每次發送資料大部分時間在0-1ms,出現時延的情況時發現都是連續出現的,由于發送端隻有一個producer執行個體,這樣當一個message發送阻塞了,将會瞬間導緻TPS急劇下降,正常情況下一個kafka執行個體在1秒内能夠處理上千個發送請求(由于我們的消息每個在323970B左右,千兆網用盡上行io差不多也隻能發送11710241024/323970 = 378),但出現1秒的時延将會導緻1秒隻能處理1個發送請求,這樣會阻塞後面資料的處理。

問題原因

由于producer是線程安全的,是以采用單執行個體,但一次發送阻塞(因為使用同步發送,每次發送都會等待結果,這個過程是同步的),将會影響到後續的資料處理,那就隻能緩存producer執行個體了。

實作方案

對象池工廠實作的代碼實作

public class KafkaProducerPooledObjectFactory implements PooledObjectFactory<KafkaProducer<String, String>>, Serializable {

    Properties props;
    public KafkaProducerPooledObjectFactory(Properties props) {
        this.props = props;
    }

    @Override
    public PooledObject<KafkaProducer<String, String>> makeObject() throws Exception {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        return new DefaultPooledObject<KafkaProducer<String, String>>(kafkaProducer);
    }

    @Override
    public void destroyObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
        KafkaProducer<String, String> o = p.getObject();
        o = null;
    }

    @Override
    public boolean validateObject(PooledObject<KafkaProducer<String, String>> p) {
        return false;
    }

    @Override
    public void activateObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
        //System.out.println("activateObject");
    }

    @Override
    public void passivateObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
        //System.out.println("passivateObject");
    }

}           

對象池工對外的代碼實作

public class KafkaProducerPool implements Serializable {

    private GenericObjectPool<KafkaProducer<String, String>> objectPool;

    public KafkaProducerPool(Properties props) {

        KafkaProducerPooledObjectFactory kafkaProducerPooledObjectFactory = new KafkaProducerPooledObjectFactory(props);

        GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 池子配置檔案
        config.setMaxTotal(100);                                        // 整個池最大值
        config.setMaxIdle(10);                                          // 最大空閑
        config.setMinIdle(0);                                           // 最小空閑
        config.setMaxWaitMillis(5000);                                  // 最大等待時間,-1表示一直等
        config.setBlockWhenExhausted(true);                             // 當對象池沒有空閑對象時,新的擷取對象的請求是否阻塞。true阻塞。預設值是true
        config.setTestOnBorrow(false);                                  // 在從對象池擷取對象時是否檢測對象有效,true是;預設值是false
        config.setTestOnReturn(false);                                  // 在向對象池中歸還對象時是否檢測對象有效,true是,預設值是false
        config.setTestWhileIdle(false);                                 // 在檢測空閑對象線程檢測到對象不需要移除時,是否檢測對象的有效性。true是,預設值是false
        config.setMinEvictableIdleTimeMillis(60000L);                   // 可發呆的時間,10mins
        config.setTestWhileIdle(true);                                  // 發呆過長移除的時候是否test一下先
        config.setTimeBetweenEvictionRunsMillis(3000);                  // 回收資源線程的執行周期 3s
        config.setNumTestsPerEvictionRun(10);

        objectPool = new GenericObjectPool<>(kafkaProducerPooledObjectFactory, config);
    }

    public static Properties getConfig(String hosts) {
        Properties props = new Properties();
        props.put("bootstrap.servers", hosts);
        // procedure要求leader在考慮完成請求之前收到的确認數,用于控制發送記錄在服務端的持久化,其值可以為如下:
        // acks = 0 如果設定為零,則生産者将不會等待來自伺服器的任何确認,該記錄将立即添加到套接字緩沖區并視為已發送。在這種情況下,無法保證伺服器已收到記錄,并且重試配置将不會生效(因為用戶端通常不會知道任何故障),為每條記錄傳回的偏移量始終設定為-1。
        // acks = 1 這意味着leader會将記錄寫入其本地日志,但無需等待所有副本伺服器的完全确認即可做出回應,在這種情況下,如果leader在确認記錄後立即失敗,但在将資料複制到所有的副本伺服器之前,則記錄将會丢失。
        // acks = all 這意味着leader将等待完整的同步副本集以确認記錄,這保證了隻要至少一個同步副本伺服器仍然存活,記錄就不會丢失,這是最強有力的保證,這相當于acks = -1的設定。
        // 可以設定的值為:all, -1, 0, 1
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 10000);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public KafkaProducer<String, String> getProducer() {
        try {
            KafkaProducer<String, String> producer = objectPool.borrowObject();
            return producer;
        } catch (Exception e) {
            throw new RuntimeException("擷取KafkaProducer連接配接異常", e);
        }
    }

    public void returnProducer(KafkaProducer<String, String> producer) {
        try {
            objectPool.returnObject(producer);// 将對象放回對象池
        } catch (Exception e) {
            throw new RuntimeException("釋放KafkaProducer連接配接異常", e);
        }
    }


    public static void main(String[] args) {
        String topic = "TEST_1";
        String hosts = "hdh109:9092";
        Properties props = getConfig(hosts);

        KafkaProducerPool pool = new KafkaProducerPool(props);
        for (int i = 0; i < 10000; i++) {
            KafkaProducer<String, String> producer = pool.getProducer();
            pool.returnProducer(producer);
        }
    }
}           

調用代碼片段1

val pool = new KafkaProducerPool(KafkaProducerPool.getConfig(hosts))
val executors:ExecutorService = Executors.newFixedThreadPool(40)
while (i.hasNext) {
    val item = i.next().toString
    val exec = new Exec(ACCESS_KEY, SECRET_KEY, gateHost, gatePort, serialId, poolId, token, topic, hosts, item, pool)
    val future = executors.submit(exec)
    future.get()
}
executors.shutdown()           

Exec中的代碼片段2

public class Exec implements Runnable {

    String msg;
    String topic, hosts;
    KafkaProducerPool pool;

    public Exec(String topic, String hosts, String msg, KafkaProducerPool pool) {

        this.msg = msg;

        this.topic = topic;
        this.hosts = hosts;
        this.pool = pool;
    }

    public Exec(String topic, String hosts, String msg, KafkaProducerExample prod) {
        this.msg = msg;

        this.topic = topic;
        this.hosts = hosts;
        this.prod = prod;
    }

    @Override
    public void run() {

            //省略一些消息處理相關代碼

            KafkaProducer<String, String> producer = pool.getProducer();
            ProducerRecord rec = new ProducerRecord<>(topic, UUID.randomUUID().toString(), jo.toJSONString());
            producer.send(rec);
            pool.returnProducer(producer);

        } catch (IOException exec) {
            exec.printStackTrace();
        }

    }
}