File tree

5 files changed

+161
-15
lines changed

5 files changed

+161
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -535,8 +535,6 @@ private ResultSet executeQueryInternal(
535535
* <li>Specific {@link QueryOptions} passed in for this query.
536536
* <li>Any value specified in a valid environment variable when the {@link SpannerOptions}
537537
* instance was created.
538-
* <li>The default {@link SpannerOptions#getDefaultQueryOptions()} specified for the database
539-
* where the query is executed.
540538
* </ol>
541539
*/
542540
@VisibleForTesting
@@ -554,7 +552,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
554552
return builder.build();
555553
}
556554

557-
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
555+
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
556+
Statement statement, QueryMode queryMode, boolean withTransactionSelector) {
558557
ExecuteSqlRequest.Builder builder =
559558
ExecuteSqlRequest.newBuilder()
560559
.setSql(statement.getSql())
@@ -568,9 +567,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
568567
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
569568
}
570569
}
571-
TransactionSelector selector = getTransactionSelector();
572-
if (selector != null) {
573-
builder.setTransaction(selector);
570+
if (withTransactionSelector) {
571+
TransactionSelector selector = getTransactionSelector();
572+
if (selector != null) {
573+
builder.setTransaction(selector);
574+
}
574575
}
575576
builder.setSeqno(getSeqNo());
576577
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
@@ -614,18 +615,26 @@ ResultSet executeQueryInternalWithOptions(
614615
beforeReadOrQuery();
615616
final int prefetchChunks =
616617
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
618+
final ExecuteSqlRequest.Builder request =
619+
getExecuteSqlRequestBuilder(
620+
statement, queryMode, /* withTransactionSelector = */ false);
617621
ResumableStreamIterator stream =
618622
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
619623
@Override
620624
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
621625
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
622-
final ExecuteSqlRequest.Builder request =
623-
getExecuteSqlRequestBuilder(statement, queryMode);
624626
if (partitionToken != null) {
625627
request.setPartitionToken(partitionToken);
626628
}
629+
TransactionSelector selector = null;
627630
if (resumeToken != null) {
628631
request.setResumeToken(resumeToken);
632+
selector = getTransactionSelector();
633+
} else if (!request.hasTransaction()) {
634+
selector = getTransactionSelector();
635+
}
636+
if (selector != null) {
637+
request.setTransaction(selector);
629638
}
630639
SpannerRpc.Call call =
631640
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
@@ -733,10 +742,13 @@ ResultSet readInternalWithOptions(
733742
@Override
734743
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
735744
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
745+
TransactionSelector selector = null;
736746
if (resumeToken != null) {
737747
builder.setResumeToken(resumeToken);
748+
selector = getTransactionSelector();
749+
} else if (!builder.hasTransaction()) {
750+
selector = getTransactionSelector();
738751
}
739-
TransactionSelector selector = getTransactionSelector();
740752
if (selector != null) {
741753
builder.setTransaction(selector);
742754
}
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,7 @@ protected PartialResultSet computeNext() {
10711071
backoffSleep(context, backOff);
10721072
}
10731073
}
1074+
10741075
continue;
10751076
}
10761077
span.addAnnotation("Stream broken. Not safe to retry");
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,10 @@ public void buffer(Iterable<Mutation> mutations) {
515515
public long executeUpdate(Statement statement) {
516516
beforeReadOrQuery();
517517
final ExecuteSqlRequest.Builder builder =
518-
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
518+
getExecuteSqlRequestBuilder(
519+
statement,
520+
QueryMode.NORMAL,
521+
/* withTransactionSelector = */ true);
519522
try {
520523
com.google.spanner.v1.ResultSet resultSet =
521524
rpc.executeQuery(builder.build(), session.getOptions());
@@ -538,7 +541,10 @@ public long executeUpdate(Statement statement) {
538541
public ApiFuture<Long> executeUpdateAsync(Statement statement) {
539542
beforeReadOrQuery();
540543
final ExecuteSqlRequest.Builder builder =
541-
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
544+
getExecuteSqlRequestBuilder(
545+
statement,
546+
QueryMode.NORMAL,
547+
/* withTransactionSelector = */ true);
542548
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
543549
try {
544550
// Register the update as an async operation that must finish before the transaction may
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@ public void setup() {
8989
public void executeSqlRequestBuilderWithoutQueryOptions() {
9090
ExecuteSqlRequest request =
9191
context
92-
.getExecuteSqlRequestBuilder(Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL)
92+
.getExecuteSqlRequestBuilder(
93+
Statement.of("SELECT FOO FROM BAR"),
94+
QueryMode.NORMAL,
95+
true)
9396
.build();
9497
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
9598
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
@@ -103,7 +106,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
103106
Statement.newBuilder("SELECT FOO FROM BAR")
104107
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
105108
.build(),
106-
QueryMode.NORMAL)
109+
QueryMode.NORMAL,
110+
true)
107111
.build();
108112
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
109113
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,130 @@ public Long run(TransactionContext transaction) throws Exception {
251251
assertThat(countTransactionsStarted()).isEqualTo(2);
252252
}
253253

254+
@Test
255+
public void testInlinedBeginFirstUpdateAborts() {
256+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
257+
long updateCount =
258+
client
259+
.readWriteTransaction()
260+
.run(
261+
new TransactionCallable<Long>() {
262+
boolean firstAttempt = true;
263+
264+
@Override
265+
public Long run(TransactionContext transaction) throws Exception {
266+
if (firstAttempt) {
267+
firstAttempt = false;
268+
mockSpanner.putStatementResult(
269+
StatementResult.exception(
270+
UPDATE_STATEMENT,
271+
mockSpanner.createAbortedException(
272+
ByteString.copyFromUtf8("some-tx"))));
273+
} else {
274+
mockSpanner.putStatementResult(
275+
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
276+
}
277+
return transaction.executeUpdate(UPDATE_STATEMENT);
278+
}
279+
});
280+
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
281+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
282+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
283+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
284+
}
285+
286+
@Test
287+
public void testInlinedBeginFirstQueryAborts() {
288+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
289+
long updateCount =
290+
client
291+
.readWriteTransaction()
292+
.run(
293+
new TransactionCallable<Long>() {
294+
boolean firstAttempt = true;
295+
296+
@Override
297+
public Long run(TransactionContext transaction) throws Exception {
298+
if (firstAttempt) {
299+
firstAttempt = false;
300+
mockSpanner.putStatementResult(
301+
StatementResult.exception(
302+
SELECT1,
303+
mockSpanner.createAbortedException(
304+
ByteString.copyFromUtf8("some-tx"))));
305+
} else {
306+
mockSpanner.putStatementResult(
307+
StatementResult.query(SELECT1, SELECT1_RESULTSET));
308+
}
309+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
310+
while (rs.next()) {
311+
return rs.getLong(0);
312+
}
313+
}
314+
return 0L;
315+
}
316+
});
317+
assertThat(updateCount).isEqualTo(1L);
318+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
319+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
320+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
321+
}
322+
323+
@Test
324+
public void testInlinedBeginFirstQueryReturnsUnavailable() {
325+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
326+
mockSpanner.setExecuteSqlExecutionTime(
327+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
328+
long value =
329+
client
330+
.readWriteTransaction()
331+
.run(
332+
new TransactionCallable<Long>() {
333+
@Override
334+
public Long run(TransactionContext transaction) throws Exception {
335+
// The first attempt will return UNAVAILABLE and retry internally.
336+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
337+
while (rs.next()) {
338+
return rs.getLong(0);
339+
}
340+
}
341+
return 0L;
342+
}
343+
});
344+
assertThat(value).isEqualTo(1L);
345+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
346+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
347+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
348+
}
349+
350+
@Test
351+
public void testInlinedBeginFirstReadReturnsUnavailable() {
352+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
353+
mockSpanner.setReadExecutionTime(
354+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
355+
long value =
356+
client
357+
.readWriteTransaction()
358+
.run(
359+
new TransactionCallable<Long>() {
360+
@Override
361+
public Long run(TransactionContext transaction) throws Exception {
362+
// The first attempt will return UNAVAILABLE and retry internally.
363+
try (ResultSet rs =
364+
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
365+
while (rs.next()) {
366+
return rs.getLong(0);
367+
}
368+
}
369+
return 0L;
370+
}
371+
});
372+
assertThat(value).isEqualTo(1L);
373+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
374+
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
375+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
376+
}
377+
254378
@Test
255379
public void testInlinedBeginTxWithQuery() {
256380
DatabaseClient client =
@@ -279,8 +403,7 @@ public Long run(TransactionContext transaction) throws Exception {
279403

280404
@Test
281405
public void testInlinedBeginTxWithRead() {
282-
DatabaseClient client =
283-
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
406+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
284407
long updateCount =
285408
client
286409
.readWriteTransaction()

0 commit comments

Comments
 (0)