joinWithCassandraTable throws exception when using aliased join column
Description
Tables
CREATE TABLE k.event (
eventid uuid,
userid text,
PRIMARY KEY (eventid, userid)
)
CREATE TABLE k.user (
id text PRIMARY KEY,
name text
)
When I try to execute this join:
sc.cassandraTable("k", "event").joinWithCassandraTable("k", "user").on(SomeColumns("userid" as "id")).collect
I get the following exception:
java.io.IOException: Column userid not found in table k.user
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.checkSingleColumn$1(CassandraTableRowReaderProvider.scala:60)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$$anonfun$checkColumnsExistence$1.apply(CassandraTableRowReaderProvider.scala:75)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$$anonfun$checkColumnsExistence$1.apply(CassandraTableRowReaderProvider.scala:75)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.checkColumnsExistence(CassandraTableRowReaderProvider.scala:75)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.checkColumnsExistence(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.joinColumnNames(AbstractCassandraJoin.scala:52)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.joinColumnNames$lzycompute(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.joinColumnNames(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.checkValidJoin(AbstractCassandraJoin.scala:68)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.checkValidJoin(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.getPartitions(AbstractCassandraJoin.scala:184)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.getPartitions(CassandraJoinRDD.scala:22)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
... 53 elided
It works if I rename the column with select before joining:
sc.cassandraTable("k", "event").select("userid" as "id").joinWithCassandraTable("k", "user").on(SomeColumns("id")).collect
Tables
CREATE TABLE k.event ( eventid uuid, userid text, PRIMARY KEY (eventid, userid) ) CREATE TABLE k.user ( id text PRIMARY KEY, name text )
When I try to execute this join:
sc.cassandraTable("k", "event").joinWithCassandraTable("k", "user").on(SomeColumns("userid" as "id")).collect
I get the following exception:
java.io.IOException: Column userid not found in table k.user
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.checkSingleColumn$1(CassandraTableRowReaderProvider.scala:60)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$$anonfun$checkColumnsExistence$1.apply(CassandraTableRowReaderProvider.scala:75)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$$anonfun$checkColumnsExistence$1.apply(CassandraTableRowReaderProvider.scala:75)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.checkColumnsExistence(CassandraTableRowReaderProvider.scala:75)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.checkColumnsExistence(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.joinColumnNames(AbstractCassandraJoin.scala:52)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.joinColumnNames$lzycompute(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.joinColumnNames(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.checkValidJoin(AbstractCassandraJoin.scala:68)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.checkValidJoin(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.getPartitions(AbstractCassandraJoin.scala:184)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.getPartitions(CassandraJoinRDD.scala:22)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
... 53 elided
It works if I rename the column with select before joining:
sc.cassandraTable("k", "event").select("userid" as "id").joinWithCassandraTable("k", "user").on(SomeColumns("id")).collect