天天看點

基于 Kafka 和 Elasticsearch 建構實時站内搜尋功能的實踐

作者:京東雲開發者

作者:京東物流 紀卓志

目前我們在建構一個多租戶多産品類網站,為了讓使用者更好的找到他們所需要的産品,我們需要建構站内搜尋功能,并且它應該是實時更新的。本文将會讨論建構這一功能的核心基礎設施,以及支援此搜尋能力的技術棧。

問題的定義與決策

為了建構一個快速、實時的搜尋引擎,我們必須做出某些設計決策。我們使用 MySQL 作為主資料庫存儲,是以有以下選擇:

  1. 直接在 MySQL 資料庫中查詢使用者在搜尋框中輸入的每個關鍵詞,就像 %#{word1}%#{word2}%... 這樣。
  2. 使用一個高效的搜尋資料庫,如 Elasticsearch。

考慮到我們是一個多租戶應用程式,同時被搜尋的實體可能需要大量的關聯操作(如果我們使用的是 MySQL 一類的關系型資料庫),因為不同類型的産品有不同的資料結構,是以我們還可以能需要同時周遊多個資料表來查詢使用者輸入的關鍵詞。是以我們決定不使用直接在 MySQL 中查詢關鍵詞的方案。

是以,我們必須決定一種高效、可靠的方式,将資料實時地從 MySQL 遷移到 Elasticsearch 中。接下來需要做出如下的決定:

  1. 使用 Worker 定期查詢 MySQL 資料庫,并将所有變化的資料發送到 Elasticsearch。
  2. 在應用程式中使用 Elasticsearch 用戶端,将資料同時寫入到 MySQL 和 Elasticsearch 中。
  3. 使用基于事件的流引擎,将 MySQL 資料庫中的資料更改作為事件,發送到流處理伺服器上,經過處理後将其轉發到 Elasticsearch。

選項 1 并不是實時的,是以可以直接排除,而且即使我們縮短輪詢間隔,也會造成全表掃描給資料庫造成查詢壓力。除了不是實時的之外,選項 1 無法支援對資料的删除操作,如果對資料進行了删除,那麼我們需要額外的表記錄之前存在過的資料,這樣才能保證使用者不會搜尋到已經删除了的髒資料。

對于其他兩種選擇,不同的應用場景做出的決定可能會有所不同。在我們的場景中,如果選擇選項 2,那麼我們可以預見一些問題:如過 Elasticsearch 建立網絡連接配接并确認更新時速度很慢,那麼這可能會降低我們應用程式的速度;或者在寫入 Elasticsearch 時發生了未知異常,我們該如何對這一操作進行重試來保證資料完整性。

不可否認開發團隊中不是所有開發人員都能了解所有的功能,如果有開發人員在開發新的與産品有關的業務邏輯時沒有引入 Elasticsearch 用戶端,那麼我們将在 Elasticsearch 中更新這次資料的更改,無法保證 MySQL 與 Elasticsearch 間的資料一緻性。

接下來我們該考慮如何将 MySQL 資料庫中的資料更改作為事件,發送到流處理伺服器上。我們可以在資料庫變更後,在應用程式中使用消息管道的用戶端同步地将事件發送到消息管道,但是這并沒有解決上面提到的使用 Elasticsearch 用戶端帶來的問題,隻不過是将風險從 Elasticsearch 轉移到了消息管道。最終我們決定通過采集 MySQL Binlog,将 MySQL Binlog 作為事件發送到消息管道中的方式來實作基于事件的流引擎。關于 binlog 的内容可以點選連結,在這裡不再贅述。

服務簡介

基于 Kafka 和 Elasticsearch 建構實時站内搜尋功能的實踐

為了對外提供統一的搜尋接口,我們首先需要定義用于搜尋的資料結構。對于大部分的搜尋系統而言,對使用者展示的搜尋結果通常包括為标題和内容,這部分内容我們稱之可搜尋内容(Searchable Content)。在多租戶系統中我們還需要在搜尋結果中标示出該搜尋結果屬于哪個租戶,或用來過濾目前租戶下可搜尋的内容,我們還需要額外的資訊來幫助使用者篩選自己想要搜尋的産品類别,我們将這部分通用的但不用來進行搜尋的内容稱為中繼資料(Metadata)。最後,在我們展示搜尋結果時可能希望根據不同類型的産品提供不同的展示效果,我們需要在搜尋結果中傳回這些個性化展示所需要的原始内容(Raw Content)。到此為止我們可以定義出了存儲到 Elasticsearch 中的通用資料結構:

{
	"searchable": {
		"title": "string",
		"content": "string"
	},
	"metadata": {
		"tenant_id": "long",
		"type": "long",
		"created_at": "date",
		"created_by": "string",
		"updated_at": "date",
		"updated_by": "string"
	},
	"raw": {}
}
           

基礎設施

Apache Kafka: Apache Kafka 是開源的分布式事件流平台。我們使用 Apache kafka 作為資料庫事件(插入、修改和删除)的持久化存儲。

mysql-binlog-connector-java: 我們使用 mysql-binlog-connector-java 從 MySQL Binlog 中擷取資料庫事件,并将它發送到 Apache Kafka 中。我們将單獨啟動一個服務來完成這個過程。

在接收端我們也将單獨啟動一個服務來消費 Kafka 中的事件,并對資料進行處理然後發送到 Elasticsearch 中。

Q:為什麼不使用Elasticsearch connector之類的連接配接器對資料進行處理并發送到Elasticsearch中?
A:在我們的系統中是不允許将大文本存入到MySQL中的,是以我們使用了額外的對象存儲服務來存放我們的産品文檔,是以我們無法直接使用連接配接器将資料發送到Elasticsearch中。
Q:為什麼不在發送到Kafka前就将資料進行處理?
A:這樣會有大量的資料被持久化到Kafka中,占用Kafka的磁盤空間,而這部分資料實際上也被存儲到了Elasticsearch。
Q:為什麼要用單獨的服務來采集binlog,而不是使用Filebeat之類的agent?
A:當然可以直接在MySQL資料庫中安裝agent來直接采集binlog并發送到Kafka中。但是在部分情況下開發者使用的是雲服務商或其他基礎設施部門提供的MySQL伺服器,這種情況下我們無法直接進入伺服器安裝agent,是以使用更加通用的、無侵入性的C/S結構來消費MySQL的binlog。
           

配置技術棧

我們使用 docker 和 docker-compose 來配置和部署服務。為了簡單起見,MySQL 直接使用了 root 作為使用者名和密碼,Kafka 和 Elasticsearch 使用的是單節點叢集,且沒有設定任何鑒權方式,僅供開發環境使用,請勿直接用于生産環境。

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local
           

在服務啟動成功後我們需要為 Elasticsearch 建立索引,在這裡我們直接使用 curl 調用 Elasticsearch 的 RESTful API,也可以使用 busybox 基礎鏡像建立服務來完成這個步驟。

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {
            "type": "text"
          },
          "content": {
            "type": "text"
          }
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {
            "type": "long"
          },
          "type": {
            "type": "integer"
          },
          "created_at": {
            "type": "date"
          },
          "created_by": {
            "type": "keyword"
          },
          "updated_at": {
            "type": "date"
          },
          "updated_by": {
            "type": "keyword"
          }
        }
      },
      "raw": {
        "type": "nested"
      }
    }
  }
}'
           

核心代碼實作(SpringBoot + Kotlin)

Binlog 采集端:

override fun run() {
        client.serverId = properties.serverId
        val eventDeserializer = EventDeserializer()
        eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        )
        client.setEventDeserializer(eventDeserializer)
        client.registerEventListener {
            val header = it.getHeader<EventHeader>()
            val data = it.getData<EventData>()
            if (header.eventType == EventType.TABLE_MAP) {
                tableRepository.updateTable(Table.of(data as TableMapEventData))
            } else if (EventType.isRowMutation(header.eventType)) {
                val events = when {
                    EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                    EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                    EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                    else -> emptyList()
                }
                logger.info("Mutation events: {}", events)
                for (event in events) {
                    kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
                }
            }
        }
        client.connect()
    }
           

在這段代碼裡面,我們首先是對 binlog 用戶端進行了初始化,随後開始監聽 binlog 事件。binlog 事件類型有很多,大部分都是我們不需要關心的事件,我們隻需要關注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以。當我們接收到 TABLE_MAP 事件,我們會對記憶體中的資料庫表結構進行更新,在後續的 WRITE/UPDATE/DELETE 事件中,我們會使用記憶體緩存的資料庫結構進行映射。整個過程大概如下所示:

Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
	"id": 1,
	"title": "Foo",
	"content": "Bar"
}
           

随後我們将收集到的事件發送到 Kafka 中,并由 Event Processor 進行消費處理。

事件處理器

@Component
class KafkaBinlogTopicListener(
    val binlogEventHandler: BinlogEventHandler
) {

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}
           

首先使用 SpringBoot Message Kafka 提供的注解對事件進行消費,接下來将事件委托到 binlogEventHandler 去進行處理。實際上 BinlogEventHandler 是個自定義的函數式接口,我們自定義事件處理器實作該接口後通過 Spring Bean 的方式注入到 KafkaBinlogTopicListener 中。

@Component
class ElasticsearchIndexerBinlogEventHandler(
    val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        // Should delete from Elasticsearch
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // Not ever WRITE or UPDATE, just reindex
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf(
                            "title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        "metadata" to mapOf(
                            "tenantId" to payload["tenantId"],
                            "type" to payload["type"],
                            "createdAt" to payload["createdAt"],
                            "createdBy" to payload["createdBy"],
                            "updatedAt" to payload["updatedAt"],
                            "updatedBy" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}
           

在這裡我們隻需要簡單地判斷是否為删除操作就可以,如果是删除操作需要在 Elasticsearch 中将資料删除,而如果是非删除操作隻需要在 Elasticsearch 重新按照為文檔建立索引即可。這段代碼簡單地使用了 Kotlin 中提供的 mapOf 方法對資料進行映射,如果需要其他複雜的處理隻需要按照 Java 代碼的方式編寫處理器即可。

總結

其實 Binlog 的處理部分有很多開源的處理引擎,包括 Alibaba Canal,本文使用手動處理的方式也是為其他使用非 MySQL 資料源的同學類似的解決方案。大家可以按需所取,因地制宜,為自己的網站設計屬于自己的實時站内搜尋引擎!