repartitionByCassandraReplica doesn't seem to be using local-awareness

Description

I'm new to the Spark Cassandra Connector and am trying to learn as much as possible.
I learned that it can repartition an RDD to match the replica placement of a Cassandra keyspace and table, but somehow this doesn't seem to be working as expected. Am I using it the wrong way?

Code in Java only.

There are 2 Cassandra nodes in the environment: 192.168.11.61 and 192.168.11.62.
There are 3 Spark nodes: 192.168.11.51 (master), 192.168.11.61 (worker), 192.168.11.62 (worker).

The keyspace is set to keep only 1 replica of any piece of data (for the purpose of the local-awareness test).

[ keyspace ]
CREATE KEYSPACE IF NOT EXISTS mydb WITH REPLICATION={'class':'NetworkTopologyStrategy', 'dc1':1};

[ table ]
CREATE TABLE g_url_flow (date date, dev_id text, hour tinyint, timestamp timestamp, top_domain text, full_domain text, url text, PRIMARY KEY((date, dev_id), hour, timestamp));

[ data ]
date | dev_id | hour | timestamp | full_domain | top_domain | url
---------------------------------------------------------------------------------------------------------------
2016-12-02 | dev#126 | 12 | 2016-12-02 04:05:22.375000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-01 | dev#125 | 12 | 2016-12-01 04:03:25.565000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-02 | dev#125 | 12 | 2016-12-02 04:04:43.512000+0000 | www.xxx.com | xxx.com | www.xxx.com/006.html
2016-12-01 | dev#124 | 12 | 2016-12-01 04:01:21.007000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-01 | dev#124 | 12 | 2016-12-01 04:02:32.244000+0000 | www.xxx.com | xxx.com | www.xxx.com/003.html
2016-12-02 | dev#123 | 12 | 2016-12-02 04:03:25.675000+0000 | www.xxx.com | xxx.com | www.xxx.com/008.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:00:03.306000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:00:19.133000+0000 | www.xxx.com | xxx.com | www.xxx.com/002.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:00:57.541000+0000 | www.xxx.com | xxx.com | www.xxx.com/003.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:01:35.121000+0000 | www.xxx.com | xxx.com | www.xxx.com/005.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:02:11.155000+0000 | www.xxx.com | xxx.com | www.xxx.com/006.html

Now, using the command-line tool "./bin/nodetool getendpoints mydb g_url_flow <partition_key>", I determined which node owns each partition key:
2016-12-01, dev#123 on node 192.168.11.61
2016-12-01, dev#124 on node 192.168.11.61
2016-12-01, dev#125 on node 192.168.11.61
2016-12-02, dev#124 on node 192.168.11.62
2016-12-02, dev#125 on node 192.168.11.62
2016-12-02, dev#126 on node 192.168.11.62

However, I then ran the following code (Java):

// Requires: import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
//           import org.apache.commons.lang3.tuple.Pair;

SparkConf _conf = new SparkConf()
        .setAppName("my scc 01")
        .setMaster("spark://192.168.11.51:7077")
        .set("spark.cassandra.connection.host", "192.168.11.61,192.168.11.62")
        .set("spark.cassandra.connection.port", "9042");
JavaSparkContext _sc = new JavaSparkContext(_conf);

List<GUrlPnk> _pnkList = new ArrayList<GUrlPnk>();
_pnkList.add(new GUrlPnk("2016-12-01", "dev#123"));
_pnkList.add(new GUrlPnk("2016-12-01", "dev#124"));
_pnkList.add(new GUrlPnk("2016-12-01", "dev#125"));
_pnkList.add(new GUrlPnk("2016-12-02", "dev#126"));
JavaRDD<GUrlPnk> _pnkRawRdd = _sc.parallelize(_pnkList);

// Repartition so each element is placed on the node that owns its partition key.
JavaRDD<GUrlPnk> _pnkCRdd = javaFunctions(_pnkRawRdd)
        .repartitionByCassandraReplica(
                "mydb",
                "g_url_flow",
                10,
                someColumns("date", "dev_id"),
                mapToRow(
                        GUrlPnk.class,
                        Pair.of("date", "date"),
                        Pair.of("devId", "dev_id")));

// |- print info on each node
_pnkCRdd.foreach((GUrlPnk ele) -> {
    System.out.println(">>> " + ele.getDate() + ", " + ele.getDevId());
});
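The snippet assumes a simple serializable bean for the partition key. The reporter's actual class isn't shown; the following is a hypothetical reconstruction based only on the getters and the mapToRow field mapping used above:

import java.io.Serializable;

// Hypothetical reconstruction of the partition-key bean used in the snippet above.
public class GUrlPnk implements Serializable {
    private String date;
    private String devId;

    public GUrlPnk() {}  // no-arg constructor so the connector's row mapper can instantiate it

    public GUrlPnk(String date, String devId) {
        this.date = date;
        this.devId = devId;
    }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getDevId() { return devId; }
    public void setDevId(String devId) { this.devId = devId; }
}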

After the job finished, I checked the stdout of each worker in the Spark web UI:
From 192.168.11.61 (worker stdout):
>>> 2016-12-01, dev#123
>>> 2016-12-02, dev#125
From 192.168.11.62 (worker stdout):
>>> 2016-12-02, dev#124
>>> 2016-12-02, dev#126
>>> 2016-12-01, dev#125
>>> 2016-12-01, dev#124

As you can see, the elements are not partitioned according to Cassandra's replica placement. Why is this? Am I misunderstanding something, or is this a bug?
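One way to see which worker handled each element without reading each node's stdout separately is to tag every element with the executor's hostname and collect the result on the driver. A minimal sketch, appended to the job above (illustrative only; assumes hostname resolution works on the workers):

// Tag each element with the hostname of the executor that processes it,
// then collect everything back to the driver for one consolidated listing.
JavaRDD<String> _tagged = _pnkCRdd.map((GUrlPnk ele) ->
        java.net.InetAddress.getLocalHost().getHostName()
                + " >>> " + ele.getDate() + ", " + ele.getDevId());
_tagged.collect().forEach(System.out::println);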

Environment

Spark 2.0.2
Cassandra 3.9
SparkCassandraConnector 2.0.1
Oracle JDK 8u91
Eclipse
Java 8

Pull Requests

https://github.com/datastax/spark-cassandra-connector/pull/1125

Activity

Russell Spitzer June 14, 2017 at 10:43 PM

Got a heads-up that the wrong byte buffer is getting inserted into the endpoint function. We are passing in the Token when we should just be passing in the ByteBuffer representing the partition key.
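To illustrate the distinction at the driver level: the DataStax Java driver's Metadata.getReplicas expects the serialized partition key, so feeding it a token's bytes yields the wrong endpoints. A minimal sketch (not the connector's internal code; the helper name and the assumption that partitionKeyBytes already holds the serialized (date, dev_id) key are hypothetical):

import java.nio.ByteBuffer;
import java.util.Set;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;

public class EndpointLookupSketch {
    // partitionKeyBytes must be the serialized partition key, NOT the serialized Token;
    // passing the token's bytes here is the mix-up described in the comment above.
    static Set<Host> replicasFor(Cluster cluster, ByteBuffer partitionKeyBytes) {
        return cluster.getMetadata().getReplicas("mydb", partitionKeyBytes);
    }
}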

Russell Spitzer April 14, 2017 at 7:35 PM

There are many reasons a task could be run on the wrong executor, but it is probably best to actually check the partitions' preferred locations. You can do this via:

scala> rdd.partitions.foreach(part => println(rdd.getPreferredLocations(part)))
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)
ArrayBuffer(127.0.0.1, localhost)

The Java function just wraps the Scala one. These preferred locations are used by the DAGScheduler to match tasks with executors. If your executor names don't match these, tasks won't be scheduled on the correct machines.
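For the Java API, a rough equivalent of the Scala check above, applied to the reporter's _pnkCRdd (a sketch; preferredLocations is the public accessor on the underlying Scala RDD, and JavaRDD.partitions() exposes its partitions):

import org.apache.spark.Partition;

// Print the preferred locations of every partition of the repartitioned RDD.
for (Partition part : _pnkCRdd.partitions()) {
    System.out.println(_pnkCRdd.rdd().preferredLocations(part));
}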

Fixed

Details

Created April 14, 2017 at 5:49 AM
Updated July 9, 2017 at 2:55 PM
Resolved July 7, 2017 at 5:40 PM