Hello guys,
I've just created an SSCCE that reproduces the following issue.
Story:
I needed to implement pagination using Cassandra as the data store.
Since all data needs to be requested asynchronously, I followed the advice from async-paging.
The issue I ran into is that when I execute a new read statement asynchronously, I sometimes get the previous, already exhausted result set. This happens even though I use the serial consistency level for this read statement and do not set a paging state.
As you can see in my SSCCE, I've created two tests. Note that the issue reproduces only intermittently, and only when the test case contains both tests. If I run each test on its own, it always passes. Note also that I clean up after each test.
The issue goes away if I add a delay (~1 sec) before executing the async request; see this commit.
I don't actually know what the root cause is: either I need some additional settings or it needs to be fixed in the driver, so I decided to post it here anyway.
I believe you will be able to reproduce it quickly and fix it if required.
Windows 7 x64
Java 1.8.0_91 x64
Cassandra 3.9.0
Cassandra DataStax Java Driver 3.2.0
From a quick glance into your code, I have a couple of suggestions:
I see some variables being shared between threads, e.g. PagingState and ResultSet. Such variables are being read and written by different threads, see here. This is not going to work.
I also suspect that some shared integers, e.g. currentPage and requestedPage, will need to be synchronized in order to be thread-safe; maybe just making them volatile is enough, but in general your class is holding too much shared state.
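To make the thread-safety point concrete, here is a minimal sketch using only JDK types. `PageCursor`, `advance` and `hammer` are hypothetical names, not taken from the SSCCE, and a plain string stands in for the driver's PagingState. It assumes the page counter is incremented from several callback threads:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for paging state; not the paginator class from the SSCCE.
class PageCursor {
    // Atomic types make each update visible to, and indivisible for, other threads.
    private final AtomicInteger currentPage = new AtomicInteger(0);
    // Stand-in for the driver's PagingState, kept as an opaque string in this sketch.
    private final AtomicReference<String> pagingToken = new AtomicReference<>();

    // Records the token for the next page and returns the new page number.
    // The two fields are updated independently; guard both with one lock
    // if readers must always see a matching pair.
    int advance(String nextToken) {
        pagingToken.set(nextToken);
        return currentPage.incrementAndGet();
    }

    int currentPage() { return currentPage.get(); }

    String pagingToken() { return pagingToken.get(); }

    // Calls advance() from several threads at once and returns the final page count.
    static int hammer(int threads, int callsPerThread) {
        PageCursor cursor = new PageCursor();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    cursor.advance("token-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cursor.currentPage();
    }
}
```

With a plain `int` field and `currentPage++`, the same run can lose updates. Note that `volatile` alone fixes visibility but not the read-modify-write race, so it is only enough when each field has a single writer.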
I am not sure if the test() operator, that you use to unit test your code, blocks or not until the underlying observable completes (I am more familiar with Flowables) – from the code, it does not seem to block. If it indeed does not block, you will have to explicitly block in your tests because paging is done asynchronously, e.g.:
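A minimal sketch of what explicit blocking could look like, written with JDK types only so that it runs without dependencies; it is not the snippet from the original comment. `BlockingTestSketch`, `fetchPageAsync` and `fetchAndWait` are hypothetical names. In RxJava 2, blocking operators such as `blockingGet()` on a Single or `blockingAwait()` on a Completable play the role that `join()` plays here:

```java
import java.util.concurrent.CompletableFuture;

class BlockingTestSketch {
    // Hypothetical async page fetch: completes on another thread after a short delay.
    static CompletableFuture<Integer> fetchPageAsync(int page) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // simulate network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return page;
        });
    }

    // What a test should do: wait for completion before asserting on the result.
    static int fetchAndWait(int page) {
        return fetchPageAsync(page).join();
    }
}
```

A test that asserts on shared state right after calling `fetchPageAsync`, without the `join()`, may run its assertions before the background work has finished.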
Although the general guidelines stated in my previous comment remain valid, I must admit that, after looking deeper into your code, they are not directly related to the actual problem.
So the actual problem is in your test code: in some places you are calling the actual method instead of passing a method reference, e.g. the following is incorrect:
Because you are calling paginator.currentPage() before the Completable returned by addAll completes.
You need to replace this with:
And your problems will all disappear.
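The difference between calling a method and passing a method reference can be sketched with JDK types alone. This is an analogue under stated assumptions, not the code from the SSCCE: `Store`, `thenValue` and `thenCall` are hypothetical, and a latch delays the write so that the outcome is deterministic. Judging from the later exchange in this thread, `thenValue` roughly corresponds to `andThen(Single.just(methodCall()))` and `thenCall` to `andThen(Single.fromCallable(methodReference))`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

class EagerVsDeferred {
    // Hypothetical stand-in for the paginator: addAll() writes asynchronously.
    static final class Store {
        private final List<String> rows = new CopyOnWriteArrayList<>();
        private final CountDownLatch gate;

        Store(CountDownLatch gate) { this.gate = gate; }

        // The write is held back until the gate opens, so the race is deterministic.
        CompletableFuture<Void> addAll(List<String> items) {
            return CompletableFuture.runAsync(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                rows.addAll(items);
            });
        }

        int size() { return rows.size(); }
    }

    // Takes an already computed value: the caller evaluated it at assembly time.
    static <T> CompletableFuture<T> thenValue(CompletableFuture<Void> first, T value) {
        return first.thenApply(done -> value);
    }

    // Takes a supplier: it is invoked only after `first` has completed.
    static <T> CompletableFuture<T> thenCall(CompletableFuture<Void> first, Supplier<T> supplier) {
        return first.thenApply(done -> supplier.get());
    }

    // Eager: store.size() is an ordinary argument, evaluated before the write happens.
    static int eager() {
        CountDownLatch gate = new CountDownLatch(1);
        Store store = new Store(gate);
        CompletableFuture<Integer> result =
                thenValue(store.addAll(Arrays.asList("a", "b", "c")), store.size());
        gate.countDown(); // let the write proceed
        return result.join();
    }

    // Deferred: store::size is only a reference, called once the write is done.
    static int deferred() {
        CountDownLatch gate = new CountDownLatch(1);
        Store store = new Store(gate);
        CompletableFuture<Integer> result =
                thenCall(store.addAll(Arrays.asList("a", "b", "c")), store::size);
        gate.countDown(); // let the write proceed
        return result.join();
    }
}
```

In this sketch `eager()` returns the size observed before the write (0), while `deferred()` returns the size after it (3). Adding a delay before the read, as in the workaround commit, only hides the ordering problem; deferring the call removes it.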
Some other minor remarks:
Don't use ConsistencyLevel.SERIAL for normal updates, SERIAL is only valid for LWT updates; attempting to set a normal consistency level to SERIAL might throw an error in a future version of the driver, see JAVA-1482.
In testThatCurrentPageIsGettingCorrectly, you are using Single.merge to concatenate pages together; for correctness, I suggest that you use the more apt Single.concat operator instead.
Please let me know if I can close this ticket, as it appears that the driver is not at fault here.
Many thanks for your review. You pointed me to exactly the place where the problem occurs, and all your other notes are very helpful. I've just made the corrections and the issue is gone. Indeed, this is not related to the driver. Before you close the ticket, I would like to ask you about the following snippet of test code:
Should I use here Single.fromCallable(method reference) instead of Single.just(method call) to prevent the same issue as above?
Yes, otherwise you will call nextPage() 3 times potentially before all the entries are added.
Yep. You can close the ticket now.