Connection factory configuration is ignored for cluster level settings

Description

When a cluster-level connection configuration is specified, org.apache.spark.sql.cassandra.CassandraSourceRelation.consolidateConfs does not consolidate the spark.cassandra.connection.factory setting.

The spark.cassandra.connection.factory configuration still works as intended when it is set on the SparkContext, but this bug prevents us from using cluster-level configuration (e.g. when each cluster has a different client-to-node encryption key store).

To reproduce:

sqlContext.setConf("ClusterA/spark.cassandra.connection.host", "10.0.0.10")
sqlContext.setConf("ClusterA/spark.cassandra.auth.username", "bbrodriguez")
sqlContext.setConf("ClusterA/spark.cassandra.auth.password", "bitemyshinymetalass")
sqlContext.setConf("ClusterA/spark.cassandra.connection.factory", "com.example.spark.S3ConnectionFactory")
sqlContext.setConf("ClusterA/spark.cassandra.connection.ssl.enabled", "true")
sqlContext.setConf("ClusterA/spark.cassandra.connection.ssl.trustStore.password", "supersecurepassword")
sqlContext.setConf("ClusterA/spark.cassandra.connection.ssl.trustStore.path", "s3://bucket/cluster-a.jks")

dataset.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "my_table", "keyspace" -> "ks", "cluster" -> "ClusterA"))
  .save()

This will result in an exception:

java.io.IOException: Failed to open native connection to Cassandra at {10.0.0.10}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:163)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory$.forSystemLocalPartitioner(TokenFactory.scala:98)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:255)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:82)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 52 elided
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.0.0.10:9042 (com.datastax.driver.core.exceptions.TransportException: /10.0.0.10 Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1424)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:403)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:156)
... 63 more

To check whether the conf was consolidated properly:

sqlContext.setConf("ClusterA/spark.cassandra.connection.host", "10.0.0.10")
sqlContext.setConf("ClusterA/spark.cassandra.connection.factory", "com.example.spark.S3ConnectionFactory")

import org.apache.spark.sql.cassandra.{CassandraSourceRelation, TableRef}

val consolidatedConfs = CassandraSourceRelation.consolidateConfs(
  sc.getConf, sqlContext.getAllConfs,
  TableRef("some_table", "keyspace_name", Some("ClusterA")), Map[String, String]())

consolidatedConfs.getOption("spark.cassandra.connection.host") // correctly returns Some("10.0.0.10")
consolidatedConfs.getOption("spark.cassandra.connection.factory") // returns None

My guess is that ConnectionFactoryParam was left out of the properties declared in CassandraConnectorConf, which is why DefaultSource.confProperties does not return it as part of the property set.
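
The suspected failure mode can be illustrated with a simplified, standalone model. This is only a sketch and not the connector's actual code: ConsolidationSketch, knownProperties, factoryKey and consolidate are hypothetical names, and it assumes that consolidation copies "<cluster>/<property>" entries only for property names found in a fixed set of known properties. Under that assumption, a property missing from the set is silently dropped, which matches the None observed above.

```scala
object ConsolidationSketch {
  // Hypothetical whitelist of recognized property names. The factory key is
  // deliberately absent, mirroring the omission suspected in this report.
  val knownProperties: Set[String] = Set(
    "spark.cassandra.connection.host",
    "spark.cassandra.auth.username",
    "spark.cassandra.auth.password")

  val factoryKey = "spark.cassandra.connection.factory"

  // Copies "<cluster>/<property>" entries from sqlConf into base, but only
  // for property names contained in `known`; all other entries are ignored.
  def consolidate(
      base: Map[String, String],
      sqlConf: Map[String, String],
      cluster: String,
      known: Set[String]): Map[String, String] =
    known.foldLeft(base) { (acc, prop) =>
      sqlConf.get(s"$cluster/$prop").fold(acc)(value => acc + (prop -> value))
    }

  def main(args: Array[String]): Unit = {
    val sqlConf = Map(
      "ClusterA/spark.cassandra.connection.host" -> "10.0.0.10",
      s"ClusterA/$factoryKey" -> "com.example.spark.S3ConnectionFactory")

    // Whitelist without the factory key: the host survives, the factory does not.
    val broken = consolidate(Map.empty, sqlConf, "ClusterA", knownProperties)
    println(broken.get("spark.cassandra.connection.host")) // Some(10.0.0.10)
    println(broken.get(factoryKey))                        // None

    // Adding the key to the whitelist is enough for it to be picked up.
    val fixed = consolidate(Map.empty, sqlConf, "ClusterA", knownProperties + factoryKey)
    println(fixed.get(factoryKey)) // Some(com.example.spark.S3ConnectionFactory)
  }
}
```

If the real cause is the same, the fix would likely amount to including the factory parameter in whatever property set the consolidation step iterates over.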

Pull Requests

None

Status

Assignee

Alex Liu

Reporter

Andrew Jo

Labels

Reviewer

None

Reviewer 2

None

Tester

None

Components

Affects versions

Priority

Major