Performance regression on write
Activity

Piotr Kołaczkowski March 17, 2015 at 11:54 AM (edited)
What about increasing the default concurrent.writes to 16 and lowering batch.size.bytes to 4 kB? On a single node I don't see much improvement from batches larger than that, as long as concurrent.writes is high. This would also reduce memory consumption on the client.
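A minimal sketch of what those values would look like in a job configuration, assuming the full property names are spark.cassandra.output.concurrent.writes and spark.cassandra.output.batch.size.bytes (master, app name and contact point are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: apply the proposed defaults explicitly:
// 16 concurrent writes per task and 4 kB batches.
val conf = new SparkConf()
  .setMaster("local[4]")                                  // placeholder master
  .setAppName("write-benchmark")                          // placeholder app name
  .set("spark.cassandra.connection.host", "127.0.0.1")    // placeholder contact point
  .set("spark.cassandra.output.concurrent.writes", "16")  // proposed new default
  .set("spark.cassandra.output.batch.size.bytes", "4096") // 4 kB batches

val sc = new SparkContext(conf)
```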

Russell Spitzer March 16, 2015 at 9:32 PM (edited)
Rerun - No Batching in both modes, as expected
batch.level | batch.rows | batch.buffer | concurrent.writes | time (s)
---|---|---|---|---
All | 1 | N/A | 5 | 219
All | 1 | N/A | 256 | 143
Partition | 1 | 1 | 5 | 216
Partition | 1 | 1 | 256 | 138
Ok this makes sense.

Jacek March 16, 2015 at 8:36 PM
We should probably mention somewhere in the docs that the heap size needs to be increased accordingly. Each task runs its own buffer, so the user should be aware of the memory requirements of batching.
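To make that doc note concrete, a rough back-of-envelope sketch of the per-executor heap consumed by write buffers; every number here is an illustrative assumption, not a measured value:

```scala
// Each concurrently running write task keeps its own batch buffer, so a crude
// upper bound on client-side buffering memory per executor is:
//   cores * buffered batches per task * batch size in bytes
val coresPerExecutor = 4          // illustrative: 4 write tasks buffering at once
val bufferedBatches  = 1000       // illustrative batch buffer setting
val batchSizeBytes   = 16 * 1024  // illustrative batch.size.bytes setting

val approxBufferHeap = coresPerExecutor.toLong * bufferedBatches * batchSizeBytes
println(s"~${approxBufferHeap / (1024 * 1024)} MB of heap just for write buffers")
```

With these illustrative numbers that comes to roughly 60 MB per executor, which is the kind of figure the docs could call out.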

Cristian March 16, 2015 at 7:27 PM (edited)
Never mind, sorry, I was running with too low a heap.
-
I've done some further testing; now "none" behaves pretty much like 1.1, which is good.
There is an odd thing with "partition" though: running against Cass on localhost it's similar to "none", which is also good. There's no improvement over "none", but perhaps that's expected against a single-node Cass.
However, when run against a real Cass cluster it behaves well with Spark local[1], but with local[4] or local[8] it is several times more expensive and maxes out the CPUs.
This does not reproduce when running against localhost, where more threads actually help as expected, so it's something to do with running against a real cluster.
Unfortunately I can't profile on the box that has access to the cluster; maybe someone else can reproduce and profile?
Just need to use "partition" with local[1] and local[4] against a real cluster.
-
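A rough repro sketch of that scenario; the contact point, keyspace, table and columns are placeholders, and the batch level property is the one quoted later in this ticket:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._  // brings saveToCassandra into scope

// Repro sketch: switch between local[1] and local[4], and point
// connection.host at a real cluster instead of localhost.
val conf = new SparkConf()
  .setMaster("local[4]")                                   // also try local[1]
  .setAppName("partition-batch-repro")
  .set("spark.cassandra.connection.host", "10.0.0.1")      // placeholder cluster contact point
  .set("spark.cassandra.output.batch.level", "partition")  // grouping mode under test

val sc = new SparkContext(conf)

// Placeholder keyspace/table; one physical row per RDD record.
sc.parallelize(1 to 1000000)
  .map(i => (s"key$i", s"value$i"))
  .saveToCassandra("test_ks", "test_table", SomeColumns("key", "value"))
```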

Russell Spitzer March 16, 2015 at 5:48 PM (edited)
Some more single-machine tests.
These should be equivalent to just doing executeAsync with a limited pool, with 20k rows per partition (see the sketch after this comment).
batch.level | batch.rows | batch.buffer | time (s)
---|---|---|---
All | 1 | N/A | 53
Partition | 1 | 1 | 174
Set the wrong env variable; created https://datastax-oss.atlassian.net/browse/SPARKC-90 to prevent this kind of mistake again.
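The "executeAsync with a limited pool" baseline referenced above could be approximated with the raw Java driver roughly like this (contact point, keyspace, table and pool size are placeholders):

```scala
import java.util.concurrent.Semaphore
import com.datastax.driver.core.Cluster

// Baseline sketch: raw async inserts with at most `maxInFlight` outstanding
// requests, which is roughly what the writer reduces to when batch.rows = 1.
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect("test_ks")  // placeholder keyspace
val insert  = session.prepare("INSERT INTO test_table (key, value) VALUES (?, ?)")  // placeholder table

val maxInFlight = 5  // corresponds to the concurrent.writes setting
val permits = new Semaphore(maxInFlight)

for (i <- 1 to 1000000) {
  permits.acquire()  // block until a request slot is free
  val future = session.executeAsync(insert.bind(i.toString, s"value$i"))
  future.addListener(new Runnable { def run(): Unit = permits.release() },
                     scala.concurrent.ExecutionContext.global)
}
permits.acquire(maxInFlight)  // drain: wait for the last in-flight writes
cluster.close()
```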
Testing with 1.2.0-alpha3, single-node localhost Cassandra, local[4] Spark.
Writing to a table with:
1 small varchar primary key
10 small varchar value columns
Writing 1M rows takes:
17 secs on 1.1.0
66 secs on 1.2.0-alpha3
This seems related to the token-aware optimization; however, setting "spark.cassandra.output.batch.level=all" or "spark.cassandra.output.batch.level=partition" makes no difference.
What seems to matter is the number of physical rows; it degrades quickly as that increases.
Writing 1M RDD records logically grouped under 1K clustering keys shows no perf degradation (though also no improvement from token awareness).
But writing 1M RDD records, each mapping to its own physical Cass row, degrades about 3x from 1.1 to 1.2.
My bet would be on a time complexity issue in the batch grouping algorithm.
There's also a small, somewhat related logging bug where the number of written rows isn't counted properly:
[Executor task launch worker-3] INFO [writer.TableWriter] Wrote 0 rows to test_dimensions4.dims in 65.353 s.
Sorry, I can't share my test code, but it should be simple to repro.
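A rough repro sketch matching the setup above: one physical row per RDD record, a small varchar primary key plus 10 small varchar value columns. The keyspace/table names come from the log line above; the column names, schema and everything else are placeholder assumptions:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Assumed schema, created out of band (column names are placeholders):
//   CREATE TABLE test_dimensions4.dims (
//     k text PRIMARY KEY,
//     c1 text, c2 text, c3 text, c4 text, c5 text,
//     c6 text, c7 text, c8 text, c9 text, c10 text);

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("write-regression-repro")
  .set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext(conf)

// 1M records, each becoming its own physical Cassandra row.
sc.parallelize(1 to 1000000)
  .map(i => (s"key$i", "v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10"))
  .saveToCassandra("test_dimensions4", "dims",
    SomeColumns("k", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"))
```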