File tree

5 files changed

+160
-13
lines changed

5 files changed

+160
-13
lines changed
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
555555
}
556556

557557
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
558-
Statement statement, QueryMode queryMode, Options options) {
558+
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
559559
ExecuteSqlRequest.Builder builder =
560560
ExecuteSqlRequest.newBuilder()
561561
.setSql(statement.getSql())
@@ -569,9 +569,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
569569
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
570570
}
571571
}
572-
TransactionSelector selector = getTransactionSelector();
573-
if (selector != null) {
574-
builder.setTransaction(selector);
572+
if (withTransactionSelector) {
573+
TransactionSelector selector = getTransactionSelector();
574+
if (selector != null) {
575+
builder.setTransaction(selector);
576+
}
575577
}
576578
builder.setSeqno(getSeqNo());
577579
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
@@ -616,18 +618,26 @@ ResultSet executeQueryInternalWithOptions(
616618
beforeReadOrQuery();
617619
final int prefetchChunks =
618620
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
621+
final ExecuteSqlRequest.Builder request =
622+
getExecuteSqlRequestBuilder(
623+
statement, queryMode, options, /* withTransactionSelector = */ false);
619624
ResumableStreamIterator stream =
620625
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
621626
@Override
622627
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
623628
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
624-
final ExecuteSqlRequest.Builder request =
625-
getExecuteSqlRequestBuilder(statement, queryMode, options);
626629
if (partitionToken != null) {
627630
request.setPartitionToken(partitionToken);
628631
}
632+
TransactionSelector selector = null;
629633
if (resumeToken != null) {
630634
request.setResumeToken(resumeToken);
635+
selector = getTransactionSelector();
636+
} else if (!request.hasTransaction()) {
637+
selector = getTransactionSelector();
638+
}
639+
if (selector != null) {
640+
request.setTransaction(selector);
631641
}
632642
SpannerRpc.Call call =
633643
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
@@ -735,10 +745,13 @@ ResultSet readInternalWithOptions(
735745
@Override
736746
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
737747
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
748+
TransactionSelector selector = null;
738749
if (resumeToken != null) {
739750
builder.setResumeToken(resumeToken);
751+
selector = getTransactionSelector();
752+
} else if (!builder.hasTransaction()) {
753+
selector = getTransactionSelector();
740754
}
741-
TransactionSelector selector = getTransactionSelector();
742755
if (selector != null) {
743756
builder.setTransaction(selector);
744757
}
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,7 @@ protected PartialResultSet computeNext() {
10791079
backoffSleep(context, backOff);
10801080
}
10811081
}
1082+
10821083
continue;
10831084
}
10841085
span.addAnnotation("Stream broken. Not safe to retry");
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,10 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
575575
beforeReadOrQuery();
576576
final ExecuteSqlRequest.Builder builder =
577577
getExecuteSqlRequestBuilder(
578-
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
578+
statement,
579+
QueryMode.NORMAL,
580+
Options.fromUpdateOptions(options),
581+
/* withTransactionSelector = */ true);
579582
try {
580583
com.google.spanner.v1.ResultSet resultSet =
581584
rpc.executeQuery(builder.build(), session.getOptions());
@@ -599,7 +602,10 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
599602
beforeReadOrQuery();
600603
final ExecuteSqlRequest.Builder builder =
601604
getExecuteSqlRequestBuilder(
602-
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
605+
statement,
606+
QueryMode.NORMAL,
607+
Options.fromUpdateOptions(options),
608+
/* withTransactionSelector = */ true);
603609
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
604610
try {
605611
// Register the update as an async operation that must finish before the transaction may
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public void executeSqlRequestBuilderWithoutQueryOptions() {
9090
ExecuteSqlRequest request =
9191
context
9292
.getExecuteSqlRequestBuilder(
93-
Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions())
93+
Statement.of("SELECT FOO FROM BAR"),
94+
QueryMode.NORMAL,
95+
Options.fromQueryOptions(),
96+
true)
9497
.build();
9598
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
9699
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
@@ -105,7 +108,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
105108
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
106109
.build(),
107110
QueryMode.NORMAL,
108-
Options.fromQueryOptions())
111+
Options.fromQueryOptions(),
112+
true)
109113
.build();
110114
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
111115
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,130 @@ public Long run(TransactionContext transaction) throws Exception {
255255
assertThat(countTransactionsStarted()).isEqualTo(2);
256256
}
257257

258+
@Test
259+
public void testInlinedBeginFirstUpdateAborts() {
260+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
261+
long updateCount =
262+
client
263+
.readWriteTransaction()
264+
.run(
265+
new TransactionCallable<Long>() {
266+
boolean firstAttempt = true;
267+
268+
@Override
269+
public Long run(TransactionContext transaction) throws Exception {
270+
if (firstAttempt) {
271+
firstAttempt = false;
272+
mockSpanner.putStatementResult(
273+
StatementResult.exception(
274+
UPDATE_STATEMENT,
275+
mockSpanner.createAbortedException(
276+
ByteString.copyFromUtf8("some-tx"))));
277+
} else {
278+
mockSpanner.putStatementResult(
279+
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
280+
}
281+
return transaction.executeUpdate(UPDATE_STATEMENT);
282+
}
283+
});
284+
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
285+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
286+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
287+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
288+
}
289+
290+
@Test
291+
public void testInlinedBeginFirstQueryAborts() {
292+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
293+
long updateCount =
294+
client
295+
.readWriteTransaction()
296+
.run(
297+
new TransactionCallable<Long>() {
298+
boolean firstAttempt = true;
299+
300+
@Override
301+
public Long run(TransactionContext transaction) throws Exception {
302+
if (firstAttempt) {
303+
firstAttempt = false;
304+
mockSpanner.putStatementResult(
305+
StatementResult.exception(
306+
SELECT1,
307+
mockSpanner.createAbortedException(
308+
ByteString.copyFromUtf8("some-tx"))));
309+
} else {
310+
mockSpanner.putStatementResult(
311+
StatementResult.query(SELECT1, SELECT1_RESULTSET));
312+
}
313+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
314+
while (rs.next()) {
315+
return rs.getLong(0);
316+
}
317+
}
318+
return 0L;
319+
}
320+
});
321+
assertThat(updateCount).isEqualTo(1L);
322+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
323+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
324+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
325+
}
326+
327+
@Test
328+
public void testInlinedBeginFirstQueryReturnsUnavailable() {
329+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
330+
mockSpanner.setExecuteSqlExecutionTime(
331+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
332+
long value =
333+
client
334+
.readWriteTransaction()
335+
.run(
336+
new TransactionCallable<Long>() {
337+
@Override
338+
public Long run(TransactionContext transaction) throws Exception {
339+
// The first attempt will return UNAVAILABLE and retry internally.
340+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
341+
while (rs.next()) {
342+
return rs.getLong(0);
343+
}
344+
}
345+
return 0L;
346+
}
347+
});
348+
assertThat(value).isEqualTo(1L);
349+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
350+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
351+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
352+
}
353+
354+
@Test
355+
public void testInlinedBeginFirstReadReturnsUnavailable() {
356+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
357+
mockSpanner.setReadExecutionTime(
358+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
359+
long value =
360+
client
361+
.readWriteTransaction()
362+
.run(
363+
new TransactionCallable<Long>() {
364+
@Override
365+
public Long run(TransactionContext transaction) throws Exception {
366+
// The first attempt will return UNAVAILABLE and retry internally.
367+
try (ResultSet rs =
368+
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
369+
while (rs.next()) {
370+
return rs.getLong(0);
371+
}
372+
}
373+
return 0L;
374+
}
375+
});
376+
assertThat(value).isEqualTo(1L);
377+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
378+
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
379+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
380+
}
381+
258382
@Test
259383
public void testInlinedBeginTxWithQuery() {
260384
DatabaseClient client =
@@ -283,8 +407,7 @@ public Long run(TransactionContext transaction) throws Exception {
283407

284408
@Test
285409
public void testInlinedBeginTxWithRead() {
286-
DatabaseClient client =
287-
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
410+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
288411
long updateCount =
289412
client
290413
.readWriteTransaction()

0 commit comments

Comments
 (0)