saveToCassandra blocked

Description

I am trying to upgrade to the latest versions of the Cassandra connector and Spark. "saveToCassandra" simply blocks in the streaming job

at "foreachRDD at DStreamFunctions.scala:54"

The same code works without any problem on Spark/connector version 1.4.1. The call below tries to save a stream to Cassandra:

events.saveToCassandra(
  "keyspace",
  "table_name",
  SomeColumns( "......." ),
  writeConf = WriteConf(ttl = TTLOption.constant(Constants.DEFAULT_TTL)))

Environment

Components

  • Cassandra : version 2.2.4

  • Streaming application with versions

<spark.version>1.6.0</spark.version>
<java.driver.version>3.0.0</java.driver.version>
<spark.cassandra.connector.version>1.5.0</spark.cassandra.connector.version>

Pull Requests

None

Attachments

4

Activity

Brian Cantoni 
April 22, 2016 at 4:02 PM
(edited)

This should be fixed now by SPARKC-330. The problem occurred when the connector could not achieve the desired consistency level when writing to Cassandra. The failure should now happen right away, so it will be more obvious when the consistency level can't be met.

Ravindar Roopreddy 
March 12, 2016 at 7:54 AM

To be precise, I got the following error after 15 minutes or so:

16/03/11 23:52:10 ERROR [cluster1-timeouter-0] writer.QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@bf0cd36
com.datastax.driver.core.exceptions.OperationTimedOutException: [localhost/127.0.0.1] Timed out waiting for server response
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onTimeout(RequestHandler.java:763)
at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:1260)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:655)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
at java.lang.Thread.run(Thread.java:744)

[Stage 1:> (0 + 3) / 5]
16/03/11 23:52:27 ERROR [cluster1-nio-worker-1] writer.QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@59cd1248
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
at com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:128)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:114)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:354)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:744)
Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
... 15 more

Ravindar Roopreddy 
March 12, 2016 at 7:33 AM

Here are the steps:

1) We had a data dump from a test instance with a 3-node cluster and a replication factor of 2, as below

CREATE KEYSPACE test_ks WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '2'
};

2) Run a single-node Cassandra cluster on the local machine

3) Use sstableloader to load the data of all 3 nodes from step (1)

4) Run the Spark Streaming app against Cassandra from (2) - app blocked (with an error about the replication factor emitted after 10 min)

5) Alter the keyspace as below on the Cassandra instance from (2)

ALTER KEYSPACE test_ks WITH REPLICATION =
... { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }

6) Run Spark Streaming app against Cassandra from (5) - app runs as expected now
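Editorial note: the steps above line up with the UnavailableException reported in this thread ("2 required but only 1 alive"). The following is a minimal sketch of the quorum arithmetic, assuming the usual rule that a quorum is floor(replication_factor / 2) + 1 and that a single node can hold at most one replica of a row. The object and method names (QuorumCheck, quorum, aliveReplicas, quorumWritable) are hypothetical and are not part of the connector or the driver.

```scala
// Sketch only: models why a quorum write cannot succeed when the keyspace
// asks for more replicas than there are live nodes. All names are hypothetical.
object QuorumCheck {
  // Quorum size for a given replication factor: floor(rf / 2) + 1.
  def quorum(replicationFactor: Int): Int = replicationFactor / 2 + 1

  // A node holds at most one replica of a row, so the number of replicas
  // that can be alive is capped by the number of live nodes.
  def aliveReplicas(replicationFactor: Int, liveNodes: Int): Int =
    math.min(replicationFactor, liveNodes)

  // True when enough replicas are alive to satisfy a quorum write.
  def quorumWritable(replicationFactor: Int, liveNodes: Int): Boolean =
    aliveReplicas(replicationFactor, liveNodes) >= quorum(replicationFactor)

  def main(args: Array[String]): Unit = {
    // Step 4: keyspace restored with replication_factor 2 on one local node.
    println(quorumWritable(replicationFactor = 2, liveNodes = 1)) // false
    // Step 6: same node after ALTER KEYSPACE to replication_factor 1.
    println(quorumWritable(replicationFactor = 1, liveNodes = 1)) // true
    // Original 3-node test cluster with replication_factor 2.
    println(quorumWritable(replicationFactor = 2, liveNodes = 3)) // true
  }
}
```

Under these assumptions, a replication factor of 2 needs 2 live replicas for a quorum, which a single-node cluster cannot provide. Lowering the replication factor to 1, as in step 5, brings the required count down to 1.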

Russell Spitzer 
March 12, 2016 at 1:54 AM

If you could give more info about the "replication factor error", we can see if we can surface that much earlier. I would have hoped an error like that would be loud and announce itself very quickly :(

Ravindar Roopreddy 
March 11, 2016 at 11:55 PM

I may have found the problem. The Cassandra save was blocked because the local DB was set up as a single node with data restored from a 3-node system. After a long time, the driver printed an error that 'replication_factor of 1' did not match what the restored data says, i.e. 2. Once I altered the replication_factor to '1', it started to work.

I will update if I see any new problems

Fixed

Created February 26, 2016 at 6:14 PM
Updated May 11, 2016 at 1:32 PM
Resolved April 22, 2016 at 4:02 PM
