saveToCassandra blocked
Description
Environment
Components
Cassandra : version 2.2.4
Streaming application with versions
<spark.version>1.6.0</spark.version>
<java.driver.version>3.0.0</java.driver.version>
<spark.cassandra.connector.version>1.5.0</spark.cassandra.connector.version>
Activity
Brian Cantoni April 22, 2016 at 4:02 PM (edited)
@Ravindar Roopreddy this should be fixed now by SPARKC-330. The problem came when the connector could not achieve the desired consistency level when writing to Cassandra. The failure should happen right away so it will be more obvious when the consistency level can't be met.
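If lowering the schema's replication factor is not an option, the connector's write consistency level is also configurable (this workaround is my addition, not from the thread; the property name is taken from the connector's reference documentation). In connector 1.5 the output consistency level defaults to LOCAL_QUORUM, which matches the error below, so a single-node test cluster can be accommodated with something like:

```
# spark-defaults.conf (or --conf key=value on spark-submit)
# LOCAL_ONE needs only one live replica, so writes can succeed on a one-node cluster
spark.cassandra.output.consistency.level  LOCAL_ONE
```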
Ravindar Roopreddy March 12, 2016 at 7:54 AM
To be precise, I got the following error after 15 minutes or so:
16/03/11 23:52:10 ERROR [cluster1-timeouter-0] writer.QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@bf0cd36
com.datastax.driver.core.exceptions.OperationTimedOutException: [localhost/127.0.0.1] Timed out waiting for server response
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onTimeout(RequestHandler.java:763)
at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:1260)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:655)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
at java.lang.Thread.run(Thread.java:744)
[Stage 1:> (0 + 3) / 5]16/03/11 23:52:27 ERROR [cluster1-nio-worker-1] writer.QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@59cd1248
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
at com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:128)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:114)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:354)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:744)
Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
... 15 more
Ravindar Roopreddy March 12, 2016 at 7:33 AM
Here are the steps
1) We had a data dump from a test instance with a 3-node cluster and a replication factor of 2, created as below
CREATE KEYSPACE test_ks WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '2'
};
2) Run a single-node Cassandra cluster on the local machine
3) Use sstableloader to load the data from all 3 nodes of step (1)
4) Run the Spark Streaming app against the Cassandra from (2) - the app blocked (the error about the replication factor was emitted only after about 10 min)
5) Alter the keyspace as below on the Cassandra from (2)
ALTER KEYSPACE test_ks WITH REPLICATION =
  { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
6) Run the Spark Streaming app against the Cassandra from (5) - the app now runs as expected
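The steps above boil down to quorum arithmetic: a (LOCAL_)QUORUM write needs floor(RF/2) + 1 live replicas, so a keyspace restored with replication_factor 2 can never satisfy LOCAL_QUORUM on a single node. A minimal sketch of that arithmetic (hypothetical helper names, not connector code):

```scala
// Sketch of the replica math behind the UnavailableException above.
object QuorumCheck {
  // Replicas required for a (LOCAL_)QUORUM write at a given replication factor.
  def quorum(replicationFactor: Int): Int = replicationFactor / 2 + 1

  // True when enough replicas are alive to satisfy the consistency level.
  def canWrite(replicationFactor: Int, aliveReplicas: Int): Boolean =
    aliveReplicas >= quorum(replicationFactor)

  def main(args: Array[String]): Unit = {
    // RF=2 restored onto a single node: quorum is 2, only 1 replica alive -> blocked.
    println(QuorumCheck.canWrite(2, 1)) // false
    // After ALTER KEYSPACE ... 'replication_factor' : 1 -> quorum is 1 -> writes succeed.
    println(QuorumCheck.canWrite(1, 1)) // true
  }
}
```

Altering the replication factor to 1 in step (5) drops the required quorum to 1, which is why the app unblocks in step (6).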
Russell Spitzer March 12, 2016 at 1:54 AM
If you could give more info about the "replication factor error", we can see if we can surface it much earlier. I would have hoped an error like that would be loud and announce itself very quickly.
Ravindar Roopreddy March 11, 2016 at 11:55 PM
I may have found the problem. The Cassandra save was blocked because the local DB was set up as a single node with data restored from a 3-node system. After a long time the driver printed an error that the restored keyspace's replication_factor of 2 could not be met by the single-node cluster. Once I altered the replication_factor to '1' it started to work.
I will update if I see any new problems
While trying to upgrade to the latest versions of the Cassandra connector and Spark, "saveToCassandra" simply blocks the streaming job
at "foreachRDD at DStreamFunctions.scala:54".
The same code works fine without any problem on spark/connector version 1.4.1. The call below tries to save a stream to Cassandra:
events.saveToCassandra(
  "keyspace",
  "table_name",
  SomeColumns("......."),
  writeConf = WriteConf(ttl = TTLOption.constant(Constants.DEFAULT_TTL)))