DSV2 reads do not update the `bytesRead` task input metric
Description
Environment
The assertion that the summed `bytesRead` input metric is greater than 0 fails in the following integration test:
package com.datastax.spark.connector.datasource
import scala.collection.mutable
import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector
/**
 * Integration test for task input metrics on the DataSource V2 read path.
 *
 * A table is read through [[CassandraCatalog]], and the `recordsRead` / `bytesRead` input metrics
 * reported in `SparkListenerTaskEnd` events are checked to be non-zero.
 */
class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  /**
   * Creates and populates `$ks.leftjoin` with 10,000 rows, then registers the Cassandra catalog
   * as Spark's default catalog so that `spark.sql` resolves tables through it.
   */
  override def beforeClass: Unit = {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    // An empty V1 source list means no data source is forced back onto the V1 code path.
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  behavior of "CassandraCatalog"

  it should "update bytesRead" in {
    import java.util.concurrent.atomic.AtomicLong

    // Listener callbacks run on Spark's listener-bus thread, not the test thread,
    // so the running totals must be safe to update and read concurrently.
    val recordsRead = new AtomicLong(0L)
    val bytesRead = new AtomicLong(0L)
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // taskMetrics is nullable (e.g. for a failed task), so guard before dereferencing it.
        Option(taskEnd.taskMetrics).foreach { metrics =>
          recordsRead.addAndGet(metrics.inputMetrics.recordsRead)
          bytesRead.addAndGet(metrics.inputMetrics.bytesRead)
        }
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)
    try {
      // Built inside the try so the listener is removed even if query analysis fails.
      val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
      df.limit(1000 * 2).collect()
      // Task-end events are posted asynchronously. Give them time to arrive so that a
      // zero total reflects missing metrics rather than events that are still in flight.
      awaitCondition(timeoutMs = 10000L)(recordsRead.get > 0 && bytesRead.get > 0)
      assert(recordsRead.get > 0, "recordsRead was not reported by any task")
      assert(bytesRead.get > 0, "bytesRead was not reported by any task")
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }

  /**
   * Polls `condition` every 50 ms until it holds or `timeoutMs` milliseconds have elapsed.
   * It returns silently on timeout; callers assert afterwards so that the failure message
   * names the specific condition that did not hold.
   *
   * @param timeoutMs maximum time to wait, in milliseconds
   * @param condition re-evaluated on every poll
   */
  private def awaitCondition(timeoutMs: Long)(condition: => Boolean): Unit = {
    val deadline = System.nanoTime() + timeoutMs * 1000L * 1000L
    while (!condition && System.nanoTime() < deadline) {
      Thread.sleep(50L)
    }
  }
}
Pull Requests
None
Activity
Show:
`recordsRead` is updated, but `bytesRead` remains 0. This looks similar to SPARK-37585: https://issues.apache.org/jira/browse/SPARK-37585