Expose node token and range information

Description

The driver already read the host tokens and generally compute token ranges for the sake of TokenAwarePolicy. For some type of applications (hadoop mainly), it can be useful for a client to get that information in an actionable way (to create proper splits for instance), so we should expose it somehow.

Environment

None

Pull Requests

None

Activity

Show:
Sylvain Lebresne
January 21, 2015, 11:59 AM

I think it makes sense that merging the result of a split gives you back the original range.

Absolutely.

there is no predefined max token that would allow us to compute the splits

For the record, I'll note that there is an actual max token in practice with BOP since partition keys are limited to 64K. But I agree that it's an edge case so I don't mind if you want to leave that out for now.

Regarding getTokenMap(), one interesting property is that it's atomic, which is not the case if we need to iterate over all hosts. I think this could be important to some clients.

It's not really atomic in a meaningful sense. It's atomic from the point of view of the driver, but the driver view is not atomic from the cluster point of view because the token map uses informations from 2 separate queries (to the 'local' and to the 'peers' table). Even without that, token change ownership rarely and through heavy operations and those changes are not propagated to the cluster atomically, and the driver only ever provided the view of a single node. So that overall, I strongly doubt any client could meaningfully rely on any sort of atomicity regaring the token map (what would probably be useful however would be a way to register to get notified on topology updates).

Overall, I think having a getTokens to Host make more sense, and that once we have that we don't need getTokenMap for all practical intent and purpose. And I do find that method weird from an API standpoint for some reason. I'll further add that keeping the API smaller is better, and that it's easier to add things back later if it proves necessary than to remove redundant stuff. So anyway, I don't think we should include it, at least not initially, but if you really feel strongly about it, I guess it's not a big deal.

Andreas Petter
January 21, 2015, 3:58 PM

Thank you, Olivier. Just to understand how to use this. Is it correct that calculating all splits for a given keyspace can be done similar to the following:

Some questions to prepare answers for: Referring to aforementioned concurrency problems: will i need to handle concurrent changes? how will i detect them (e.g. move token completes while performing splitEvenly, etc. - register listeners on the driver)? In case of an auto-paging ResultSet is it o.k. to set someNumber to 1 or will this potentially result in Cassandra go OOM? How do i set this number?

Olivier Michallat
January 23, 2015, 5:36 PM

Yes, or you could start from the token ranges. I suppose you don't send each split to all possible replicas, so that will be more convenient:

Regarding estimateNumberOfSplits, this is what the Spark connector currently does with the Thrift operation describe_splits_ex. To do this purely via the native protocol, we are reliant on CASSANDRA-7688 (how we expose that information will likely be another ticket).

will i need to handle concurrent changes?

I don't think so. As Sylvain said, token change ownership is rare, and anyway sending a split to the wrong replica should not be catastrophic for a job, slightly inefficient at worst.

mck
January 24, 2015, 10:40 PM
Edited

I should have jumped in a little earlier, but better late than never (i hope).
Here's a little input from a cassandra+hadoop/spark user, mostly to confirm some of the statements made so far.

I think the first step for analytics tools is to divide the main job across the cluster, to determine which partition range is processed by which host

This is correct. The hadoop job starts (for us not even within the same datacenter) and for a given range requests splits, and for each split gets a list of locations (corresponding to replica).

Hadoop will favour sending the split to a tasktracker with matching location. The default behaviour is if all these tasktrackers are busy the split will immediately go to any free tasktracker. Tasktrackers can often be busy, so having all locations listed for each split i believe is important to achieve any half decent data-locality.
Data-locality is typically improved (can almost be effectively enforced) using the "yarn.scheduler.capacity.node-locality-delay" in the capacity scheduler, which allows the task to be cycled through the queue so many times looking for a matching location before it giving up and throwing it at any tasktracker.

With hadoop the one split can be sent to multiple locations (see job.setMapSpeculativeExecution(bool)). by default (at least in hadoop-2.2) this property is true.

I believe Spark works (or can work) the same way given it's based off the same interface.

Lastly, and I'm not too sure this is applicable directly to this issue, but for our cluster even with only 4 physical nodes we've seen hundred of thousands of splits result because of vnodes and we have to re-merge splits (with code along the lines provided in this patch) with matching location sets so to get sizes back close to "cassandra.input.split.size".

Piotr Kołaczkowski
February 3, 2015, 8:54 PM

What is ETA for the fix releases of this ticket?

Fixed

Assignee

Olivier Michallat

Reporter

Sylvain Lebresne

Labels

None

PM Priority

None

Affects versions

None

Fix versions

Pull Request

None

Doc Impact

None

Size

None

External issue ID

None

External issue ID

None

Priority

Major
Configure