Integrating Kafka and Spark Streaming: Code Examples and State of the Game

Original article: http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/ (Oct 1st, 2014). Spark Streaming has been getting a lot of attention lately as a real-time data processing tool, often mentioned alongside Apache Storm. If you ask me, no real-time data processing tool is complete without Kafka integration, which is why I added an example Spark Streaming application to kafka-storm-starter that demonstrates how to read from Kafka and write to Kafka, using Avro as the data format and Twitter Bijection for handling the data serialization.

What is Spark Streaming?

Spark Streaming is a sub-project of Apache Spark. Spark is a batch processing platform similar to Apache Hadoop, and Spark Streaming is a real-time data processing tool that runs on top of the Spark engine.

Spark Streaming vs. Apache Storm

The use cases of Spark Streaming overlap closely with those of Apache Storm, which is arguably today's most popular real-time processing platform for Big Data. Bobby Evans and Tom Graves of Yahoo! Engineering recently gave a talk, Spark and Storm at Yahoo!, in which they compare the two platforms and also cover when and why to choose one over the other. Likewise, P. Taylor Goetz of HortonWorks shared a slide deck titled Apache Storm and Spark Streaming Compared. Here is my own, very brief comparison: Storm has higher industry adoption and better production stability than Spark Streaming. On the other hand, Spark has the more expressive, higher-level API, which is arguably easier to use, particularly if you write your Spark applications in Scala (I prefer the Spark API, too). But don't take my word for it, please check out the talk and slide deck referenced above. Both Spark and Storm are top-level Apache projects, and vendors have begun to integrate one or both of them into their commercial offerings, e.g. HortonWorks (Storm, Spark) and Cloudera (Spark).

Excursus: Machines, cores, executors, tasks, and receivers in Spark

The subsequent sections of this article talk a lot about parallelism in Spark and in Kafka. You need at least a basic understanding of some Spark terminology to be able to follow the discussion in those sections.

  • A Spark cluster contains 1+ worker nodes aka slave machines (simplified view; I exclude pieces like cluster managers here.)
  • A worker node can run 1+ executors.
  • An executor is a process launched for an application on a worker node, which runs tasks and keeps data in memory or disk storage across them. Each application has its own executors. An executor has a certain amount of cores aka “slots” available to run tasks assigned to it.
  • A task is a unit of work that will be sent to one executor. That is, it runs (part of) the actual computation of your application. The SparkContext sends those tasks for the executors to run. Each task occupies one slot aka core in the parent executor.
  • A receiver (API, docs) is run within an executor as a long-running task. Each receiver is responsible for exactly one so-called input DStream (e.g. an input stream for reading from Kafka), and each receiver – and thus input DStream – occupies one core/slot.
  • An input DStream: an input DStream is a special DStream that connects Spark Streaming to external data sources for reading input data. For each external data source (e.g. Kafka) you need one such input DStream implementation. Once Spark Streaming is “connected” to an external data source via such input DStreams, any subsequent DStream transformations will create “normal” DStreams.

In Spark's execution model, each application gets its own executors, which stay up for the duration of the whole application and run 1+ tasks in multiple threads. This isolation approach is similar to Storm's execution model. The architecture becomes a bit more involved once you introduce cluster managers like YARN or Mesos, which I will not cover here. See Cluster Overview in the Spark docs for further details.
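
As a quick, hedged illustration of why this core/slot accounting matters in practice: every receiver permanently occupies one core, so your application needs more cores than it runs receivers, otherwise no slots are left for processing the received data. A minimal sketch, running locally; the app name, batch interval and core count are arbitrary choices of mine:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With, say, two Kafka receivers (= two long-running tasks), "local[2]" would leave
// zero cores for processing the received data, so we ask for more cores than receivers.
val conf = new SparkConf().setAppName("kafka-spark-sketch").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(2))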

Integrating Kafka and Spark Streaming

Overview

In short: Spark Streaming supports Kafka, but there are still some rough edges. A good starting point for me was the KafkaWordCount example in the Spark code base. When I read that example code, though, I was left with some open questions. In particular, I wanted to understand how to:

  • Read from Kafka in parallel. In Kafka, a topic can have N partitions, and ideally we would like to read from those N partitions in parallel. This is what the Kafka spout in Storm does.
  • Write to Kafka in parallel from a Spark Streaming application.

On top of those questions I also ran into several known issues in Spark and/or Spark Streaming, most of which have been discussed in the Spark mailing list. I’ll summarize the current state and known issues of the Kafka integration further down below.

Primer on topics, partitions, and parallelism in Kafka

For details see my articles Apache Kafka 0.8 Training Deck and Tutorial and Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node. Kafka stores data in topics, with each topic consisting of a configurable number of partitions. The number of partitions of a topic is very important for performance considerations, as this number is an upper bound on the consumer parallelism: if a topic has N partitions, then your application can only consume this topic with a maximum of N threads in parallel. (At least this is the case when you use Kafka's built-in Scala/Java consumer API.) When I say "application" I should rather say consumer group in Kafka's terminology. A consumer group, identified by a string of your choosing, is a cluster-wide identifier. All the consumers that are part of the same consumer group share the burden of reading from a given Kafka topic, and only a maximum of N (= number of partitions) threads across all the consumers in the same group will be able to read from the topic. Any excess threads will sit idle.

Multiple Kafka consumer groups can run in parallel: Of course you can run multiple, independent logical consumer applications against the same Kafka topic. Each logical application will run its consumer threads under a unique consumer group id. Each application can then also use a different read parallelism (see below). When I describe the various ways to configure read parallelism in the following sections, I am referring to a single one of these logical consumer applications.

Here are some simple examples.

  • Your application uses the consumer group id “terran” to read from a Kafka topic “zerg.hydra” that has 10 partitions. If you configure your application to consume the topic with only 1 thread, then this single thread will read data from all 10 partitions.
  • Same as above, but this time you configure 5 consumer threads. Here, each thread will read from 2 partitions.
  • Same as above, but this time you configure 10 consumer threads. Here, each thread will read from a single partition.
  • Same as above, but this time you configure 14 consumer threads. Here, 10 of the 14 threads will read from a single partition each, and the remaining 4 threads will be idle.
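
To make the thread-per-partition accounting above concrete, here is a hedged sketch of what such a setup boils down to with Kafka's 0.8 high-level consumer API; the ZooKeeper address is a placeholder:

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "zookeeper1:2181") // placeholder
props.put("group.id", "terran")
val connector = Consumer.create(new ConsumerConfig(props))

// Request 5 consumer threads for topic "zerg.hydra"; with 10 partitions, each of the
// 5 resulting streams reads from 2 partitions. Requesting 14 threads instead would
// leave 4 of them idle.
val streams = connector.createMessageStreams(Map("zerg.hydra" -> 5))("zerg.hydra")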

Let's introduce some real-world complexity into this simple picture: rebalancing events in Kafka. Rebalancing is a lifecycle event in Kafka that occurs when consumers join or leave a consumer group (there are more conditions that trigger rebalancing, but these are not important in this context; see my Kafka training deck for details on rebalancing).

  • Your application starts consuming with the consumer group id "terran" and 1 thread. This thread reads from all 10 partitions. During runtime, you increase the number of threads from 1 to 14. That is, there is suddenly a change of parallelism within the same consumer group. This triggers rebalancing in Kafka. Once rebalancing completes, 10 of your 14 threads will each consume a single partition, and the remaining 4 threads will be idle. And as you may have guessed, the initial thread will now read from only one partition and will no longer see data from the other nine.

We now have a basic understanding of topics and partitions, and know that the number of partitions is an upper bound on the parallelism with which you can read from Kafka. But what are the resulting implications for an application – such as a Spark Streaming job or Storm topology – that reads its input data from Kafka?

  1. Read parallelism: You typically want to read from all N partitions of a Kafka topic in parallel, with N threads. And depending on the data volume you want to spread those threads across different NICs, which typically means across different machines. In Storm, this is achieved by setting the parallelism of the Kafka spout to N via TopologyBuilder#setSpout() (a hedged Storm sketch of this and the next knob follows after this list). The Spark equivalent is a bit trickier, and I will describe how to do this in further detail below.
  2. Downstream processing parallelism: Once retrieved from Kafka you want to process the data in parallel. Depending on your use case this level of parallelism must be different from the read parallelism. If your use case is CPU-bound, for instance, you want to have many more processing threads than read threads; this is achieved by shuffling or "fanning out" the data via the network from the few read threads to the many processing threads. Hence you pay for the access to more cores with increased network communication, serialization overhead, etc. In Storm, you perform such a shuffling via a shuffle grouping from the Kafka spout to the next downstream bolt. The Spark equivalent is the repartition transformation on DStreams.
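
For reference, here is a hedged Storm sketch of the two knobs from the list above; the ZooKeeper host, component ids, and the MyProcessingBolt class are placeholders of mine, not code from kafka-storm-starter:

import backtype.storm.topology.TopologyBuilder
import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}

val spoutConfig = new SpoutConfig(new ZkHosts("zookeeper1:2181"), "zerg.hydra", "/kafka-spout", "kafka-spout-id")
val builder = new TopologyBuilder
// Read parallelism: run the Kafka spout with parallelism 5, i.e. one spout executor per Kafka partition.
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 5)
// Downstream processing parallelism: fan out to 20 bolt executors via a shuffle grouping.
// MyProcessingBolt stands in for your own bolt implementation.
builder.setBolt("processing-bolt", new MyProcessingBolt, 20).shuffleGrouping("kafka-spout")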

The important takeaway is that it is possible – and often desired – to decouple the level of parallelism for reading from Kafka from the level of parallelism for processing the data once read. In the next sections I will describe the various options you have at your disposal to configure read parallelism and downstream processing parallelism in Spark Streaming.

Reading from Kafka

Read parallelism in Spark Streaming

Like Kafka, Spark Streaming has the concept of partitions. It is important to understand that Kafka's per-topic partitions are not correlated to the partitions of RDDs in Spark. The KafkaInputDStream of Spark Streaming – aka its Kafka "connector" – uses Kafka's high-level consumer API, which means you have two control knobs in Spark that determine read parallelism for Kafka:

  1. The number of input DStreams. Because Spark will run one receiver (= task) per input DStream, this means using multiple input DStreams will parallelize the read operations across multiple cores and thus, hopefully, across multiple machines and thereby NICs.
  2. The number of consumer threads per input DStream. Here, the same receiver (= task) will run multiple threads. That is, read operations will happen in parallel but on the same core/machine/NIC.

In practice, option 1 is the recommended approach. Why is that? First and foremost, reading from Kafka is usually limited by network/NIC throughput, i.e. you typically do not increase read throughput by running more threads on the same machine. In other words, it is unlikely that the CPU is the bottleneck when reading from Kafka. Second, if you go with option 2, then multiple threads will be competing for the lock to push data into so-called blocks (the += method of BlockGenerator that is used behind the scenes is synchronized on the block generator instance).

Number of partitions of the RDDs created by the input DStreams: The KafkaInputDStream will store individual messages received from Kafka into so-called blocks. From what I understand, a new block is generated every spark.streaming.blockInterval milliseconds, and each block is turned into a partition of the RDD that will eventually be created by the DStream. If this assumption of mine is true, then the number of partitions in the RDDs created by KafkaInputDStream is determined by batchInterval / spark.streaming.blockInterval, where batchInterval is the time interval at which streaming data will be divided into batches (set via a constructor parameter of StreamingContext). For example, if the batch interval is 2 seconds (default) and the block interval is 200ms (default), your RDD will contain 10 partitions. Please correct me if I’m mistaken.
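
If that understanding is correct, the arithmetic can be sketched as follows, using the default values mentioned above (set explicitly here just for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Block interval of 200 ms (the default), configured in milliseconds.
val conf = new SparkConf()
  .setAppName("partition-arithmetic-sketch")
  .set("spark.streaming.blockInterval", "200")
// Batch interval of 2 seconds, set via the StreamingContext constructor.
val ssc = new StreamingContext(conf, Seconds(2))
// Expected number of partitions per receiver-generated RDD: 2000 ms / 200 ms = 10.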

Option 1: Controlling the number of input DStreams

The example below is taken from the Spark Streaming Programming Guide.

val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)

val numInputDStreams = 5
val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }  

In this example we create five input DStreams, thus spreading the burden of reading from Kafka across five cores and, hopefully, five machines/NICs. (I say "hopefully" because I am not certain whether Spark Streaming's task placement policy will spread the receivers across different machines.) All the input DStreams are part of the "terran" consumer group, and the Kafka API will ensure that these five input DStreams a) will see all available data for the topic because it assigns each partition of the topic to an input DStream and b) will not see overlapping data because each partition is assigned to only one input DStream at a time. In other words, this setup of "collaborating" input DStreams works because of the consumer group behavior provided by the Kafka API, which is used behind the scenes by KafkaInputDStream. What I have not shown in this example is how many threads are created per input DStream, which is done via parameters to the KafkaUtils.createStream method (the actual input topic(s) are also specified as parameters of this method). We will do this in the next section.

But before we continue, let me highlight several known issues with this setup and with Spark Streaming in particular, which are caused on the one hand by current limitations of Spark in general and on the other hand by the current implementation of the Kafka input DStream in particular:

[When you use the multi-input-stream approach I described above, then] those consumers operate in one [Kafka] consumer group, and they try to decide which consumer consumes which partitions. It may just fail to do the syncpartitionrebalance, and then you have only a few consumers actually consuming. To mitigate this problem, you can set the rebalance retries very high, and pray it helps.

Then arises yet another "feature": if your receiver dies (OOM, hardware failure), you just stop receiving from Kafka!

(spark-user discussion, markmail.org/message/…)

The "stop receiving from Kafka" issue requires some explanation. Currently, when you start your streaming application via ssc.start(), the processing starts and just keeps on going indefinitely – even if the input data source (e.g. Kafka) becomes unavailable. That is, the application cannot detect that it has lost the connection to the upstream data source and thus cannot react to this event, e.g. by reconnecting or by stopping the execution. Similarly, if you lose a receiver that reads from the data source, then your streaming application will generate empty RDDs.

This is a pretty unfortunate situation. One crude workaround is to restart your streaming application whenever it runs into an upstream data source failure or a receiver failure. This workaround may not help you, though, if your use case requires you to set the Kafka configuration option auto.offset.reset to "smallest" – because of a known bug in Spark Streaming, the resulting behavior of your streaming application may not be what you want. See the section Known issues in Spark Streaming below for further details.

Option 2: Controlling the number of consumer threads per input DStream

In this example we create a single input DStream that is configured to run three consumer threads – in the same receiver/task and thus on the same core/machine/NIC – to read from the Kafka topic "zerg.hydra".

val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)

val consumerThreadsPerInputDstream = 3
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)

The KafkaUtils.createStream method is overloaded, so there are a few different method signatures. In this example we pick the Scala variant that gives us the most control.
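
For illustration, here is a hedged sketch of that fully parameterized variant; the key/value types, decoders and storage level below are choices of mine, not something the example prescribes:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Explicitly pick the key/value types, their decoders, and the storage level.
val typedStream = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc,
  kafkaParams,
  Map("zerg.hydra" -> consumerThreadsPerInputDstream),
  StorageLevel.MEMORY_ONLY_SER)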

Combining options 1 and 2

Here is a more complete example that combines the two aforementioned techniques:

val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)

val numDStreams = 5
val topics = Map("zerg.hydra" -> 1)
val kafkaDStreams = (1 to numDStreams).map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  }

We are creating five input DStreams, each of which will run a single consumer thread. If the topic "zerg.hydra" has five partitions (or less), then this is normally the best way to parallelize the read operations if you care primarily about maximizing throughput.

Downstream processing parallelism in Spark Streaming

In the previous sections we covered parallelizing reads from Kafka. Now we can tackle parallelizing the downstream data processing in Spark. Here, you must keep in mind how Spark itself parallelizes its processing. Like Kafka, Spark ties the parallelism to the number of (RDD) partitions by running one task per RDD partition (sometimes partitions are still called "slices" in the docs).

This is no different from other Spark applications: Once a Spark Streaming application has received its input data, any further processing is identical to that of a non-streaming Spark application. That is, you use exactly the same tools and patterns that you would apply to "normal" Spark data flows. See Level of Parallelism in Data Processing.

This gives us two control knobs:

  1. The number of input DStreams, i.e. what we receive as a result of the previous sections on read parallelism. This is our starting point, which we can either take as-is or modify with the next option.
  2. The repartition DStream transformation. It returns a new DStream with an increased or decreased level N of parallelism. Each RDD in the returned DStream has exactly N partitions. DStreams are a continuous series of RDDs, and behind the scenes DStream.repartition calls RDD.repartition. The latter "reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network." In other words, DStream.repartition is very similar to Storm's shuffle grouping.

Hence repartition is our primary means to decouple read parallelism from processing parallelism. It allows us to set the number of processing tasks and thus the number of cores that will be used for the processing. Indirectly, we also influence the number of machines/NICs that will be involved. A related DStream transformation is union. (This method also exists on StreamingContext, where it returns the unified DStream from multiple DStreams of the same type and same slide duration. Most likely you would use the StreamingContext variant.) A union returns a UnionDStream backed by a UnionRDD. The UnionRDD is comprised of all the partitions of the RDDs being unified, i.e. if you unify 3 RDDs with 10 partitions each, then your union RDD instance will contain 30 partitions. In other words, union will squash multiple DStreams into a single DStream/RDD, but it will not change the level of parallelism. Whether you need to use union depends on whether your use case requires information from all partitions of the Kafka topic "in one place", so it is primarily a question of semantics. One such example is when you must perform a (global) count of distinct elements.
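
Here is a minimal sketch of the two points above – union leaving the level of parallelism unchanged, and the "global count of distinct elements" use case. The three DStreams are left abstract; think of them as three Kafka input DStreams whose RDDs have 10 partitions each:

import org.apache.spark.streaming.dstream.DStream

val a: DStream[String] = ??? // RDDs with 10 partitions each
val b: DStream[String] = ??? // RDDs with 10 partitions each
val c: DStream[String] = ??? // RDDs with 10 partitions each

// The unified DStream's RDDs have 10 + 10 + 10 = 30 partitions: same level of parallelism.
val unioned = ssc.union(Seq(a, b, c))

// A (global) per-batch count of distinct elements -- the kind of computation
// for which you want all the data "in one place".
val distinctCountsPerBatch = unioned.transform(_.distinct()).count()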

Note: RDDs are not ordered. So when you union RDDs, the resulting RDD will not have a well-defined ordering either. If you need an ordering, you have to sort the RDD explicitly.
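
Continuing the sketch above, a minimal example of such an explicit sort (sortBy triggers a shuffle, so only do this when an ordering is actually required):

// Sort the contents of each batch's RDD explicitly inside a transform.
val sortedPerBatch: DStream[String] = unioned.transform(rdd => rdd.sortBy(identity))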

Your use case will determine which knobs and which combination thereof you need to use. Let’s say your use case is CPU-bound. Here, you may want to consume the Kafka topic “zerg.hydra” (which has five Kafka partitions) with a read parallelism of 5 – i.e. 5 receivers with 1 consumer thread each – but bump up the processing parallelism to 20:

val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val readParallelism = 5
val topics = Map("zerg.hydra" -> 1)

val kafkaDStreams = (1 to readParallelism).map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  }
// collection of five *input* DStreams = handled by five receivers/tasks

val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
// single DStream

val processingParallelism = 20
val processingDStream = unionDStream.repartition(processingParallelism)
// single DStream but now with 20 partitions

In the next section we will tie all these pieces together, and also cover the actual data processing.

Writing to Kafka

Writing to Kafka should be done via the foreachRDD output operation:

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, like saving the RDD to files, or writing it over the network to a database. Note that the function func is executed at the driver, and will usually have RDD actions in it that will force the computation of the streaming RDDs. Note: The remark "the function func is executed at the driver" does not mean that, say, a Kafka producer itself would be run from the driver. Rather, read this remark as "the function func is evaluated at the driver". The actual behavior will become clearer once you read Design Patterns for using foreachRDD.

You should read the section Design Patterns for using foreachRDD in the Spark docs, which explains the recommended patterns as well as common pitfalls when using foreachRDD to talk to external systems. In my case, I decided to follow the recommendation to re-use Kafka producer instances across multiple RDDs/batches via a pool of producers. I implemented such a pool with Apache Commons Pool, see PooledKafkaProducerAppFactory. Factories are helpful in this context because of Spark’s execution and serialization model. The pool itself is provided to the tasks via a broadcast variable. The end result looks as follows:

val producerPool = {
  // See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)
}

stream.map { ... }.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // Get a producer from the shared pool
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
    }
    // Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)
  })
})

Keep in mind that Spark Streaming creates many RDDs per minute, each of which contains multiple partitions, so preferably you shouldn't create new Kafka producers for each partition, let alone for each Kafka message. The setup above minimizes the creation of Kafka producer instances, and also minimizes the number of TCP connections that are being established with the Kafka cluster. You can use this pool setup to precisely control the number of Kafka producer instances that are being made available to your streaming application (if in doubt, use fewer).
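
For reference, a producer pool along the lines of createKafkaProducerPool above could be built with Apache Commons Pool roughly as follows. This is only a hedged sketch using Commons Pool 2: the KafkaProducerApp wrapper and its send/shutdown methods are placeholders for whatever producer helper you actually use, and this is not necessarily how PooledKafkaProducerAppFactory in kafka-storm-starter is implemented.

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

// Hypothetical thin wrapper around a Kafka producer.
class KafkaProducerApp(brokerList: String, topic: String) {
  def send(bytes: Array[Byte]): Unit = ??? // delegate to the underlying Kafka producer
  def shutdown(): Unit = ???               // close the underlying Kafka producer
}

class KafkaProducerAppFactory(brokerList: String, topic: String)
    extends BasePooledObjectFactory[KafkaProducerApp] with Serializable {
  override def create(): KafkaProducerApp = new KafkaProducerApp(brokerList, topic)
  override def wrap(p: KafkaProducerApp): PooledObject[KafkaProducerApp] = new DefaultPooledObject(p)
  // Called when an object is removed from the pool, e.g. on pool shutdown.
  override def destroyObject(pooled: PooledObject[KafkaProducerApp]): Unit = pooled.getObject.shutdown()
}

def createKafkaProducerPool(brokerList: String, topic: String): GenericObjectPool[KafkaProducerApp] = {
  val pool = new GenericObjectPool[KafkaProducerApp](new KafkaProducerAppFactory(brokerList, topic))
  pool.setMaxTotal(10) // cap the number of producer instances and thus TCP connections to Kafka
  pool
}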

Complete example

The code example below is the gist of my example Spark Streaming application (see the full code for details and explanations). Here, I demonstrate how to:

  • Read Avro-encoded data (the Tweet class) from a Kafka topic in parallel. We use the optimal read parallelism of one single-threaded input DStream per Kafka partition.
  • Deserialize the Avro-encoded data back into pojos, then serialize them back into binary. The serialization is performed via Twitter Bijection.
  • Write the results back into a different Kafka topic via a Kafka producer pool.

// Set up the input DStream to read from Kafka (in parallel)
val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "zookeeper1:2181",
    "group.id" -> "spark-streaming-test",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "input-topic"
  val numPartitionsOfInputTopic = 5
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.
  unifiedStream.repartition(sparkProcessingParallelism)
}

// We use accumulators to track global "counters" across the tasks of our streaming app
val numInputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages consumed")
val numOutputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages produced")
// We use a broadcast variable to share a pool of Kafka producers, which we use to write data from Spark to Kafka.
val producerPool = {
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)
}
// We also use a broadcast variable for our Avro Injection (Twitter Bijection)
val converter = ssc.sparkContext.broadcast(SpecificAvroCodecs.toBinary[Tweet])

// Define the actual data flow of the streaming job
kafkaStream.map { case bytes =>
  numInputMessages += 1
  // Convert Avro binary data to pojo
  converter.value.invert(bytes) match {
    case Success(tweet) => tweet
    case Failure(e) => // ignore if the conversion failed
  }
}.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
      numOutputMessages += 1
    }
    producerPool.value.returnObject(p)
  })
})

// Run the streaming job
ssc.start()
ssc.awaitTermination()

See the full source code for further details and explanations. Personally, I really like the conciseness and expressiveness of the Spark Streaming code. As Bobby Evans and Tom Graves are alluding to in their talk, the Storm equivalent of this code is more verbose and comparatively lower level: The KafkaStormSpec in kafka-storm-starter wires and runs a Storm topology that performs the same computations. Well, the spec file itself is only a few lines of code once you exclude the code comments, which I only keep for didactic reasons; however, keep in mind that in Storm's Java API you cannot use Scala-like anonymous functions as I show in the Spark Streaming example above (e.g. the map and foreach steps). Instead you must write "full" classes – bolts in plain Storm, functions/filters in Storm Trident – to achieve the same functionality, see e.g. AvroDecoderBolt. This feels a bit similar to, say, having to code against Spark's own API using Java, where juggling with anonymous functions is IMHO just as painful.

Lastly, I also liked the Spark documentation. It was very easy to get started, and even some more advanced use is covered (e.g. Tuning Spark). I still had to browse the mailing list and also dive into the source code, but the general starting experience was ok – only the Kafka integration part was lacking (hence this blog post). Good job to everyone involved maintaining the docs!

Known issues in Spark Streaming

As you might have guessed by now, there are indeed a number of unresolved issues in Spark Streaming. I try to summarize my findings below. On the one hand, there is some confusion about how to correctly read from and write to Kafka, which you can follow in mailing list discussions such as Multiple Kafka Receivers and Union and How to scale more consumer to Kafka stream. On the other hand, there are apparently some inherent issues in Spark Streaming itself, notably data loss in failure scenarios. In other words, these are issues that you do not want to run into in production!

  • The current (v1.1) driver in Spark does not recover such raw data that has been received but not processed (source). Here, your Spark application may lose data under certain conditions. Tathagata Das points out that driver recovery should be fixed in Spark v1.2, which will be released around the end of 2014.
  • The current Kafka “connector” of Spark is based on Kafka’s high-level consumer API. One effect of this is that Spark Streaming cannot rely on its KafkaInputDStream to properly replay data from Kafka in case of a downstream data loss (e.g. Spark machines died).
    • Some people even advocate that the current Kafka connector of Spark should not be used in production because it is based on the high-level consumer API of Kafka. Instead Spark should use the simple consumer API (like Storm’s Kafka spout does), which allows you to control offsets and partition assignment deterministically.
  • The Spark community has been working on filling the previously mentioned gap with e.g. Dibyendu Bhattacharya’s kafka-spark-consumer. The latter is a port of Apache Storm’s Kafka spout, which is based on Kafka’s so-called simple consumer API, which provides better replaying control in case of downstream failures.
  • Even given those volunteer efforts, the Spark team would prefer to not special-case data recovery for Kafka, as their goal is “to provide strong guarantee, exactly-once semantics in all transformations” (source), which is understandable. On the flip side it still feels a bit like a wasted opportunity to not leverage Kafka’s built-in replaying capabilities. Tough call!
  • SPARK-1340: In the case of Kafka input DStreams, receivers are not getting restarted if the worker running the receiver fails. So if a worker dies in production, you will simply miss the data the receiver(s) was/were responsible to retrieve from Kafka.
  • See also Failure of a Worker Node for further discussions on data loss scenarios (“lost input data!”) as well as data duplication scenarios (“wrote output data twice!”). Applies to Kafka, too.
  • Spark’s usage of the Kafka consumer parameter auto.offset.reset is different from Kafka’s semantics. In Kafka, the behavior of setting auto.offset.reset to “smallest” is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = "smallest", your application will completely re-process all available Kafka data. Doh! See this discussion and that discussion. (A short illustration of this parameter follows after this list.)
  • SPARK-1341: Ability to control the data rate in Spark Streaming. This is relevant insofar as, if you are already in trouble because of the other Kafka-related issues above (e.g. the auto.offset.reset misbehavior), then what may happen is that your streaming application must re-process (or thinks it must re-process) a lot of older data. But since there is no built-in rate limitation, this may cause your workers to become overwhelmed and run out of memory.
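
As a short, hedged illustration of the parameter discussed in the auto.offset.reset item above (the connection settings are placeholders):

val kafkaParams = Map(
  "zookeeper.connect" -> "zookeeper1:2181", // placeholder
  "group.id"          -> "terran",
  // Kafka semantics: fall back to the smallest offset only when no valid offset is stored.
  // Spark's connector (as of v1.1) instead wipes existing offsets on startup, so every
  // restart of the application re-processes the topic from the beginning.
  "auto.offset.reset" -> "smallest")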

Apart from those failure handling and Kafka-focused issues there are also scaling and stability concerns. Again, please refer to the Spark and Storm talk of Bobby and Tom for further details. Both of them have more experience with Spark than I do. I also came across one comment that there may be issues with the (awesome!) G1 garbage collector that is available in Java 1.7.0u4+, but I didn’t run into any such issue so far.