SPARKC-567: Cassandra Timeouts Leaking Connections and causing deadlocks
SPARKC-565: Upgrade Cassandra OSS Java Driver Version in the SCC Due to Bug
SPARKC-541: Solr_query Virtual Column Should be Ignored
SPARKC-469: NullPointerException on load RDD
SPARKC-237: SessionProxy not caching Prepared Statements of RegularStatement
SPARKC-558: Regular Statements are Not Cached in Session Proxy
SPARKC-562: connector issue in writing streaming data
SPARKC-561: When using spark 2.4.0 with spark cassandra connector 2.4.0 on kubernetes getting a fatal error.
SPARKC-563: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
SPARKC-559: Case needed in the StringTypeConverter for returning a properly formatted InetAddress host address
SPARKC-560: Case needed in the DateConverter for converting a java.time.Instant to java.util.Date
SPARKC-551: camelCased Parameters are thrown away in Spark 2.0.X
SPARKC-550: Fix Thread Safety Issue in AnyObjectFactory
SPARKC-345: Support for TTL per insertion while using DataFrame.save method
SPARKC-548: Dataframe implementation for reading/writing row level TTL
SPARKC-545: duplicate entries in a list when using saveToCassandra()
SPARKC-544: org.apache.spark.sql.AnalysisException - on querying a Cassandra table which has many columns
SPARKC-542: Backport SPARKC-503 to 1.6.x
SPARKC-475: Add implicit writer for Spark Row type
SPARKC-503: Fix Query Throttling
SPARKC-537: TimeUUID filter not being pushed down on Cassandra materialized view
SPARKC-530: Spark Connector does not work with Spark 2.3
SPARKC-490: Pushdown filter for composite partition keys doesn't work when IN is used
SPARKC-533: Timestamp.converter do not parse Z and +10:00 timezone format
SPARKC-536: UDT converters optimization
SPARKC-479: spark.cassandra.* properties with camelcases not working when setting as option to sqlContext.read
SPARKC-527: Pass splitCount to CassandraSourceRelation
SPARKC-532: Long Where Clauses Throw Stack Overflow Exceptions
SPARKC-531: Allow SQL options without a "cluster/" prefix
SPARKC-522: Add Timestamp converter, improve LongConverter for better SparkSQL support
SPARKC-440: CustomDriverConverter should support DataFrames conversions
SPARKC-349: Provide Interface for Deleting Columns based on PK
SPARKC-509: spark-cassandra connectivity with cloudera distribution
SPARKC-515: Release spark connector compatible with Cassandra Java Driver 3.2 or higher
SPARKC-318: Unable to convert nulls in nested Java Bean Classes
SPARKC-520: Allow 'spark.cassandra.concurrent.reads' to be read from SparkConf
SPARKC-508: Connector does not load all data from Cassandra table
SPARKC-426: Cannot use Null for UDT
SPARKC-529: Example in doc doesn't work
SPARKC-518: spark is stuck on one big task
SPARKC-507: Adding Idempotent Retries and Increasing Retry Amount
SPARKC-513: Support partially specified writes from case classes
SPARKC-505: WriteConf Settings not used with Update and Delete Query Templates
SPARKC-462: Java 7 / 8 Support Clarification
SPARKC-501: add metrics for write batch size
SPARKC-495: Add missing type conversions for java.time.LocalDate
SPARKC-496: LocalNodeFirstLoadBalancingPolicy.sortNodesByStatusAndProximity always returns nodes in the same order
SPARKC-485: repartitionByCassandraReplica doesn't seem to be using local-awareness
SPARKC-494: MultipleRetry policy may retry with an incorrect Consistency level
SPARKC-448: connection.local_dc is not taken into account for token unaware queries

SPARKC-567: Cassandra Timeouts Leaking Connections and causing deadlocks

Description

This seems to affect both 2.4.1 and 2.4.2; we recently upgraded around the time we started experiencing these issues and are now on 2.4.2.

We've been experiencing a series of Spark job freezes recently as we've started seeing some read timeouts from Cassandra.

An example: this single stage has been "running" for 19 hours straight with no progression.

Note that there are 47 failed tasks. Some of those failures come from dead executors, and some are timeouts reading from Cassandra.

Inspecting the tasks themselves shows 47 failed tasks and 21 running tasks. When inspecting the executors, we noticed two different types of "deadlocks". Note that we have 3 cores per executor.

One of the threads (cores) is trying to establish a connection to Cassandra, while the other two are waiting for the lock to be released:

You might notice the CustomizedCassandraConnectionFactory. Here is that class:

So nothing too crazy there (I think).
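
The class itself is not reproduced here; roughly, a custom connection factory for SCC 2.4.x implements the CassandraConnectionFactory trait and typically just delegates to the default cluster builder while tweaking a few driver options. A minimal sketch of that pattern, assuming the 2.4.x CassandraConnectionFactory/DefaultConnectionFactory API (class name and option values are placeholders, not our exact code):

    import com.datastax.driver.core.{Cluster, SocketOptions}
    import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnectorConf, DefaultConnectionFactory}

    // Illustrative sketch only -- not the exact class used in this job.
    object ExampleConnectionFactory extends CassandraConnectionFactory {
      override def createCluster(conf: CassandraConnectorConf): Cluster =
        DefaultConnectionFactory.clusterBuilder(conf)
          .withSocketOptions(new SocketOptions().setReadTimeoutMillis(120000)) // placeholder value
          .build()
    }

    // A factory like this is selected via the SparkConf property
    //   spark.cassandra.connection.factory = <fully.qualified.FactoryClassName>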

OR

One of the threads is trying to create a PreparedStatement, and the other two are waiting to do the same. In this thread dump we also noticed a ThreadPool trying to call CassandraConnector#destroySession:

If it helps, only one executor had the prepared-statement lock; all the others had the open-connection lock.

The reason we think there are connection leaks is that a netstat on the Cassandra nodes gave us back the following:

Earlier this morning:

Number of Open Connections to Remote Host    IP Address of Remote Host
213                                          172.29.157.145
710                                          172.29.157.13
561                                          172.29.151.183
44                                           172.29.144.248
316                                          172.29.143.136
710                                          172.29.139.164
32                                           172.29.136.234

Later in the afternoon:

Number of Open Connections to Remote Host    IP Address of Remote Host
212                                          172.29.157.145
710                                          172.29.157.13
736                                          172.29.151.183
584                                          172.29.144.248
316                                          172.29.143.136
710                                          172.29.139.164
387                                          172.29.136.234

Those IP addresses are the IPs of the Spark worker nodes, which each have 30 cores at 3 cores per executor, so 10 executors per worker.
These numbers alone don't seem too crazy, since I think the appropriate number of connections per executor is 71 based on our configuration. However, if you look at the number of connections per PID on the workers, things seem really out of control:

Connections open to Cassandra    PID of process
8                                129730
8                                129477
8                                129473
2                                129181
32                               129180
2                                125796
339                              79272
16                               50754
266                              40046

Ignoring the low numbers, that seems really high for a single JVM. I checked again a few hours later; here are the new numbers on the same host:

Connections open to Cassandra    PID of process
25                               129730
25                               129477
25                               129473
355                              79272
16                               23093

So something is definitely going wrong here. If it helps, this job queries Cassandra using the cassandraTable implicit defined on SparkContext.
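
For reference, the read path is just the standard RDD API; a minimal sketch of that pattern (the contact point, factory package, keyspace, and table names below are placeholders, not our real values):

    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._   // provides the cassandraTable implicit on SparkContext

    val conf = new SparkConf()
      .setAppName("example-read-job")
      .set("spark.cassandra.connection.host", "10.0.0.1") // placeholder contact point
      .set("spark.cassandra.connection.factory",
           "com.example.CustomizedCassandraConnectionFactory") // placeholder package

    val sc = new SparkContext(conf)

    // Full-table scan through the connector's RDD API.
    val rows = sc.cassandraTable("example_ks", "example_table")
    println(rows.count())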

Environment: None
Pull Requests: None
Status:
Assignee: Piotr Kołaczkowski
Reporter: Ed Mitchell
Labels: None
Reviewer: None
Reviewer 2: None
Tester: None
Pull Request: None
Components:
Affects versions:
Priority: Major