File tree

6 files changed

+101
-7
lines changed

6 files changed

+101
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -371,4 +371,10 @@
371371
<className>com/google/cloud/spanner/ResultSets</className>
372372
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
373373
</difference>
374+
375+
<difference>
376+
<differenceType>7012</differenceType>
377+
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
378+
<method>com.google.api.core.ApiFuture closeAsync()</method>
379+
</difference>
374380
</differences>
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.cloud.Timestamp;
21-
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction;
22-
import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture;
23-
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
2421
import com.google.cloud.spanner.TransactionManager.TransactionState;
2522
import com.google.common.util.concurrent.ListenableFuture;
2623
import com.google.common.util.concurrent.MoreExecutors;
@@ -200,4 +197,11 @@ public interface AsyncTransactionFunction<I, O> {
200197
*/
201198
@Override
202199
void close();
200+
201+
/**
202+
* Closes the transaction manager. If there is an active transaction, it will be rolled back. The
203+
* underlying session will be released back to the session pool. The returned {@link ApiFuture} is
204+
* done when the transaction (if any) has been rolled back.
205+
*/
206+
ApiFuture<Void> closeAsync();
203207
}
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2525
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
2626
import com.google.cloud.spanner.TransactionManager.TransactionState;
27+
import com.google.common.base.MoreObjects;
2728
import com.google.common.base.Preconditions;
2829
import com.google.common.util.concurrent.MoreExecutors;
2930
import io.opencensus.trace.Span;
@@ -54,7 +55,17 @@ public void setSpan(Span span) {
5455

5556
@Override
5657
public void close() {
58+
closeAsync();
59+
}
60+
61+
@Override
62+
public ApiFuture<Void> closeAsync() {
63+
ApiFuture<Void> res = null;
64+
if (txnState == TransactionState.STARTED) {
65+
res = rollbackAsync();
66+
}
5767
txn.close();
68+
return MoreObjects.firstNonNull(res, ApiFutures.<Void>immediateFuture(null));
5869
}
5970

6071
@Override
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.api.core.ApiFutures;
2323
import com.google.api.core.SettableApiFuture;
2424
import com.google.cloud.Timestamp;
25-
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
2625
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
2726
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
2827
import com.google.cloud.spanner.TransactionManager.TransactionState;
@@ -59,14 +58,41 @@ public void run() {
5958

6059
@Override
6160
public void close() {
62-
delegate.addListener(
63-
new Runnable() {
61+
SpannerApiFutures.get(closeAsync());
62+
}
63+
64+
@Override
65+
public ApiFuture<Void> closeAsync() {
66+
final SettableApiFuture<Void> res = SettableApiFuture.create();
67+
ApiFutures.addCallback(
68+
delegate,
69+
new ApiFutureCallback<AsyncTransactionManagerImpl>() {
6470
@Override
65-
public void run() {
71+
public void onFailure(Throwable t) {
6672
session.close();
6773
}
74+
75+
@Override
76+
public void onSuccess(AsyncTransactionManagerImpl result) {
77+
ApiFutures.addCallback(
78+
result.closeAsync(),
79+
new ApiFutureCallback<Void>() {
80+
@Override
81+
public void onFailure(Throwable t) {
82+
res.setException(t);
83+
}
84+
85+
@Override
86+
public void onSuccess(Void result) {
87+
session.close();
88+
res.set(result);
89+
}
90+
},
91+
MoreExecutors.directExecutor());
92+
}
6893
},
6994
MoreExecutors.directExecutor());
95+
return res;
7096
}
7197

7298
@Override
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
3737
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
3838
import com.google.cloud.spanner.Options.ReadOption;
39+
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
3940
import com.google.common.base.Function;
41+
import com.google.common.base.Predicate;
4042
import com.google.common.collect.ImmutableList;
4143
import com.google.common.collect.Iterables;
4244
import com.google.common.collect.Range;
@@ -47,6 +49,8 @@
4749
import com.google.spanner.v1.CommitRequest;
4850
import com.google.spanner.v1.ExecuteBatchDmlRequest;
4951
import com.google.spanner.v1.ExecuteSqlRequest;
52+
import com.google.spanner.v1.RollbackRequest;
53+
import com.google.spanner.v1.TransactionSelector;
5054
import io.grpc.Status;
5155
import java.util.Arrays;
5256
import java.util.Collection;
@@ -181,6 +185,30 @@ public void onSuccess(long[] input) {
181185
}
182186
}
183187

188+
@Test
189+
public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exception {
190+
AsyncTransactionManager manager = client().transactionManagerAsync();
191+
TransactionContext txn = manager.beginAsync().get();
192+
txn.executeUpdateAsync(UPDATE_STATEMENT).get();
193+
final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector();
194+
195+
SpannerApiFutures.get(manager.closeAsync());
196+
// The mock server should already have the Rollback request, as we are waiting for the returned
197+
// ApiFuture to be done.
198+
mockSpanner.waitForRequestsToContain(
199+
new Predicate<AbstractMessage>() {
200+
@Override
201+
public boolean apply(AbstractMessage input) {
202+
if (input instanceof RollbackRequest) {
203+
RollbackRequest request = (RollbackRequest) input;
204+
return request.getTransactionId().equals(selector.getId());
205+
}
206+
return false;
207+
}
208+
},
209+
0L);
210+
}
211+
184212
@Test
185213
public void asyncTransactionManagerUpdate() throws Exception {
186214
final SettableApiFuture<Long> updateCount = SettableApiFuture.create();
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
2424
import com.google.common.base.Optional;
2525
import com.google.common.base.Preconditions;
26+
import com.google.common.base.Predicate;
2627
import com.google.common.base.Stopwatch;
2728
import com.google.common.base.Throwables;
29+
import com.google.common.collect.Iterables;
2830
import com.google.common.util.concurrent.Uninterruptibles;
2931
import com.google.protobuf.AbstractMessage;
3032
import com.google.protobuf.ByteString;
@@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
19271929
}
19281930
}
19291931

1932+
public void waitForRequestsToContain(
1933+
Predicate<? super AbstractMessage> predicate, long timeoutMillis)
1934+
throws InterruptedException, TimeoutException {
1935+
Stopwatch watch = Stopwatch.createStarted();
1936+
while (true) {
1937+
Iterable<AbstractMessage> msg = Iterables.filter(getRequests(), predicate);
1938+
if (msg.iterator().hasNext()) {
1939+
break;
1940+
}
1941+
Thread.sleep(10L);
1942+
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
1943+
throw new TimeoutException(
1944+
"Timeout while waiting for requests to contain the wanted request");
1945+
}
1946+
}
1947+
}
1948+
19301949
@Override
19311950
public void addResponse(AbstractMessage response) {
19321951
throw new UnsupportedOperationException();

0 commit comments

Comments
 (0)