Performance regression on write

Description

Testing with 1.2.0-alpha3, a single-node localhost Cassandra, and local[4] Spark.

Writing to a table with:

  • 1 small varchar primary key

  • 10 small varchar value columns

Writing 1M rows takes:

  • 17 secs on 1.1.0

  • 66 secs on 1.2.0-alpha3

This seems related to the token-aware optimization; however, setting "spark.cassandra.output.batch.level=all" or "spark.cassandra.output.batch.level=partition" makes no difference.

What seems to matter is the number of physical rows; performance degrades quickly as that number increases:

  • Writing 1M RDD records logically grouped by 1K clustering keys shows no performance degradation (though also no improvement from the token-aware path).

  • Writing a 1M-record RDD where each record maps to its own physical Cassandra row is about 3x slower on 1.2 than on 1.1 (see the sketch below).
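Continuing the sketch above, the two shapes could look roughly like this (table and column names are invented; the grouped table is assumed to have a clustering column, the flat one a single-column primary key):

```scala
// Case 1: 1M records spread over 1K partition keys, many clustering rows each.
// Assumed schema: CREATE TABLE test.dims_clustered (dim text, ord int, v1 text, PRIMARY KEY (dim, ord))
val grouped = sc.parallelize(1 to 1000000)
  .map(i => (s"dim${i % 1000}", i, s"v$i"))
grouped.saveToCassandra("test", "dims_clustered", SomeColumns("dim", "ord", "v1"))

// Case 2: 1M records, each mapping to its own physical row.
// Assumed schema: CREATE TABLE test.dims_flat (dim text PRIMARY KEY, v1 text)
val flat = sc.parallelize(1 to 1000000)
  .map(i => (s"dim$i", s"v$i"))
flat.saveToCassandra("test", "dims_flat", SomeColumns("dim", "v1"))
```

The first shape was reported as unaffected; the second is the one that regresses.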

My bet would be on a time complexity issue in the batch grouping algorithm.

There's also a small, somewhat related logging bug: the writer doesn't properly count the records it wrote:

[Executor task launch worker-3] INFO [writer.TableWriter] Wrote 0 rows to test_dimensions4.dims in 65.353 s.

Sorry, I can't share my test code, but it should be simple to repro.

Environment

None

Pull Requests

None

Activity

Piotr Kołaczkowski March 17, 2015 at 11:54 AM
Edited

What about increasing the default concurrent.writes to 16 and lowering batch.size.bytes to 4 kB? On a single node I don't see much improvement from batches larger than that, as long as concurrent.writes is high. This would also reduce memory consumption on the client.
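For illustration, a sketch of what those proposed values would look like if set explicitly (the full option keys are assumptions based on the short names above):

```scala
import org.apache.spark.SparkConf

// Hypothetical explicit settings matching the proposal above.
val conf = new SparkConf()
  .set("spark.cassandra.output.concurrent.writes", "16")
  .set("spark.cassandra.output.batch.size.bytes", "4096") // 4 kB
```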

Russell Spitzer March 16, 2015 at 9:32 PM
Edited

Rerun - No Batching in both modes, as expected

batchlevel | batch.rows | batch.buffer | concurrent.writes | time
---------- | ---------- | ------------ | ----------------- | ----
All        | 1          | N/A          | 5                 | 219
All        | 1          | N/A          | 256               | 143
Partition  | 1          | 1            | 5                 | 216
Partition  | 1          | 1            | 256               | 138

Ok this makes sense.

Jacek March 16, 2015 at 8:36 PM

We should probably mention somewhere in the docs that the heap size needs to be increased accordingly. Each task runs its own buffer, so users should be aware of the memory requirements of batching.
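As a rough, hedged illustration of how that per-task buffer can add up (all of the numbers below are illustrative assumptions, not connector defaults):

```scala
// Back-of-envelope estimate of write-buffer memory on one executor.
val tasksPerExecutor  = 4         // e.g. local[4]
val bufferedBatches   = 1000      // batches buffered per task while grouping (assumed)
val batchSizeBytes    = 4 * 1024  // target batch size (assumed)
val approxBufferBytes = tasksPerExecutor.toLong * bufferedBatches * batchSizeBytes
println(s"~${approxBufferBytes / (1024 * 1024)} MB buffered for batching")
// Prints roughly 15 MB for these numbers; larger batches or buffers scale it linearly.
```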

Cristian March 16, 2015 at 7:27 PM
Edited

Never mind, sorry, I was running with too low a heap.

-

I've done some further testing; "none" now behaves pretty much like 1.1, which is good.

There is an odd thing with "partition", though: running against Cassandra on localhost it's similar to "none", which is also good. There's no improvement over "none", but perhaps that's expected against a single-node Cassandra.

However, when run against a real Cassandra cluster it behaves well with Spark local[1], but with local[4] or local[8] it is several times more expensive and maxes out the CPUs.

This does not reproduce when running against localhost, where more threads actually help, as expected, so it's something to do with running against a real cluster.

Unfortunately I can't profile on the box that has access to the cluster; maybe someone else can reproduce and profile?

You just need to use "partition" with local[1] and local[4] against a real cluster.
-

Russell Spitzer March 16, 2015 at 5:48 PM
Edited

Some more single machine tests

These should be equivalent to just doing executeAsync with a limited pool, with 20k rows per partition (see the sketch after the table):

batchlevel | batch.rows | batch.buffer | time
---------- | ---------- | ------------ | ----
All        | 1          | N/A          | 53
Partition  | 1          | 1            | 174
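For context, a hedged sketch of what "executeAsync with a limited pool" could look like with the Java driver directly (table, columns, and contact point are invented; the pool size of 5 is just an example):

```scala
import java.util.ArrayDeque
import com.datastax.driver.core.{Cluster, ResultSetFuture}

// Unbatched async inserts with at most 5 requests in flight at a time.
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()
val insert  = session.prepare("INSERT INTO test.dims_flat (dim, v1) VALUES (?, ?)") // assumed schema

val inFlight = new ArrayDeque[ResultSetFuture]()
for (i <- 1 to 1000000) {
  if (inFlight.size >= 5) inFlight.poll().getUninterruptibly() // wait for the oldest request
  inFlight.add(session.executeAsync(insert.bind(s"dim$i", s"v$i")))
}
while (!inFlight.isEmpty) inFlight.poll().getUninterruptibly()  // drain the tail
cluster.close()
```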

I had set the wrong env variable. Created https://datastax-oss.atlassian.net/browse/SPARKC-90 to prevent this kind of mistake again.

Fixed

Details

Created March 12, 2015 at 3:51 PM
Updated March 19, 2015 at 8:05 PM
Resolved March 19, 2015 at 8:05 PM