Merged
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
Original file line numberDiff line numberDiff line change
Expand Up@@ -46,6 +46,7 @@
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Instant;

/** Default implementation of the Cloud Spanner interface. */
class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
Expand DownExpand Up@@ -94,8 +95,22 @@ private static String nextDatabaseClientId(DatabaseId databaseId) {
private final DatabaseAdminClient dbAdminClient;
private final InstanceAdminClient instanceClient;

/**
* Exception class used to track the stack trace at the point when a Spanner instance is closed.
* This exception will be thrown if a user tries to use any resources that were returned by this
* Spanner instance after the instance has been closed. This makes it easier to track down the
* code that (accidently) closed the Spanner instance.
*/
static final class ClosedException extends RuntimeException {
private static final long serialVersionUID = 1451131180314064914L;

ClosedException() {
super("Spanner client was closed at " + Instant.now());
}
}

@GuardedBy("this")
private boolean spannerIsClosed = false;
private ClosedException closedException;

@VisibleForTesting
SpannerImpl(SpannerRpc gapicRpc, SpannerOptions options) {
Expand DownExpand Up@@ -131,9 +146,17 @@ SessionImpl sessionWithId(String name) {
return getSessionClient(id.getDatabaseId()).sessionWithId(name);
}

void checkClosed() {
synchronized (this) {
if (closedException != null) {
throw new IllegalStateException("Cloud Spanner client has been closed", closedException);
}
}
}

SessionClient getSessionClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
checkClosed();
if (sessionClients.containsKey(db)) {
return sessionClients.get(db);
} else {
Expand DownExpand Up@@ -161,7 +184,7 @@ public InstanceAdminClient getInstanceAdminClient() {
@Override
public DatabaseClient getDatabaseClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
checkClosed();
if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) {
// Move the invalidated client to a separate list, so we can close it together with the
// other database clients when the Spanner instance is closed.
Expand DownExpand Up@@ -206,12 +229,12 @@ public void close() {
void close(long timeout, TimeUnit unit) {
List<ListenableFuture<Void>> closureFutures = null;
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
spannerIsClosed = true;
checkClosed();
closedException = new ClosedException();
closureFutures = new ArrayList<>();
invalidatedDbClients.addAll(dbClients.values());
for (DatabaseClientImpl dbClient : invalidatedDbClients) {
closureFutures.add(dbClient.closeAsync());
closureFutures.add(dbClient.closeAsync(closedException));
}
dbClients.clear();
}
Expand All@@ -234,7 +257,9 @@ void close(long timeout, TimeUnit unit) {

@Override
public boolean isClosed() {
return spannerIsClosed;
synchronized (this) {
return closedException != null;
}
}

/** Helper class for gRPC calls that can return paginated results. */
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -159,18 +159,18 @@ public void run() {

@Test
public void closeQuicklyDoesNotBlockIndefinitely() throws Exception {
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}

@Test
public void closeAfterInitialCreateDoesNotBlockIndefinitely() throws Exception {
pool.getReadSession().close();
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}

@Test
public void closeWhenSessionsActiveFinishes() throws Exception {
Session session = pool.getReadSession();
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}
}
Original file line numberDiff line numberDiff line change
Expand Up@@ -322,7 +322,7 @@ public void run() {
assertThat(maxAliveSessions).isAtMost(maxSessions);
}
stopMaintenance.set(true);
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
Exception e = getFailedError();
if (e != null) {
throw e;
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -42,6 +42,7 @@
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand All@@ -58,6 +59,8 @@
import com.google.spanner.v1.RollbackRequest;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.MetricRegistry;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand DownExpand Up@@ -165,6 +168,26 @@ public void run() {
Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class));
}

@Test
public void testClosedPoolIncludesClosedException() {
pool = createPool();
assertThat(pool.isValid()).isTrue();
closePoolWithStacktrace();
try {
pool.getReadSession();
fail("missing expected exception");
} catch (IllegalStateException e) {
assertThat(e.getCause()).isInstanceOf(ClosedException.class);
StringWriter sw = new StringWriter();
e.getCause().printStackTrace(new PrintWriter(sw));
assertThat(sw.toString()).contains("closePoolWithStacktrace");
}
}

private void closePoolWithStacktrace() {
pool.closeAsync(new SpannerImpl.ClosedException());
}

@Test
public void sessionCreation() {
setupMockSessionCreation();
Expand DownExpand Up@@ -203,7 +226,7 @@ public void poolLifo() {
public void poolClosure() throws Exception {
setupMockSessionCreation();
pool = createPool();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
Expand DownExpand Up@@ -237,7 +260,7 @@ public void run() {
// Clear the exception to suppress logging of expected exceptions.
Session.clearException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
}
Expand All@@ -260,7 +283,7 @@ public void run() {
}
})
.start();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
stop.set(true);
}

Expand DownExpand Up@@ -316,7 +339,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
Expand DownExpand Up@@ -374,7 +397,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
Expand DownExpand Up@@ -411,7 +434,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
ListenableFuture<Void> f = pool.closeAsync();
ListenableFuture<Void> f = pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
f.get();
assertThat(f.isDone()).isTrue();
Expand DownExpand Up@@ -456,7 +479,7 @@ public Session answer(InvocationOnMock invocation) throws Throwable {
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insidePrepare.await();
ListenableFuture<Void> f = pool.closeAsync();
ListenableFuture<Void> f = pool.closeAsync(new SpannerImpl.ClosedException());
releasePrepare.countDown();
f.get();
assertThat(f.isDone()).isTrue();
Expand DownExpand Up@@ -487,7 +510,7 @@ public void run() {
PooledSession Session = pool.getReadSession();
// Suppress expected Session warning.
Session.clearException();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
expectedException.expect(IllegalStateException.class);
pool.getReadSession();
}
Expand DownExpand Up@@ -925,7 +948,7 @@ public void run() {
runMaintainanceLoop(clock, pool, cycles);
// We will still close 2 sessions since at any point in time only 1 session was in use.
assertThat(pool.numIdleSessionsRemoved()).isEqualTo(2L);
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
Expand DownExpand Up@@ -976,7 +999,7 @@ public void run() {
// The session pool only keeps MinSessions + MaxIdleSessions alive.
verify(session, times(options.getMinSessions() + options.getMaxIdleSessions()))
.singleUse(any(TimestampBound.class));
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
Expand DownExpand Up@@ -1061,7 +1084,7 @@ public void run() {
assertThat(pool.getNumberOfAvailableWritePreparedSessions())
.isEqualTo((int) Math.ceil(options.getMinSessions() * options.getWriteSessionsFraction()));

pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

private void waitForExpectedSessionPool(int expectedSessions, float writeFraction)
Expand DownExpand Up@@ -1447,7 +1470,7 @@ public Integer run(TransactionContext transaction) throws Exception {
.isTrue();
}
}
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
}
}
}
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -27,8 +27,11 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceRpc;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand DownExpand Up@@ -222,13 +225,36 @@ public void testClientId() {

// Get a database client for the same database as the first database. As this goes through a
// different Spanner instance with potentially different options, it will get a different
// client
// id.
// client id.
DatabaseClientImpl databaseClient3 = (DatabaseClientImpl) spanner.getDatabaseClient(db);
assertThat(databaseClient3.clientId).isEqualTo("client-2");
}
}

@Test
public void testClosedException() {
Spanner spanner = new SpannerImpl(rpc, spannerOptions);
assertThat(spanner.isClosed()).isFalse();
// Close the Spanner instance in a different method so we can actually verify that the entire
// stacktrace of the method that closed the instance is included in the exception that will be
// thrown by the instance after it has been closed.
closeSpannerAndIncludeStacktrace(spanner);
assertThat(spanner.isClosed()).isTrue();
try {
spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
fail("missing expected exception");
} catch (IllegalStateException e) {
assertThat(e.getCause()).isInstanceOf(ClosedException.class);
StringWriter sw = new StringWriter();
e.getCause().printStackTrace(new PrintWriter(sw));
assertThat(sw.toString()).contains("closeSpannerAndIncludeStacktrace");
}
}

private void closeSpannerAndIncludeStacktrace(Spanner spanner) {
spanner.close();
}

private SpannerOptions createSpannerOptions() {
return SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
Expand Down