Kafka producer hangs on send
Problem description
The streaming job reads data from a custom source and has to write it to both Kafka and HDFS.
I wrote a (very) basic Kafka producer to do this, but the whole streaming job hangs on the send method.
class KafkaProducer(val kafkaBootstrapServers: String, val kafkaTopic: String, val sslCertificatePath: String, val sslCertificatePassword: String) {
  val kafkaProps: Properties = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
  kafkaProps.put("acks", "1")
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("ssl.truststore.location", sslCertificatePath)
  kafkaProps.put("ssl.truststore.password", sslCertificatePassword)

  val kafkaProducer: KafkaProducer[Long, Array[String]] = new KafkaProducer(kafkaProps)

  def sendKafkaMessage(message: Message): Unit = {
    message.data.foreach(list => {
      val producerRecord: ProducerRecord[Long, Array[String]] = new ProducerRecord[Long, Array[String]](kafkaTopic, message.timeStamp.getTime, list.toArray)
      kafkaProducer.send(producerRecord)
    })
  }
}
And the code calling the producer:
receiverStream.foreachRDD(rdd => {
  val messageRowRDD: RDD[Row] = rdd.mapPartitions(partition => {
    val parser: Parser = new Parser
    val kafkaProducer: KafkaProducer = new KafkaProducer(kafkaBootstrapServers, kafkaTopic, kafkaSslCertificatePath, kafkaSslCertificatePass)
    val newPartition = partition.map(message => {
      Logger.getLogger("importer").error("Writing Message to Kafka...")
      kafkaProducer.sendKafkaMessage(message)
      Logger.getLogger("importer").error("Finished writing Message to Kafka")
      message.data.map(singleMessage => parser.parseMessage(message.timeStamp.getTime, singleMessage))
    })
    newPartition.flatten
  })
  val df = sqlContext.createDataFrame(messageRowRDD, Schema.messageSchema)
  Logger.getLogger("importer").info("Entries-count: " + df.count())
  val row = Try(df.first)
  row match {
    case Success(s) => Persister.writeDataframeToDisk(df, outputFolder)
    case Failure(e) => Logger.getLogger("importer").warn("Resulting DataFrame is empty. Nothing can be written")
  }
})
From the logs I can tell that each executor reaches the "sending to Kafka" point but goes no further. All executors hang there and no exception is thrown.
The Message class is a very simple case class with 2 fields: a timestamp and an array of strings.
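The post does not show the definition of Message, so the following is only a sketch of what such a case class might look like. The field names timeStamp and data are taken from the code above; the types are assumptions (java.sql.Timestamp because the code calls getTime on it, Array[String] because of the description). Note that the producer code calls list.toArray on each element of data, which suggests the real element type may itself be a collection.

```scala
import java.sql.Timestamp

// Hypothetical reconstruction of the Message case class.
// Field names come from the posted code; the field types are assumed.
case class Message(timeStamp: Timestamp, data: Array[String])
```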
Recommended answer
This was due to the producer's acks setting. Once acks was set to 1, sends went through much faster.
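For context, acks controls how many broker acknowledgments the producer requires before it treats a request as complete: with acks=1 only the partition leader has to confirm the write, while acks=all waits for all in-sync replicas. Below is a minimal sketch of building producer properties with acks set explicitly. The ProducerSettings object, the build helper and the max.block.ms value are illustrative assumptions and not part of the original post; max.block.ms is included only because it bounds how long send() may block, which can help turn a silent hang into a visible timeout.

```scala
import java.util.Properties

// Hypothetical helper: uses plain string keys so the sketch needs only the JDK.
object ProducerSettings {
  def build(bootstrapServers: String, acks: String = "1", maxBlockMs: Long = 10000L): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // "1" = wait for the partition leader only; "all" = wait for all in-sync replicas
    props.setProperty("acks", acks)
    // Upper bound on how long send() may block (assumed value, tune as needed)
    props.setProperty("max.block.ms", maxBlockMs.toString)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
}
```

The resulting Properties could be passed to the Kafka producer constructor in place of kafkaProps, for example ProducerSettings.build(kafkaBootstrapServers), with the SSL truststore entries from the question added on top.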