Cassandra connection closed when using the Spark Cassandra Connector in Spark Streaming

Description

A Spark job runs every five minutes, reading the previous five minutes of data from a Kafka stream and writing it to a Cassandra database. Most runs succeed, but roughly once every 25-26 hours the connection to Cassandra is closed, or, less often, the job is unable to connect at all (possibly just another flavor of the closed-connection error, judging by the caused-by exception). After the exception, the Spark Streaming process dies.

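For context, the job has roughly the shape sketched below. This is a minimal sketch under assumptions (the standard spark-streaming-kafka-0-10 direct stream and the connector's saveToCassandra), not the actual code: the keyspace, table, topic, broker, host, and column names are all placeholders.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

object KafkaToCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-to-cassandra")
      .set("spark.cassandra.connection.host", "cassandra") // placeholder host

    // Five-minute batches, matching the cycle described above.
    val ssc = new StreamingContext(conf, Seconds(300))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka:9092", // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams))

    // Each five-minute batch is written to Cassandra. With a short
    // keep_alive_ms the connector closes and reopens the connection
    // between batches, as the warning quoted below describes.
    stream.map(record => (record.key, record.value))
      .saveToCassandra("example_keyspace", "example_table", SomeColumns("key", "value"))

    ssc.start()
    ssc.awaitTermination()
  }
}
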
Note that we do receive the following log message on start-up, but this is by design: we only use the connection once every five minutes.

Currently spark.cassandra.connection.keep_alive_ms is set to
5000 ms, which is less than 300000 ms. This will cause connections to
be closed and recreated between batches. If this is not what you intended, increase the value
of spark.cassandra.connection.keep_alive_ms to a larger value.

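For reference, the change the warning suggests would look like the sketch below; the value is illustrative, and as noted above we keep it low deliberately.

import org.apache.spark.SparkConf

// Illustrative only: raising keep_alive_ms to at least the 300000 ms batch
// interval makes the connector hold the connection open across batches
// instead of closing and recreating it, per the warning above.
val conf = new SparkConf()
  .set("spark.cassandra.connection.keep_alive_ms", "300000")
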
Here is the connection-closed exception:
com.datastax.driver.core.exceptions.TransportException: [cassandra/172.17.0.2:9042] Connection has been closed
at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:313) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:269) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:143) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.rdd.reader.PrefetchingResultSetIterator.hasNext(PrefetchingResultSetIterator.scala:21) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438) ~[scala-library-2.11.8.jar:na]
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) ~[scala-library-2.11.8.jar:na]
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:99) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_92]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_92]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: com.datastax.driver.core.exceptions.TransportException: [cassandra/172.17.0.2:9042] Connection has been closed
at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1210) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1195) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.Connection.defunct(Connection.java:445) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.Connection$Dispatcher.exceptionCaught(Connection.java:1128) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:291) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:854) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:369) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:934) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:405) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:310) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
... 1 common frames omitted

Here is the exception from failing to open a connection:
java.io.IOException: Failed to open native connection to Cassandra at {172.17.0.2}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:168) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$8.apply(CassandraConnector.scala:154) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$8.apply(CassandraConnector.scala:154) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:32) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.RefCountedCache.syncAcquire(RefCountedCache.scala:69) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:57) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:79) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.DefaultScanner.<init>(Scanner.scala:27) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.CassandraConnectionFactory$class.getScanner(CassandraConnectionFactory.scala:30) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.getScanner(CassandraConnectionFactory.scala:35) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:361) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:99) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) ~[spark-core_2.11-2.1.0.jar:2.1.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_92]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_92]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra/172.17.0.2:9042 (com.datastax.driver.core.exceptions.TransportException: [cassandra/172.17.0.2:9042] Connection has been closed))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1483) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:399) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:161) ~[spark-cassandra-connector_2.11-2.0.1.jar:2.0.1]
... 28 common frames omitted

Environment

AWS virtual host running Fedora.
The job runs in a Fedora-based Docker container that connects to a second Docker container running Cassandra; both containers run on the same host.

Cassandra version: 3.0.8
Spark version: 2.1.0
Spark Cassandra Connector version: 2.0.1

Pull Requests

None

Status

Assignee

Unassigned

Reporter

Wally Baggaley

Labels

None

Reviewer

None

Reviewer 2

None

Tester

None

Affects versions

Priority

Major