|
26 | 26 | import com.google.api.gax.core.GaxProperties;
|
27 | 27 | import com.google.api.gax.grpc.GaxGrpcProperties;
|
28 | 28 | import com.google.api.gax.grpc.GrpcCallContext;
|
| 29 | +import com.google.api.gax.grpc.GrpcCallSettings; |
| 30 | +import com.google.api.gax.grpc.GrpcStubCallableFactory; |
29 | 31 | import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
|
30 | 32 | import com.google.api.gax.longrunning.OperationFuture;
|
31 | 33 | import com.google.api.gax.retrying.ResultRetryAlgorithm;
|
|
35 | 37 | import com.google.api.gax.rpc.ApiCallContext;
|
36 | 38 | import com.google.api.gax.rpc.ApiClientHeaderProvider;
|
37 | 39 | import com.google.api.gax.rpc.ApiException;
|
| 40 | +import com.google.api.gax.rpc.ClientContext; |
38 | 41 | import com.google.api.gax.rpc.FixedHeaderProvider;
|
39 | 42 | import com.google.api.gax.rpc.HeaderProvider;
|
40 | 43 | import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
|
|
44 | 47 | import com.google.api.gax.rpc.StatusCode;
|
45 | 48 | import com.google.api.gax.rpc.StreamController;
|
46 | 49 | import com.google.api.gax.rpc.TransportChannelProvider;
|
| 50 | +import com.google.api.gax.rpc.UnaryCallSettings; |
| 51 | +import com.google.api.gax.rpc.UnaryCallable; |
47 | 52 | import com.google.api.gax.rpc.UnavailableException;
|
48 | 53 | import com.google.api.gax.rpc.WatchdogProvider;
|
49 | 54 | import com.google.api.pathtemplate.PathTemplate;
|
|
59 | 64 | import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider;
|
60 | 65 | import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
|
61 | 66 | import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
|
| 67 | +import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory; |
62 | 68 | import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
|
63 | 69 | import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
|
64 | 70 | import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
|
|
72 | 78 | import com.google.common.base.Preconditions;
|
73 | 79 | import com.google.common.collect.ImmutableList;
|
74 | 80 | import com.google.common.collect.ImmutableMap;
|
| 81 | +import com.google.common.collect.ImmutableSet; |
75 | 82 | import com.google.common.util.concurrent.RateLimiter;
|
76 | 83 | import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
77 | 84 | import com.google.iam.v1.GetIamPolicyRequest;
|
|
157 | 164 | import java.util.LinkedList;
|
158 | 165 | import java.util.List;
|
159 | 166 | import java.util.Map;
|
| 167 | +import java.util.Set; |
160 | 168 | import java.util.concurrent.Callable;
|
161 | 169 | import java.util.concurrent.CancellationException;
|
162 | 170 | import java.util.concurrent.ConcurrentHashMap;
|
@@ -443,7 +451,45 @@ public GapicSpannerRpc(final SpannerOptions options) {
|
443 | 451 | .setCredentialsProvider(credentialsProvider)
|
444 | 452 | .setStreamWatchdogProvider(watchdogProvider)
|
445 | 453 | .build();
|
446 |
| -this.databaseAdminStub = GrpcDatabaseAdminStub.create(this.databaseAdminStubSettings); |
| 454 | + |
| 455 | +// Automatically retry RESOURCE_EXHAUSTED for GetOperation if auto-throttling of |
| 456 | +// administrative requests has been set. The GetOperation RPC is called repeatedly by gax |
| 457 | +// while polling long-running operations for their progress and can also cause these errors. |
| 458 | +// The default behavior is not to retry these errors, and this option should normally only be |
| 459 | +// enabled for (integration) testing. |
| 460 | +if (options.isAutoThrottleAdministrativeRequests()) { |
| 461 | +GrpcStubCallableFactory factory = |
| 462 | +new GrpcDatabaseAdminCallableFactory() { |
| 463 | +@Override |
| 464 | +public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCallable( |
| 465 | +GrpcCallSettings<RequestT, ResponseT> grpcCallSettings, |
| 466 | +UnaryCallSettings<RequestT, ResponseT> callSettings, |
| 467 | +ClientContext clientContext) { |
| 468 | +// Make GetOperation retry on RESOURCE_EXHAUSTED to prevent polling operations from |
| 469 | +// failing with an Administrative requests limit exceeded error. |
| 470 | +if (grpcCallSettings |
| 471 | +.getMethodDescriptor() |
| 472 | +.getFullMethodName() |
| 473 | +.equals("google.longrunning.Operations/GetOperation")) { |
| 474 | +Set<StatusCode.Code> codes = |
| 475 | +ImmutableSet.<StatusCode.Code>builderWithExpectedSize( |
| 476 | +callSettings.getRetryableCodes().size() + 1) |
| 477 | +.addAll(callSettings.getRetryableCodes()) |
| 478 | +.add(StatusCode.Code.RESOURCE_EXHAUSTED) |
| 479 | +.build(); |
| 480 | +callSettings = callSettings.toBuilder().setRetryableCodes(codes).build(); |
| 481 | +} |
| 482 | +return super.createUnaryCallable(grpcCallSettings, callSettings, clientContext); |
| 483 | +} |
| 484 | +}; |
| 485 | +this.databaseAdminStub = |
| 486 | +new GrpcDatabaseAdminStubWithCustomCallableFactory( |
| 487 | +databaseAdminStubSettings, |
| 488 | +ClientContext.create(databaseAdminStubSettings), |
| 489 | +factory); |
| 490 | +} else { |
| 491 | +this.databaseAdminStub = GrpcDatabaseAdminStub.create(databaseAdminStubSettings); |
| 492 | +} |
447 | 493 |
|
448 | 494 | // Check whether the SPANNER_EMULATOR_HOST env var has been set, and if so, if the emulator is
|
449 | 495 | // actually running.
|
@@ -504,9 +550,9 @@ private static void checkEmulatorConnection(
|
504 | 550 |
|
505 | 551 | private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
|
506 | 552 | RetrySettings.newBuilder()
|
507 |
| -.setInitialRetryDelay(Duration.ofSeconds(2L)) |
508 |
| -.setRetryDelayMultiplier(1.5) |
509 |
| -.setMaxRetryDelay(Duration.ofSeconds(15L)) |
| 553 | +.setInitialRetryDelay(Duration.ofSeconds(5L)) |
| 554 | +.setRetryDelayMultiplier(2.0) |
| 555 | +.setMaxRetryDelay(Duration.ofSeconds(60L)) |
510 | 556 | .setMaxAttempts(10)
|
511 | 557 | .build();
|
512 | 558 |
|
@@ -1021,6 +1067,11 @@ public OperationFuture<Empty, UpdateDatabaseDdlMetadata> call() throws Exception
|
1021 | 1067 | throw newSpannerException(e);
|
1022 | 1068 | } catch (ExecutionException e) {
|
1023 | 1069 | Throwable t = e.getCause();
|
| 1070 | +SpannerException se = SpannerExceptionFactory.asSpannerException(t); |
| 1071 | +if (se instanceof AdminRequestsPerMinuteExceededException) { |
| 1072 | +// Propagate this to trigger a retry. |
| 1073 | +throw se; |
| 1074 | +} |
1024 | 1075 | if (t instanceof AlreadyExistsException) {
|
1025 | 1076 | String operationName =
|
1026 | 1077 | OPERATION_NAME_TEMPLATE.instantiate(
|
|
0 commit comments