Issues
- PHP-241: $session->execute('INSERT ....') returns NULL instead of throwing an Exception when passing an array as a parameter
- PHP-239: PHP 8
- PHP-238: Future of PHP Driver
- PHP-237: New Tag for DataStax PHP Driver on GitHub (Resolved)
- PHP-235: Cassandra PHP Driver hangs
- PHP-234: Numeric types cannot perform cast or arithmetic operations on PHP 7.3+
- PHP-233: Update Ubuntu and RHEL releases
- PHP-232: Update TravisCI and implement Jenkins CI support (excludes releases) (assignee: Michael Fero)
- PHP-231: Limit PHP versions to officially supported PHP versions (assignee: Michael Fero)
- PHP-230: Update Windows automated build script for PHP 7.2+; not working for MS Visual Studio 2017, shows a compilation error (Resolved)
- PHP-229: PHP driver relies on removed HAVE_SPL symbol
- PHP-228: Release a new version via PECL
- PHP-227: Prepared statements with named placeholders
- PHP-226: Update Windows automated build script for PHP 7.2+
- PHP-225: Compile warnings with PHP 7.3
- PHP-224: Cannot query in keyspace with uppercase characters
- PHP-223: DefaultSession option parameters incorrectly documented
- PHP-222: Fix RPM spec cpp-driver dependencies
- PHP-221: Allow for builds against PHP v7.x (Resolved)
- PHP-220: PHP quick start example shouldn't use schema tables
- PHP-219: Mark Downgrading Consistency Retry Policy as deprecated
- PHP-218: Same timeuuid when creating it from a timestamp, for the same timestamp
- PHP-215: Too many open connections in php-fpm
- PHP-214: PHP driver fails with fatal error when printing Decimal object with negative scale attribute
- PHP-212: Remove reserved words from class names, functions, etc.
- PHP-209: Require the local datacenter when contact points are specified
- PHP-208: Support async queries with loop/promise implementations
- PHP-207: Update cluster naming for CCM feature and integration tests
- PHP-206: Update CI builds to build the driver extension statically
- PHP-204: Update build.yaml to include a connection test (assignee: Arun Chennadi)
- PHP-203: Automate php-driver builds by adding a build.yaml and setting up jobs on jenkins.devtools (assignee: Arun Chennadi)
- PHP-202: Add documentation for binary installations
- PHP-201: Remove support for libuv v0.10.x
- PHP-200: Update DSE README to contain relevant information from OSS
- PHP-199: Investigate making all links work in both GitHub markup and DS docs
- PHP-198: Merge DSE fixes to core (Resolved)
- PHP-197: Support initializing TimeUuid from uuid string
- PHP-194: pecl install fails on Debian Jessie
- PHP-193: Build the php driver for Ubuntu 16.04 with libuv1 as a dependency
- PHP-192: Cassandra\DefaultColumn::isReversed is deprecated (Resolved; assignee: Benjamin Roth)
- PHP-190: event loop (Resolved)
- PHP-189: Set of map: bad hash calculation (Resolved)
- PHP-188: Merge fixes to documentation and examples from DSE (Resolved)
- PHP-187: Merge duration type bug fix from DSE (Resolved)
- PHP-186: Fix comment casing in yaml documentation
- PHP-184: Remove {@inheritDoc} from method parameters (Resolved)
- PHP-180: Docs: Add deprecated to ExecutionOptions (Resolved)
- PHP-178: DateRange constructor doc not rendered properly by doxygen/documentor (Resolved)
- PHP-176: Set and Map unit tests should add more than one value
- PHP-175: type method on PHP classes should be static
Description
None
Environment
None
Pull Requests
None
Activity
Sina Siadat
updated the environment on July 29, 2024 at 2:33 PM
The assertion for {{bytesReads.sum > 0}} in this integration test fails:
{code:scala}package com.datastax.spark.connector.datasource
import scala.collection.mutable
import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import org.scalatest.BeforeAndAfterEach
import com.datastax.spark.connector.datasource.CassandraCatalog
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
class CassandraCatalogMetricsSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with BeforeAndAfterEach {
override lazy val conn = CassandraConnector(defaultConf)
override def beforeClass {
conn.withSessionDo { session =>
session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
session.execute(s"CREATE TABLE IF NOT EXISTS $ks.leftjoin (key INT, x INT, PRIMARY KEY (key))")
for (i <- 1 to 1000 * 10) {
session.execute(s"INSERT INTO $ks.leftjoin (key, x) values ($i, $i)")
}
}
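// An empty useV1SourceList forces all sources onto the DataSource V2 (DSV2) read path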
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.defaultCatalog", "cassandra")
spark.conf.set("spark.sql.catalog.cassandra", classOf[CassandraCatalog].getCanonicalName)
}
it should "update bytesRead" in {
val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsReads = new mutable.ArrayBuffer[Long]()
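// Listener that captures each task's input metrics as tasks complete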
val bytesReadListener = new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
recordsReads += taskEnd.taskMetrics.inputMetrics.recordsRead
}
}
spark.sparkContext.addSparkListener(bytesReadListener)
val df = spark.sql(s"SELECT * FROM $ks.leftjoin")
try {
df.limit(1000 * 2).collect()
assert(recordsReads.sum > 0) // Succeeds
assert(bytesReads.sum > 0) // Assertion fails
} finally {
spark.sparkContext.removeSparkListener(bytesReadListener)
}
}
}{code}
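For background on why only one of the two assertions fails: {{recordsRead}} appears to be incremented by Spark's own DSV2 scan machinery as rows are consumed, while {{bytesRead}} is only filled in when the source itself reports byte counts (for file-based sources this typically comes from Hadoop FileSystem statistics, which a Cassandra scan does not have, so the value stays 0). Since Spark 3.2 a connector can at least surface byte counts through the DSV2 custom-metric API. The sketch below illustrates that mechanism only; it is not the connector's actual implementation, and {{BytesReadMetric}}, {{ByteCountingReader}}, {{sizeOf}} and the metric name are made up for the example.
{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.PartitionReader

// Driver-side metric declaration; aggregates the per-task values for the SQL UI.
class BytesReadMetric extends CustomMetric {
  override def name(): String = "cassandraBytesRead" // hypothetical name
  override def description(): String = "bytes read from Cassandra"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    s"${taskMetrics.sum} B"
}

// Executor-side value, collected by Spark when the task finishes.
class BytesReadTaskMetric extends CustomTaskMetric {
  var bytes: Long = 0L
  override def name(): String = "cassandraBytesRead"
  override def value(): Long = bytes
}

// Hypothetical wrapper around a real PartitionReader that counts (estimated) bytes.
class ByteCountingReader(
    underlying: PartitionReader[InternalRow],
    sizeOf: InternalRow => Long) extends PartitionReader[InternalRow] {
  private val metric = new BytesReadTaskMetric
  override def next(): Boolean = underlying.next()
  override def get(): InternalRow = {
    val row = underlying.get()
    metric.bytes += sizeOf(row) // row-size estimate; exact wire bytes would need driver support
    row
  }
  override def currentMetricsValues(): Array[CustomTaskMetric] = Array(metric)
  override def close(): Unit = underlying.close()
}
{code}
The matching {{Scan}} would also need to return the driver-side metric from {{supportedCustomMetrics()}}. Note that values reported this way show up as SQL metrics in the UI, not in {{taskMetrics.inputMetrics.bytesRead}}, so the listener in the test above would still observe 0 unless Spark wires custom metrics into the task input metrics.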
Sina Siadat
updated the environment on July 29, 2024 at 11:34 AM
Sina Siadat
made 4 changes on July 23, 2024 at 2:23 PM
Sina Siadat
updated the environment on July 23, 2024 at 2:15 PM
Sina Siadat
made 2 changes on July 23, 2024 at 1:59 PM
Sina Siadat
made 2 changes on July 23, 2024 at 1:52 PM
Sina Siadat
made 3 changes on July 23, 2024 at 1:47 PM
Summary
DSV2 readBytes is not updated → DSV2 bytesRead is not updated
N
created the Issue on July 8, 2023 at 1:06 AM