
Fixing a Spark Streaming application that reads data from Kerberos-enabled Kafka

Cluster versions:

CDH: 5.14.2

Kafka: 1.0.1-kafka-3.1.1

Spark: 2.3.0

Problem 1:

Spark Streaming throws a "jaas.conf file does not exist" error when reading data from Kafka.

Contents of the jaas.conf file:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true
  useKeyTab=true
  renewTicket=true
  keyTab="/home/zy/kerberos/hive.keytab"
  principal="hive";
};

Client{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  serviceName=zookeeper
  keyTab="/home/zy/kerberos/hive.keytab"
  renewTicket=true
  principal="hive";
};
           

The command used to submit the application to YARN in client mode:

spark2-submit --master yarn --deploy-mode client \
--class com.zy.KrbKafkaStreaming --num-executors 2 \
--principal hive@<REALM>.COM \
--keytab /home/zy/kerberos/hive.keytab \
--executor-memory 4G --executor-cores 2 \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.memory.storageFraction=0.4 \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/zy/kerberos/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/zy/kerberos/jaas.conf" \
/home/zy/zhangyan/streamsimu/sparktrain-ch12-1.0.jar
           

Also, the file /home/zy/kerberos/jaas.conf exists on every node of the CDH cluster.

When the Kafka topic contains data that this Spark Streaming application's consumer group has not yet consumed, the application fails while reading from Kafka with the following error:

19/11/06 17:59:24 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 0.0 (TID 0, node-11, executor 2): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:608)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:589)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194)
at org.apache.spark.streaming.kafka010.KafkaRDDIterator.<init>(KafkaRDD.scala:252)
at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:212)
......
Caused by: java.lang.SecurityException: java.io.IOException: /home/zy/kerberos/jaas.conf (No such file or directory)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:112)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:710)
… 17 more
Caused by: java.io.IOException: /home/zy/kerberos/jaas.conf (No such file or directory)
at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)
… 34 more
           

Solving Problem 1

When an executor runs a task, it looks up dependent files in its own working directory, but the jaas.conf path assigned to java.security.auth.login.config via spark.executor.extraJavaOptions in the command above is a path on the local Linux filesystem, not in the executor's working directory.

So jaas.conf needs to be made available in the executor's working directory.
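The fix relies on spark.files (equivalent to --files) copying the listed file into each YARN container's working directory, which is also the executor JVM's current directory. A minimal local simulation of why a relative `./jaas.conf` path then resolves (the temp directory is just a stand-in for a container directory):

```shell
# Stand-in for a YARN container working directory:
workdir=$(mktemp -d)
# spark.files would place jaas.conf here:
printf 'KafkaClient { };\n' > "$workdir/jaas.conf"
# YARN launches the executor JVM with this directory as its CWD,
# so -Djava.security.auth.login.config=./jaas.conf now resolves:
cd "$workdir"
test -f ./jaas.conf && echo "jaas.conf found"
```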

The client-mode submit command becomes:

spark2-submit --master yarn --deploy-mode client \
--class com.zy.KrbKafkaStreaming --num-executors 2 \
--principal hive@<REALM>.COM \
--keytab /home/zy/kerberos/hive.keytab \
--executor-memory 4G --executor-cores 2 \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.memory.storageFraction=0.4 \
--conf "spark.files=/home/zy/kerberos/jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/zy/kerberos/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
/home/zy/zhangyan/streamsimu/sparktrain-ch12-1.0.jar
           

The "jaas.conf file not found" error is gone, but another error is still thrown:

19/11/07 10:09:10 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, node-12, executor 2): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:608)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:589)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.<init>(KafkaRDD.scala:252)
        at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:212)
......
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:112)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:114)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:710)
        ... 17 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:940)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:98)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:82)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
        ... 21 more       
           

This led to Problem 2.

Problem 2

The Kafka client cannot obtain a password.

Solving Problem 2

The reasoning is the same as for Problem 1: the executor presumably could not find the keytab file at the path specified in jaas.conf within its working directory. So:

  1. Change the client-mode submit command to:
spark2-submit --master yarn --deploy-mode client \
--class com.zy.KrbKafkaStreaming --num-executors 2 \
--principal hive@<REALM>.COM \
--keytab /home/zy/kerberos/hive.keytab \
--executor-memory 4G --executor-cores 2 \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.memory.storageFraction=0.4 \
--conf "spark.files=/home/zy/kerberos/jaas.conf,/home/zy/kerberos/hive.keytab" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/zy/kerberos/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
/home/zy/zhangyan/streamsimu/sparktrain-ch12-1.0.jar
           
  2. Change the contents of jaas.conf to:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true
  useKeyTab=true
  renewTicket=true
  keyTab="./hive.keytab"
  principal="hive";
};

Client{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  serviceName=zookeeper
  keyTab="/home/zy/kerberos/hive.keytab"
  renewTicket=true
  principal="hive";
};
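The relative keyTab path works by the same mechanism as the jaas.conf fix: spark.files now also ships hive.keytab into the container directory, so the `./hive.keytab` path in jaas.conf resolves there. A local sketch with stand-in files:

```shell
# Stand-in for a container directory holding both shipped files:
workdir=$(mktemp -d); cd "$workdir"
printf 'keyTab="./hive.keytab"\n' > jaas.conf    # stand-in for the real jaas.conf
touch hive.keytab                                # shipped alongside it via spark.files
# Extract the keyTab value and confirm it resolves relative to the CWD:
ktpath=$(sed -n 's/.*keyTab="\([^"]*\)".*/\1/p' jaas.conf)
test -f "$ktpath" && echo "keytab resolves: $ktpath"
```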
           

But running this hit a third problem.

Problem 3

After submitting with the modified spark-submit command and jaas.conf from the Problem 2 fix, the job fails because the keytab file is added to the distributed cache multiple times:

19/11/08 10:20:09 WARN yarn.Client: Same name resource file:///home/zy/kerberos/hive.keytab added multiple times to distributed cache
19/11/08 10:20:09 ERROR spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: Attempt to add (file:///home/zy/kerberos/hive.keytab) multiple times to the distributed cache.
        at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$10$$anonfun$apply$7.apply(Client.scala:618)
        at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$10$$anonfun$apply$7.apply(Client.scala:609)
        at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
        at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$10.apply(Client.scala:609)
......
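The error suggests the distributed cache keys staged resources by file name, so shipping hive.keytab both via --keytab and via spark.files collides. A simplified shell model of that duplicate-name check (an assumption about the mechanism, not Spark's actual code):

```shell
# Resource names staged for this submission: --keytab adds hive.keytab,
# and spark.files adds jaas.conf plus hive.keytab a second time.
# uniq -d prints any name that occurs more than once, i.e. the collision:
printf '%s\n' hive.keytab jaas.conf hive.keytab | sort | uniq -d
```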
           

Solving Problem 3

The --keytab option already ships hive.keytab to YARN's distributed cache under that name, so listing the same file again in spark.files triggers the collision. Having the Kafka client authenticate with a different Kerberos principal (and its own keytab, here hdfs.keytab) avoids the name clash; spark.files should then ship that keytab instead.

Change the contents of jaas.conf to:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true
  useKeyTab=true
  renewTicket=true
  keyTab="./hdfs.keytab"
  principal="hdfs";
};

Client{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  serviceName=zookeeper
  keyTab="/home/zy/kerberos/hive.keytab"
  renewTicket=true
  principal="hive";
};
           

After resubmitting, the application ran without problems.

This only covers submitting to YARN in client mode; whether the same approach works in cluster mode has not been tested.
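For cluster mode, the driver also runs inside a YARN container, so a reasonable guess is that the driver should use the same relative path as the executors. An untested sketch only (not verified by this article; options such as the hdfs.keytab entry follow the Problem 3 fix and may need adjusting):

```shell
# Untested cluster-mode sketch: both driver and executors run in YARN
# containers, so both reference the ./jaas.conf shipped via spark.files.
spark2-submit --master yarn --deploy-mode cluster \
--class com.zy.KrbKafkaStreaming \
--conf "spark.files=/home/zy/kerberos/jaas.conf,/home/zy/kerberos/hdfs.keytab" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
/home/zy/zhangyan/streamsimu/sparktrain-ch12-1.0.jar
```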
