
OpenShift 4 AMQ Streams (3): Replicating Data Between Kafka Clusters with Kafka MirrorMaker

OpenShift 4.x HOL Tutorial Index

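Contents

  • What Is MirrorMaker
  • Configuring MirrorMaker
    • Verifying the Kafka Environment
    • Creating the Source and Target
    • Creating MirrorMaker
    • Testing and Verifying MirrorMaker
      • Sending Test Data
      • Receiving Test Data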

What Is MirrorMaker

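MirrorMaker is the Kafka tool for mirroring, replicating, and synchronizing data between different Kafka clusters. It consumes messages from the source cluster and produces them to the target cluster.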
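Conceptually, mirroring amounts to a consumer on the source cluster feeding a producer on the target cluster. The sketch below illustrates that idea by chaining the two console tools that ship with Kafka. It is not how MirrorMaker itself is implemented, and it drops keys, headers, and partition assignment. The function name `naive_mirror` and the `KAFKA_BIN` variable are illustrative assumptions, and the bootstrap addresses must be reachable from wherever it runs.

```shell
# Directory holding the Kafka CLI scripts (assumed default: ./bin, as in the broker pods).
KAFKA_BIN="${KAFKA_BIN:-./bin}"

# naive_mirror SOURCE_BOOTSTRAP TARGET_BOOTSTRAP TOPIC
# Reads every message value from TOPIC on the source cluster and writes it,
# one line per message, to the same topic on the target cluster.
naive_mirror() {
  "$KAFKA_BIN/kafka-console-consumer.sh" --bootstrap-server "$1" --topic "$3" --from-beginning |
    "$KAFKA_BIN/kafka-console-producer.sh" --broker-list "$2" --topic "$3"
}

# Example call (runs until interrupted):
#   naive_mirror my-source-cluster-kafka-bootstrap:9092 my-target-cluster-kafka-bootstrap:9092 test
```

MirrorMaker adds what this pipe lacks, such as consumer group tracking, topic selection by pattern, and multiple replicas.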


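Configuring MirrorMaker

Verifying the Kafka Environment

To reset the environment, run the following commands to delete the resources left over from the earlier exercises. Alternatively, create a new OpenShift project and install the AMQ Streams Operator or the Strimzi Operator in it.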

$ oc delete kafka -n kafka my-cluster
$ oc delete deploy -n kafka kafka-consumer
$ oc delete deploy -n kafka kafka-producer
$ oc delete deploy -n kafka connector-consumer
$ oc delete deploy -n kafka my-cluster-entity-operator
 
$ oc get deployment
NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
amq-streams-cluster-operator-v1.4.0   1/1     1            1           6h
my-connect-connect                    1/1     1            1           1h
           

Creating the Source and Target

  1. Create a file named kafka-source.yaml with the following content. It defines a Kafka resource named my-source-cluster, which serves as the source Kafka cluster for MirrorMaker.
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-source-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
    storage:
      type: ephemeral
    resources:
      requests:
        memory: 512Mi
        cpu: 500m
      limits:
        memory: 2Gi
        cpu: 1000m
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
    resources:
      requests:
        memory: 512Mi
        cpu: 500m
      limits:
        memory: 2Gi
        cpu: 1000m
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: 500m
        limits:
          memory: 2Gi
          cpu: 1000m
    userOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: 500m
        limits:
          memory: 2Gi
          cpu: 1000m
           
  2. Run the following command to create the Kafka cluster named my-source-cluster.
$ oc apply -f kafka-source.yaml -n kafka
           
  3. Create a file named kafka-target.yaml with the following content. It defines a Kafka resource named my-target-cluster, which serves as the target Kafka cluster for MirrorMaker.
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-target-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
    storage:
      type: ephemeral
    resources:
      requests:
        memory: 512Mi
        cpu: 500m
      limits:
        memory: 2Gi
        cpu: 700m
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
    resources:
      requests:
        memory: 512Mi
        cpu: 500m
      limits:
        memory: 2Gi
        cpu: 700m
  entityOperator:
    topicOperator: 
      resources:
        requests:
          memory: 512Mi
          cpu: 500m
        limits:
          memory: 2Gi
          cpu: 700m
    userOperator: 
      resources:
        requests:
          memory: 512Mi
          cpu: 500m
        limits:
          memory: 2Gi
          cpu: 700m
           
  4. Run the following command to create the Kafka cluster named my-target-cluster.
$ oc apply -f kafka-target.yaml -n kafka
           
  5. List the current Kafka clusters and their pods.
$ oc get kafka -n kafka
NAME                DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
my-source-cluster   3                        3
my-target-cluster   3                        3

$ oc get pod -n kafka
NAME                                                   READY   STATUS              RESTARTS   AGE
amq-streams-cluster-operator-v1.4.0-59c7778c88-7bvzx   1/1     Running             4          46h
my-connect-connect-75ddc48968-tmbt6                    1/1     Running             0          49m
my-source-cluster-entity-operator-669bfdbb5b-vbc5k     2/2     Running             0          26m
my-source-cluster-kafka-0                              2/2     Running             0          27m
my-source-cluster-kafka-1                              2/2     Running             1          27m
my-source-cluster-kafka-2                              2/2     Running             0          27m
my-source-cluster-zookeeper-0                          2/2     Running             0          27m
my-source-cluster-zookeeper-1                          2/2     Running             0          27m
my-source-cluster-zookeeper-2                          2/2     Running             0          27m
my-target-cluster-entity-operator-6c877d758c-cz79p     3/3     Running             0          2s
my-target-cluster-kafka-0                              2/2     Running             1          48s
my-target-cluster-kafka-1                              2/2     Running             0          48s
my-target-cluster-kafka-2                              2/2     Running             1          48s
my-target-cluster-zookeeper-0                          2/2     Running             0          82s
my-target-cluster-zookeeper-1                          2/2     Running             0          82s
my-target-cluster-zookeeper-2                          2/2     Running             0          82s
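The source and target manifests above are nearly identical apart from the cluster name and the CPU limit, so both could be rendered from one template instead of being maintained as two files. The following is a minimal sketch under that assumption. The helper names `res` and `render_kafka` are illustrative, and the sketch places `userOperator` under `entityOperator`, as the target manifest does.

```shell
# res INDENT CPU_LIMIT
# Prints the requests/limits block shared by all components, prefixed with INDENT.
res() {
  sed "s/^/$1/" <<EOF
resources:
  requests:
    memory: 512Mi
    cpu: 500m
  limits:
    memory: 2Gi
    cpu: $2
EOF
}

# render_kafka NAME CPU_LIMIT
# Prints a Kafka manifest for a cluster called NAME to standard output.
render_kafka() {
  cat <<EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: $1
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
    storage:
      type: ephemeral
$(res '    ' "$2")
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
$(res '    ' "$2")
  entityOperator:
    topicOperator:
$(res '      ' "$2")
    userOperator:
$(res '      ' "$2")
EOF
}
```

The output can be piped straight into the cluster, for example `render_kafka my-source-cluster 1000m | oc apply -n kafka -f -`, and again with `my-target-cluster 700m` for the target.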
           

Creating MirrorMaker

  1. Create a file named kafka-mirror-maker.yaml with the following content. It defines a KafkaMirrorMaker resource named my-mirror-maker.
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  image: strimzi/kafka-mirror-maker:0.8.0
  replicas: 1
  consumer:
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: my-source-group-id
  producer:
    bootstrapServers: my-target-cluster-kafka-bootstrap:9092
  whitelist: ".*"
           
  2. Apply the file, then check the KafkaMirrorMaker resource and its pod to confirm that it is running.
$ oc apply -f kafka-mirror-maker.yaml -n kafka
 
$ oc get KafkaMirrorMaker
NAME              DESIRED REPLICAS
my-mirror-maker   1
 
$ oc get pod -l strimzi.io/name=my-mirror-maker-mirror-maker
NAME                                            READY   STATUS    RESTARTS   AGE
my-mirror-maker-mirror-maker-646b477695-xlq87   1/1     Running   0          28m
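Before moving on to the test, it can help to confirm that every pod in the project is fully ready, not just the MirrorMaker pod. A small sketch of such a check follows. It filters the tabular output of `oc get pod` and prints the name of any pod whose READY count is incomplete or whose STATUS is not Running. The function name `not_ready_pods` is an illustrative assumption.

```shell
# not_ready_pods
# Reads `oc get pod` output on standard input and prints the names of pods
# that are not fully ready (READY x/y with x != y) or not in Running status.
not_ready_pods() {
  awk 'NR > 1 {
    split($2, ready, "/")
    if (ready[1] != ready[2] || $3 != "Running") print $1
  }'
}

# Example:
#   oc get pod -n kafka | not_ready_pods
```

An empty result means every listed pod reports all containers ready. Pods that have legitimately finished (status Completed) would also be reported, which is acceptable for the resources created in this exercise.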
           

Testing and Verifying MirrorMaker

Sending Test Data

  1. Open a shell in the my-source-cluster-kafka-2 pod.
  2. Send the test data, then exit the pod.
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> testmessage1
> testmessage2
> lasttestmessage
> <ctrl-c>
$ <ctrl-d>
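Step 1 does not show the command for opening a shell in the broker pod. One way to do it is `oc rsh`, sketched below. This assumes the pods live in the `kafka` project and that the broker container is named `kafka`, neither of which the steps above state explicitly. The helper name `enter_broker` and the `OC` and `NS` variables are illustrative.

```shell
# Allow the CLI binary and project to be overridden (assumed defaults: oc, kafka).
OC="${OC:-oc}"
NS="${NS:-kafka}"

# enter_broker POD
# Opens an interactive shell in the broker container of POD.
enter_broker() {
  "$OC" rsh -n "$NS" -c kafka "$1"
}

# Example:
#   enter_broker my-source-cluster-kafka-2
```

The same helper works for the target side by passing my-target-cluster-kafka-2.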
           

Receiving Test Data

  1. Open a shell in the my-target-cluster-kafka-2 pod.
  2. Confirm that the data is received, then exit the pod.
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
testmessage1
testmessage2
lasttestmessage
<ctrl-c>
$ <ctrl-d>
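The same round trip can be run without an interactive shell, which makes it easier to repeat. The sketch below wraps the two console tools in `oc exec`. It assumes the `kafka` project, a broker container named `kafka`, and that the container's working directory contains `bin/` as in the interactive session above. The helper names `send_messages` and `receive_messages` are illustrative.

```shell
# Allow the CLI binary and project to be overridden (assumed defaults: oc, kafka).
OC="${OC:-oc}"
NS="${NS:-kafka}"

# send_messages POD TOPIC
# Sends each line read from standard input as one message to TOPIC.
send_messages() {
  "$OC" exec -i -n "$NS" -c kafka "$1" -- \
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "$2"
}

# receive_messages POD TOPIC COUNT
# Prints COUNT messages from the beginning of TOPIC, then exits.
# Gives up if no message arrives for 30 seconds.
receive_messages() {
  "$OC" exec -n "$NS" -c kafka "$1" -- \
    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic "$2" --from-beginning --max-messages "$3" --timeout-ms 30000
}

# Example:
#   printf 'testmessage1\ntestmessage2\nlasttestmessage\n' |
#     send_messages my-source-cluster-kafka-2 test
#   receive_messages my-target-cluster-kafka-2 test 3
```

If MirrorMaker is working, the second command prints the three messages that the first one sent to the source cluster.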