repartitionByCassandraReplica doesn't seem to be using local-awareness
Description
Got a heads up from @Dmytro that the wrong byte buffer is getting passed into the endpoint function: we are passing in the Token when we should just be passing in the ByteBuffer representing the partition key.
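For context, a rough illustration of that distinction against the DataStax Java driver 3.x Metadata API (illustrative only, not the connector's internal code; replicasFor is a hypothetical helper):

import java.nio.ByteBuffer;
import java.util.Set;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;

// Hypothetical helper: Metadata.getReplicas expects the serialized partition key bytes.
// Handing it bytes derived from the Token instead (the bug described above) makes the
// endpoint lookup resolve to the wrong hosts.
static Set<Host> replicasFor(Metadata metadata, String keyspace, ByteBuffer partitionKeyBytes) {
    return metadata.getReplicas(keyspace, partitionKeyBytes);
}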
Russell Spitzer April 14, 2017 at 7:35 PM
There are many reasons that a task could be run on the wrong executor, but it is probably best to actually check the partitions' preferred locations. You can do this via the RDD's preferredLocations (see the sketch below).
The Java function just wraps the Scala one. These preferred locations are used by the DAGScheduler to match tasks with executors; if your executor names don't match these, it won't schedule tasks on the correct machines.
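A minimal sketch of such a check in Java, assuming the _pnkCRdd RDD built in the report below (partitions and preferredLocations are the standard Spark RDD methods):

import org.apache.spark.Partition;
import org.apache.spark.rdd.RDD;

// Print the preferred locations computed for each partition of the repartitioned RDD;
// they should name the Cassandra replica nodes for the keys in that partition.
RDD<GUrlPnk> underlying = _pnkCRdd.rdd();   // the Java wrapper delegates to this Scala RDD
for (Partition p : underlying.partitions()) {
    System.out.println("partition " + p.index() + " -> " + underlying.preferredLocations(p));
}

If these locations list the expected Cassandra hosts but tasks still run elsewhere, compare them with the executor addresses shown on the Executors tab of the Spark web UI; the names must match for the DAGScheduler to honor them.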
I'm new to the Spark Cassandra Connector and am trying to learn as much as possible.
I learned that it can repartition an RDD to match the replication strategy of a Cassandra keyspace and table, but somehow this doesn't seem to be working as expected. Am I using it in the wrong way?
Code in Java only.
There are 2 Cassandra nodes in the environment: 192.168.11.61 and 192.168.11.62.
There are 3 Spark nodes: 192.168.11.51 (master), 192.168.11.61 (worker), and 192.168.11.62 (worker).
The keyspace is set to have only 1 replica of any data (for the purpose of testing local-awareness).
[ keyspace ]
CREATE KEYSPACE IF NOT EXISTS mydb WITH REPLICATION={'class':'NetworkTopologyStrategy', 'dc1':1};
[ table ]
CREATE TABLE g_url_flow (date date, dev_id text, hour tinyint, timestamp timestamp, top_domain text, full_domain text, url text, PRIMARY KEY((date, dev_id), hour, timestamp));
[ data ]
date | dev_id | hour | timestamp | full_domain | top_domain | url
---------------------------------------------------------------------------------------------------------------
2016-12-02 | dev#126 | 12 | 2016-12-02 04:05:22.375000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-01 | dev#125 | 12 | 2016-12-01 04:03:25.565000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-02 | dev#125 | 12 | 2016-12-02 04:04:43.512000+0000 | www.xxx.com | xxx.com | www.xxx.com/006.html
2016-12-01 | dev#124 | 12 | 2016-12-01 04:01:21.007000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-01 | dev#124 | 12 | 2016-12-01 04:02:32.244000+0000 | www.xxx.com | xxx.com | www.xxx.com/003.html
2016-12-02 | dev#123 | 12 | 2016-12-02 04:03:25.675000+0000 | www.xxx.com | xxx.com | www.xxx.com/008.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:00:03.306000+0000 | www.xxx.com | xxx.com | www.xxx.com/001.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:00:19.133000+0000 | www.xxx.com | xxx.com | www.xxx.com/002.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:00:57.541000+0000 | www.xxx.com | xxx.com | www.xxx.com/003.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:01:35.121000+0000 | www.xxx.com | xxx.com | www.xxx.com/005.html
2016-12-01 | dev#123 | 12 | 2016-12-01 04:02:11.155000+0000 | www.xxx.com | xxx.com | www.xxx.com/006.html
Now, using the command-line tool "./bin/nodetool getendpoints mydb g_url_flow <partition_key>", I found:
2016-12-01, dev#123 on node 192.168.11.61
2016-12-01, dev#124 on node 192.168.11.61
2016-12-01, dev#125 on node 192.168.11.61
2016-12-02, dev#124 on node 192.168.11.62
2016-12-02, dev#125 on node 192.168.11.62
2016-12-02, dev#126 on node 192.168.11.62
However, after I ran the following code (Java):
SparkConf _conf = new SparkConf()
        .setAppName("my scc 01")
        .setMaster("spark://192.168.11.51:7077")
        .set("spark.cassandra.connection.host", "192.168.11.61,192.168.11.62")
        .set("spark.cassandra.connection.port", "9042");
JavaSparkContext _sc = new JavaSparkContext(_conf);

List<GUrlPnk> _pnkList = new ArrayList<GUrlPnk>();
_pnkList.add(new GUrlPnk("2016-12-01", "dev#123"));
_pnkList.add(new GUrlPnk("2016-12-01", "dev#124"));
_pnkList.add(new GUrlPnk("2016-12-01", "dev#125"));
_pnkList.add(new GUrlPnk("2016-12-02", "dev#126"));
JavaRDD<GUrlPnk> _pnkRawRdd = _sc.parallelize(_pnkList);

JavaRDD<GUrlPnk> _pnkCRdd = javaFunctions(_pnkRawRdd)
        .repartitionByCassandraReplica(
                "mydb",
                "g_url_flow",
                10,
                someColumns("date", "dev_id"),
                mapToRow(
                        GUrlPnk.class,
                        Pair.of("date", "date"),
                        Pair.of("devId", "dev_id")
                )
        );

// |- print info on each node
_pnkCRdd.foreach((GUrlPnk ele) -> {
    System.out.println(">>> " + ele.getDate() + ", " + ele.getDevId());
});
After the job finished, I saw the following in the Spark web UI:
spark job output from 192.168.11.61:
>>> 2016-12-01, dev#123
>>> 2016-12-02, dev#125
spark job output from 192.168.11.62:
>>> 2016-12-02, dev#124
>>> 2016-12-02, dev#126
>>> 2016-12-01, dev#125
>>> 2016-12-01, dev#124
As you can see, the RDD is not repartitioned according to Cassandra's replica placement. Why is this? Am I misunderstanding something, or is this a bug?
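For a more direct check of where each key is actually processed, a sketch using only standard Spark and JDK APIs (it prints the executor's own address next to each key, so the actual placement can be compared with the nodetool getendpoints output):

import java.net.InetAddress;

// Each executor prints its own address alongside the keys it processes.
_pnkCRdd.foreachPartition(iter -> {
    String host = InetAddress.getLocalHost().getHostAddress();
    while (iter.hasNext()) {
        GUrlPnk ele = iter.next();
        System.out.println(host + " >>> " + ele.getDate() + ", " + ele.getDevId());
    }
});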