DSV2 bytesRead is not updated

Description

When a table is read through the DataSource V2 (DSV2) catalog, the task input metric `recordsRead` is updated, but `bytesRead` is always 0. This looks similar to https://issues.apache.org/jira/browse/SPARK-37585

Environment

The `bytesReads.sum > 0` assertion in the following integration test fails:

package com.datastax.spark.connector.datasource

import scala.collection.mutable

import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Seconds, Span}

/**
 * Reproducer for "DSV2 bytesRead is not updated".
 *
 * Reads a Cassandra table through [[CassandraCatalog]] (the DataSource V2 path) and checks
 * the task input metrics that Spark reports to a [[SparkListener]]. `recordsRead` is
 * expected to be non-zero. `bytesRead` is also expected to be non-zero, but that assertion
 * currently fails, which is the reported bug.
 */
class CassandraCatalogMetricsSpec
  extends SparkCassandraITFlatSpecBase
  with DefaultCluster
  with BeforeAndAfterEach
  with Eventually {

  override lazy val conn = CassandraConnector(defaultConf)

  /**
   * Creates and populates `$ks.leftjoin` with 10,000 rows. It then configures the session so
   * that `SELECT ... FROM $ks.<table>` resolves through [[CassandraCatalog]].
   */
  override def beforeClass: Unit = {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    // An empty V1 source list keeps Spark from falling back to the V1 read path.
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    // Listener callbacks run on Spark's listener-bus thread, not on the test thread,
    // so each buffer is only touched while holding its own monitor.
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // taskMetrics is nullable (e.g. for failed tasks); skip such events instead of throwing.
        Option(taskEnd.taskMetrics).foreach { metrics =>
          val input = metrics.inputMetrics
          recordsReads.synchronized { recordsReads += input.recordsRead }
          bytesReads.synchronized { bytesReads += input.bytesRead }
        }
      }
    }

    /** Thread-safe sum of the per-task values collected so far. */
    def total(buf: mutable.ArrayBuffer[Long]): Long = buf.synchronized(buf.sum)

    // Task-end events are posted asynchronously, so poll rather than asserting immediately.
    val waitForEvents = timeout(Span(10, Seconds))

    spark.sparkContext.addSparkListener(bytesReadListener)
    try {
      // Build the DataFrame inside the try so the listener is removed even if analysis fails.
      val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
      df.limit(1000 * 2).collect()
      eventually(waitForEvents) { assert(total(recordsReads) > 0) } // Succeeds
      eventually(waitForEvents) { assert(total(bytesReads) > 0) } // Fails until DSV2 reports bytesRead
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}

Pull Requests

None

Activity

Show:

Details

Assignee

Reporter

Components

Priority

Created July 23, 2024 at 1:44 PM
Updated July 29, 2024 at 2:33 PM

Flag notifications