12 files changed

+760
-27
lines changed
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
4848
import com.google.spanner.v1.PartialResultSet;
4949
import com.google.spanner.v1.ReadRequest;
50+
import com.google.spanner.v1.RequestOptions;
5051
import com.google.spanner.v1.Transaction;
5152
import com.google.spanner.v1.TransactionOptions;
5253
import com.google.spanner.v1.TransactionSelector;
@@ -557,6 +558,14 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
557558
return builder.build();
558559
}
559560

561+
RequestOptions buildRequestOptions(Options options) {
562+
RequestOptions.Builder builder = RequestOptions.newBuilder();
563+
if (options.hasPriority()) {
564+
builder.setPriority(options.priority());
565+
}
566+
return builder.build();
567+
}
568+
560569
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
561570
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
562571
ExecuteSqlRequest.Builder builder =
@@ -580,6 +589,7 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
580589
}
581590
builder.setSeqno(getSeqNo());
582591
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
592+
builder.setRequestOptions(buildRequestOptions(options));
583593
return builder;
584594
}
585595

@@ -610,6 +620,7 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
610620
builder.setTransaction(selector);
611621
}
612622
builder.setSeqno(getSeqNo());
623+
builder.setRequestOptions(buildRequestOptions(options));
613624
return builder;
614625
}
615626

@@ -760,6 +771,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
760771
if (selector != null) {
761772
builder.setTransaction(selector);
762773
}
774+
builder.setRequestOptions(buildRequestOptions(readOptions));
763775
SpannerRpc.Call call =
764776
rpc.read(builder.build(), stream.consumer(), session.getOptions());
765777
call.request(prefetchChunks);
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.cloud.Timestamp;
20+
import com.google.cloud.spanner.Options.RpcPriority;
2021
import com.google.cloud.spanner.Options.TransactionOption;
2122
import com.google.cloud.spanner.Options.UpdateOption;
2223

@@ -75,9 +76,21 @@ public interface DatabaseClient {
7576
* .set("LastName")
7677
* .to("Joel")
7778
* .build();
78-
* dbClient.writeWithOptions(Collections.singletonList(mutation));
79+
* dbClient.writeWithOptions(
80+
* Collections.singletonList(mutation),
81+
* Options.priority(RpcPriority.HIGH));
7982
* }</pre>
8083
*
84+
* Options for a transaction can include:
85+
*
86+
* <ul>
87+
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
88+
* RpcPriority} to use for the commit request of the transaction. The priority will not be
89+
* applied to any other requests on the transaction.
90+
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
91+
* {@link CommitResponse}.
92+
* </ul>
93+
*
8194
* @return a response with the timestamp at which the write was committed
8295
*/
8396
CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
@@ -138,9 +151,21 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
138151
* .set("LastName")
139152
* .to("Joel")
140153
* .build();
141-
* dbClient.writeAtLeastOnce(Collections.singletonList(mutation));
154+
* dbClient.writeAtLeastOnceWithOptions(
155+
* Collections.singletonList(mutation),
156+
* Options.priority(RpcPriority.LOW));
142157
* }</pre>
143158
*
159+
* Options for a transaction can include:
160+
*
161+
* <ul>
162+
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
163+
* RpcPriority} to use for the commit request of the transaction. The priority will not be
164+
* applied to any other requests on the transaction.
165+
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
166+
* {@link CommitResponse}.
167+
* </ul>
168+
*
144169
* @return a response with the timestamp at which the write was committed
145170
*/
146171
CommitResponse writeAtLeastOnceWithOptions(
@@ -308,6 +333,16 @@ CommitResponse writeAtLeastOnceWithOptions(
308333
* }
309334
* });
310335
* </code></pre>
336+
*
337+
* Options for a transaction can include:
338+
*
339+
* <ul>
340+
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
341+
* RpcPriority} to use for the commit request of the transaction. The priority will not be
342+
* applied to any other requests on the transaction.
343+
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
344+
* {@link CommitResponse}.
345+
* </ul>
311346
*/
312347
TransactionRunner readWriteTransaction(TransactionOption... options);
313348

@@ -338,6 +373,16 @@ CommitResponse writeAtLeastOnceWithOptions(
338373
* }
339374
* }
340375
* }</pre>
376+
*
377+
* Options for a transaction can include:
378+
*
379+
* <ul>
380+
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
381+
* RpcPriority} to use for the commit request of the transaction. The priority will not be
382+
* applied to any other requests on the transaction.
383+
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
384+
* {@link CommitResponse}.
385+
* </ul>
341386
*/
342387
TransactionManager transactionManager(TransactionOption... options);
343388

@@ -371,6 +416,16 @@ CommitResponse writeAtLeastOnceWithOptions(
371416
* },
372417
* executor);
373418
* </code></pre>
419+
*
420+
* Options for a transaction can include:
421+
*
422+
* <ul>
423+
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
424+
* RpcPriority} to use for the commit request of the transaction. The priority will not be
425+
* applied to any other requests on the transaction.
426+
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
427+
* {@link CommitResponse}.
428+
* </ul>
374429
*/
375430
AsyncRunner runAsync(TransactionOption... options);
376431

@@ -459,6 +514,18 @@ CommitResponse writeAtLeastOnceWithOptions(
459514
* }
460515
* }
461516
* }</pre>
517+
*
518+
* Options for a transaction can include:
519+
*
520+
* <p>Options for a transaction can include:
521+
*
522+
* <ul>
523+
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
524+
* RpcPriority} to use for the commit request of the transaction. The priority will not be
525+
* applied to any other requests on the transaction.
526+
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
527+
* {@link CommitResponse}.
528+
* </ul>
462529
*/
463530
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);
464531

Original file line numberDiff line numberDiff line change
@@ -17,13 +17,30 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.common.base.Preconditions;
20+
import com.google.spanner.v1.RequestOptions.Priority;
2021
import java.io.Serializable;
2122
import java.util.Objects;
2223

2324
/** Specifies options for various spanner operations */
2425
public final class Options implements Serializable {
2526
private static final long serialVersionUID = 8067099123096783941L;
2627

28+
/**
29+
* Priority for an RPC invocation. The default priority is {@link #HIGH}. This enum can be used to
30+
* set a lower priority for a specific RPC invocation.
31+
*/
32+
public enum RpcPriority {
33+
LOW(Priority.PRIORITY_LOW),
34+
MEDIUM(Priority.PRIORITY_MEDIUM),
35+
HIGH(Priority.PRIORITY_HIGH);
36+
37+
private final Priority proto;
38+
39+
private RpcPriority(Priority proto) {
40+
this.proto = Preconditions.checkNotNull(proto);
41+
}
42+
}
43+
2744
/** Marker interface to mark options applicable to both Read and Query operations */
2845
public interface ReadAndQueryOption extends ReadOption, QueryOption {}
2946

@@ -79,6 +96,11 @@ public static ReadAndQueryOption bufferRows(int bufferRows) {
7996
return new BufferRowsOption(bufferRows);
8097
}
8198

99+
/** Specifies the priority to use for the RPC. */
100+
public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
101+
return new PriorityOption(priority);
102+
}
103+
82104
/**
83105
* Specifying this will cause the list operations to fetch at most this many records in a page.
84106
*/
@@ -158,13 +180,28 @@ void appendToOptions(Options options) {
158180
}
159181
}
160182

183+
static final class PriorityOption extends InternalOption
184+
implements ReadQueryUpdateTransactionOption {
185+
private final RpcPriority priority;
186+
187+
PriorityOption(RpcPriority priority) {
188+
this.priority = priority;
189+
}
190+
191+
@Override
192+
void appendToOptions(Options options) {
193+
options.priority = priority;
194+
}
195+
}
196+
161197
private boolean withCommitStats;
162198
private Long limit;
163199
private Integer prefetchChunks;
164200
private Integer bufferRows;
165201
private Integer pageSize;
166202
private String pageToken;
167203
private String filter;
204+
private RpcPriority priority;
168205

169206
// Construction is via factory methods below.
170207
private Options() {}
@@ -221,6 +258,14 @@ String filter() {
221258
return filter;
222259
}
223260

261+
boolean hasPriority() {
262+
return priority != null;
263+
}
264+
265+
Priority priority() {
266+
return priority == null ? null : priority.proto;
267+
}
268+
224269
@Override
225270
public String toString() {
226271
StringBuilder b = new StringBuilder();
@@ -242,6 +287,9 @@ public String toString() {
242287
if (filter != null) {
243288
b.append("filter: ").append(filter).append(' ');
244289
}
290+
if (priority != null) {
291+
b.append("priority: ").append(priority).append(' ');
292+
}
245293
return b.toString();
246294
}
247295

@@ -271,7 +319,8 @@ public boolean equals(Object o) {
271319
&& (!hasPageSize() && !that.hasPageSize()
272320
|| hasPageSize() && that.hasPageSize() && Objects.equals(pageSize(), that.pageSize()))
273321
&& Objects.equals(pageToken(), that.pageToken())
274-
&& Objects.equals(filter(), that.filter());
322+
&& Objects.equals(filter(), that.filter())
323+
&& Objects.equals(priority(), that.priority());
275324
}
276325

277326
@Override
@@ -298,6 +347,9 @@ public int hashCode() {
298347
if (filter != null) {
299348
result = 31 * result + filter.hashCode();
300349
}
350+
if (priority != null) {
351+
result = 31 * result + priority.hashCode();
352+
}
301353
return result;
302354
}
303355

Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
import com.google.api.gax.rpc.UnavailableException;
2727
import com.google.cloud.spanner.Options.UpdateOption;
2828
import com.google.cloud.spanner.spi.v1.SpannerRpc;
29+
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.base.Stopwatch;
3031
import com.google.common.base.Ticker;
3132
import com.google.protobuf.ByteString;
3233
import com.google.spanner.v1.BeginTransactionRequest;
3334
import com.google.spanner.v1.ExecuteSqlRequest;
3435
import com.google.spanner.v1.PartialResultSet;
36+
import com.google.spanner.v1.RequestOptions;
3537
import com.google.spanner.v1.Transaction;
3638
import com.google.spanner.v1.TransactionOptions;
3739
import com.google.spanner.v1.TransactionSelector;
@@ -162,8 +164,8 @@ private ExecuteSqlRequest resumeOrRestartRequest(
162164
}
163165
}
164166

165-
private ExecuteSqlRequest newTransactionRequestFrom(
166-
final Statement statement, final Options options) {
167+
@VisibleForTesting
168+
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
167169
ByteString transactionId = initTransaction();
168170

169171
final TransactionSelector transactionSelector =
@@ -179,6 +181,11 @@ private ExecuteSqlRequest newTransactionRequestFrom(
179181

180182
builder.setResumeToken(ByteString.EMPTY);
181183

184+
if (options.hasPriority()) {
185+
builder.setRequestOptions(
186+
RequestOptions.newBuilder().setPriority(options.priority()).build());
187+
}
188+
182189
return builder.build();
183190
}
184191

Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.protobuf.Empty;
3838
import com.google.spanner.v1.BeginTransactionRequest;
3939
import com.google.spanner.v1.CommitRequest;
40+
import com.google.spanner.v1.RequestOptions;
4041
import com.google.spanner.v1.Transaction;
4142
import com.google.spanner.v1.TransactionOptions;
4243
import io.opencensus.common.Scope;
@@ -160,22 +161,26 @@ public CommitResponse writeAtLeastOnceWithOptions(
160161
Iterable<Mutation> mutations, TransactionOption... transactionOptions)
161162
throws SpannerException {
162163
setActive(null);
164+
Options commitRequestOptions = Options.fromTransactionOptions(transactionOptions);
163165
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
164166
Mutation.toProto(mutations, mutationsProto);
165-
final CommitRequest request =
167+
final CommitRequest.Builder requestBuilder =
166168
CommitRequest.newBuilder()
167169
.setSession(name)
168170
.setReturnCommitStats(
169171
Options.fromTransactionOptions(transactionOptions).withCommitStats())
170172
.addAllMutations(mutationsProto)
171173
.setSingleUseTransaction(
172174
TransactionOptions.newBuilder()
173-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
174-
.build();
175+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
176+
if (commitRequestOptions.hasPriority()) {
177+
requestBuilder.setRequestOptions(
178+
RequestOptions.newBuilder().setPriority(commitRequestOptions.priority()).build());
179+
}
175180
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
176181
try (Scope s = tracer.withSpan(span)) {
177182
com.google.spanner.v1.CommitResponse response =
178-
spanner.getRpc().commit(request, this.options);
183+
spanner.getRpc().commit(requestBuilder.build(), this.options);
179184
return new CommitResponse(response);
180185
} catch (RuntimeException e) {
181186
TraceUtil.setWithFailure(span, e);
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.google.spanner.v1.ExecuteBatchDmlResponse;
4444
import com.google.spanner.v1.ExecuteSqlRequest;
4545
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
46+
import com.google.spanner.v1.RequestOptions;
4647
import com.google.spanner.v1.ResultSet;
4748
import com.google.spanner.v1.RollbackRequest;
4849
import com.google.spanner.v1.Transaction;
@@ -298,6 +299,10 @@ ApiFuture<CommitResponse> commitAsync() {
298299
CommitRequest.newBuilder()
299300
.setSession(session.getName())
300301
.setReturnCommitStats(options.withCommitStats());
302+
if (options.hasPriority()) {
303+
builder.setRequestOptions(
304+
RequestOptions.newBuilder().setPriority(options.priority()).build());
305+
}
301306
synchronized (lock) {
302307
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
303308
finishOps = SettableApiFuture.create();
@@ -344,6 +349,10 @@ public void run() {
344349
requestBuilder.setTransactionId(
345350
transactionId == null ? transactionIdFuture.get() : transactionId);
346351
}
352+
if (options.hasPriority()) {
353+
requestBuilder.setRequestOptions(
354+
RequestOptions.newBuilder().setPriority(options.priority()).build());
355+
}
347356
final CommitRequest commitRequest = requestBuilder.build();
348357
span.addAnnotation("Starting Commit");
349358
final Span opSpan =

0 commit comments

Comments
 (0)