
Description

None

Environment

None

Pull Requests

None

Details

Assignee

Unassigned

Reporter

N

Reproduced in

Affects versions

Priority

Major

Activity

Sina Siadat
updated the environment · July 29, 2024 at 2:33 PM
The assertion for {{bytesReads.sum > 0}} in this integration test fails:

{code:scala}
package com.datastax.spark.connector.datasource

import scala.collection.mutable

import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  override def beforeClass {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
        bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)

    val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
    try {
      df.limit(1000 * 2).collect()
      assert(recordsReads.sum > 0) // Succeeds
      assert(bytesReads.sum > 0) // Assertion fails
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}
{code}
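The records/bytes split seen above is consistent with how Spark populates task input metrics: the DSV2 scan path increments {{recordsRead}} generically for every row it hands back, while {{bytesRead}} has to come from the source itself (or from Hadoop filesystem statistics, which do not apply here). Since Spark 3.2 a DSV2 reader can surface such figures through the connector metric interfaces. The following is a minimal sketch of that API only, with hypothetical class names ({{CassandraBytesReadMetric}}, {{SketchPartitionReader}}); it is not the connector's actual code or fix:

{code:scala}
// Sketch, assuming Spark 3.2+ connector metric interfaces.
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.catalyst.InternalRow

// Driver-side metric declaration; a Scan would return this
// from supportedCustomMetrics().
class CassandraBytesReadMetric extends CustomMetric {
  override def name(): String = "cassandraBytesRead"
  override def description(): String = "bytes read from Cassandra"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    taskMetrics.sum.toString
}

// Executor-side reader; hypothetical stand-in for the connector's reader.
class SketchPartitionReader extends PartitionReader[InternalRow] {
  private var bytesRead: Long = 0L

  override def next(): Boolean = ??? // advance the underlying result iterator
  override def get(): InternalRow = ??? // would also add the row's payload size to bytesRead
  override def close(): Unit = ()

  // Spark polls this while the task runs and publishes the values
  // as SQL metrics for the scan node.
  override def currentMetricsValues(): Array[CustomTaskMetric] = Array(
    new CustomTaskMetric {
      override def name(): String = "cassandraBytesRead"
      override def value(): Long = bytesRead
    })
}
{code}

Note that values reported this way surface as SQL metrics in the UI; whether they should also feed {{taskEnd.taskMetrics.inputMetrics.bytesRead}}, which is what the listener in the repro observes, is exactly the gap this ticket describes.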
Sina Siadat
updated the environment · July 29, 2024 at 11:34 AM
The assertion for {{bytesReads.sum > 0}} in this integration test fails:

{code:scala}
package com.datastax.spark.connector.datasource

import scala.collection.mutable

import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  override def beforeClass {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
        recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)

    val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
    try {
      df.limit(1000 * 2).collect()
      assert(recordsReads.sum > 0) // Succeeds
      assert(bytesReads.sum > 0) // Assertion fails
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}
{code}
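A note on the {{spark.sql.sources.useV1SourceList}} line in the repro: clearing it forces the Cassandra source through DSV2, which is what exposes the problem. A hedged workaround sketch, assuming the connector build under test still ships the classic V1 relation under the {{org.apache.spark.sql.cassandra}} name, is to route the read back through V1, whose RDD-based scan is expected to maintain input metrics:

{code:scala}
// Workaround sketch, not a fix: opt the Cassandra source back into the V1 path.
spark.conf.set("spark.sql.sources.useV1SourceList", "org.apache.spark.sql.cassandra")

val dfV1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> ks, "table" -> "leftjoin"))
  .load()

// With the V1 scan, task-end events should carry non-zero bytesRead.
dfV1.limit(1000 * 2).collect()
{code}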
Sina Siadat
made 4 changes
July 23, 2024 at 2:23 PM
Environment
The assertion for {{bytesReads.sum > 0}} in this integration test fails:

{code:scala}
package com.datastax.spark.connector.datasource

import scala.collection.mutable

import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  override def beforeClass {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
        recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)

    val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
    try {
      df.limit(1000 * 2).collect()
      assert(recordsReads.sum > 0)
      assert(bytesReads.sum > 0) // Assertion fails
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}
{code}
Environment
The assertion for {{bytesReads.sum > 0}} in this test integration test fails:

{code:scala}
package com.datastax.spark.connector.datasource

import scala.collection.mutable

import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  override def beforeClass {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
        recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)

    val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
    try {
      df.limit(1000 * 2).collect()
      assert(recordsReads.sum > 0)
      assert(bytesReads.sum > 0) // Assertion fails
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}
{code}
Environment
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
package com.datastax.spark.connector.datasource

import scala.collection.mutable

import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  override def beforeClass {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
        recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)

    val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
    try {
      df.limit(1000 * 2).collect()
      assert(recordsReads.sum > 0)
      assert(bytesReads.sum > 0) // Assertion fails
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}
{code}
The assertion for {{bytesReads.sum > 0}} in this test integration test (it:test) fails:

{code:scala}
package com.datastax.spark.connector.datasource

import scala.collection.mutable

import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  override def beforeClass {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
        recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)

    val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
    try {
      df.limit(1000 * 2).collect()
      assert(recordsReads.sum > 0)
      assert(bytesReads.sum > 0) // Assertion fails
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}
{code}
Sina Siadat
updated the environment · July 23, 2024 at 2:15 PM
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
package com.datastax.spark.connector.datasource

import java.util.concurrent.LinkedTransferQueue

import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.{DefaultCluster, SeparateJVM}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Seconds, Span}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.internal.SQLConf
import com.datastax.spark.connector.datasource.CassandraCatalog

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {

  override lazy val conn = CassandraConnector(defaultConf)

  override def beforeClass {
    conn.withSessionDo { session =>
      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
      for (i <- 1 to 1000 * 10) {
        session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
      }
    }
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.conf.set("spark.sql.defaultCatalog", "cassandra")
    spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
  }

  it should "update bytesRead" in {
    val bytesReads = new mutable.ArrayBuffer[Long]()
    val recordsReads = new mutable.ArrayBuffer[Long]()
    val bytesReadListener = new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
        recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
      }
    }
    spark.sparkContext.addSparkListener(bytesReadListener)

    val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
    try {
      df.limit(1000 * 2).collect()
      assert(recordsReads.sum > 0)
      assert(bytesReads.sum > 0) // Assertion fails
    } finally {
      spark.sparkContext.removeSparkListener(bytesReadListener)
    }
  }
}
{code}
Sina Siadat
made 2 changes
July 23, 2024 at 1:59 PM
Environment
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")

override lazy val conn = CassandraConnector(defaultConf)

conn.withSessionDo { session =>
  session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
  for (i <- 1 to 1000 * 10) {
    session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
  }
}

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsReads = new mutable.ArrayBuffer[Long]()
val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)

val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
try {
  df.limit(1000 * 2).collect()
  assert(recordsReads.sum > 0)
  assert(bytesReads.sum > 0) // Assertion fails
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
{code}
Environment
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")

class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster {

  override lazy val conn = CassandraConnector(defaultConf)

  conn.withSessionDo { session =>
    session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
    session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
    for (i <- 1 to 1000 * 10) {
      session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
    }
  }

  val bytesReads = new mutable.ArrayBuffer[Long]()
  val recordsReads = new mutable.ArrayBuffer[Long]()
  val bytesReadListener = new SparkListener() {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
      recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
    }
  }
  spark.sparkContext.addSparkListener(bytesReadListener)

  val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
  try {
    df.limit(1000 * 2).collect()
    assert(recordsReads.sum > 0)
    assert(bytesReads.sum > 0) // Assertion fails
  } finally {
    spark.sparkContext.removeSparkListener(bytesReadListener)
  }
}
{code}
Sina Siadat
made 2 changes
July 23, 2024 at 1:52 PM
Environment
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")

override lazy val conn = CassandraConnector(defaultConf)

conn.withSessionDo { session =>
  session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
  for (i <- 1 to 1000 * 10) {
    session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
  }
}

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsReads = new mutable.ArrayBuffer[Long]()
val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)

val df = metricsSpark.sql(s"SELECT * FROM $ks.leftjoin")
try {
  df.limit(1000 * 2).collect()
  assert(recordsReads.sum > 0)
  assert(bytesReads.sum > 0) // Assertion fails
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
{code}
Environment
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")

override lazy val conn = CassandraConnector(defaultConf)

conn.withSessionDo { session =>
  session.execute(s"""CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }""")
  session.execute(s"""CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))""")
  for (i <- 1 to 1000 * 10) {
    session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
  }
}

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsReads = new mutable.ArrayBuffer[Long]()
val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)

val df = metricsSpark.sql(s"SELECT * FROM $ks.leftjoin")
try {
  df.limit(1000 * 2).collect()
  assert(recordsReads.sum > 0)
  assert(bytesReads.sum > 0) // Assertion fails
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
{code}
Sina Siadat
made 3 changes
July 23, 2024 at 1:47 PM
Environment
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")

override lazy val conn = CassandraConnector(defaultConf)

conn.withSessionDo { session =>
  session.execute(s"""CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }""")
  session.execute(s"""CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))""")
  for (i <- 1 to 1000 * 10) {
    session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
  }
}

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsReads = new mutable.ArrayBuffer[Long]()
val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)

val df = metricsSpark.sql(s"SELECT * FROM $ks.leftjoin")
try {
  df.limit(10).collect()
  assert(recordsReads.sum > 0)
  assert(bytesReads.sum > 0) // Assertion fails
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
{code}
Environment
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")

override lazy val conn = CassandraConnector(defaultConf)

conn.withSessionDo { session =>
  session.execute(s"""CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }""")
  session.execute(s"""CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))""")
  for (i <- 1 to 1000 * 3) {
    session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
  }
}

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsReads = new mutable.ArrayBuffer[Long]()
val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsReads += taskEnd.taskMetrics.inputMetrics.recordsReads
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)

val df = metricsSpark.sql(s"SELECT * FROM $ks.leftjoin")
try {
  df.limit(10).collect()
  println(s"SINA minimal: bytesReads.sum=${bytesReads.sum} recordsReads.sum=${recordsRead.sum}")
  assert(recordsReads.sum > 0)
  assert(bytesReads.sum > 0) // Assertion fails
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
{code}
The assertion for {{bytesReads.sum > 0}} in this test fails:

{code:scala}
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import com.datastax.spark.connector.cql.CassandraConnector

spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")

override lazy val conn = CassandraConnector(defaultConf)

conn.withSessionDo { session =>
  session.execute(s"""CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }""")
  session.execute(s"""CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))""")
  for (i <- 1 to 1000 * 3) {
    session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
  }
}

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsReads = new mutable.ArrayBuffer[Long]()
val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsReads += taskEnd.taskMetrics.inputMetrics.recordsReads
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)

val df = metricsSpark.sql(s"SELECT * FROM $ks.leftjoin")
try {
  df.limit(10).collect()
  assert(recordsReads.sum > 0)
  assert(bytesReads.sum > 0) // Assertion fails
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
{code}
Summary
DSV2 readBytes is not updated
DSV2 bytesRead is not updated
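As a quick cross-check of the summary above, the RDD API reads the same table through the connector's RDD scan path, which maintains task input metrics through its own updater (assuming {{spark.cassandra.input.metrics}} is left at its default of true; the connector's {{InputMetricsUpdater}} is believed to be the component responsible). The same listener from the repro can be reused against this job:

{code:scala}
// Hedged cross-check, not part of the reported repro: read the same table
// via the RDD path; task-end events for this job are expected to carry
// non-zero bytesRead, unlike the DSV2 scan.
import com.datastax.spark.connector._

val rdd = spark.sparkContext.cassandraTable(ks, "leftjoin")
rdd.count()
{code}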
N
created the Issue · July 8, 2023 at 1:06 AM
