@@ -1595,6 +1595,7 @@ final class PoolMaintainer {
|
1595 | 1595 | Instant lastResetTime = Instant.ofEpochMilli(0);
|
1596 | 1596 | int numSessionsToClose = 0;
|
1597 | 1597 | int sessionsToClosePerLoop = 0;
|
| 1598 | +boolean closed = false; |
1598 | 1599 |
|
1599 | 1600 | @GuardedBy("lock")
|
1600 | 1601 | ScheduledFuture<?> scheduledFuture;
|
@@ -1621,17 +1622,26 @@ public void run() {
|
1621 | 1622 |
|
1622 | 1623 | void close() {
|
1623 | 1624 | synchronized (lock) {
|
1624 |
| -scheduledFuture.cancel(false); |
1625 |
| -if (!running) { |
1626 |
| -decrementPendingClosures(1); |
| 1625 | +if (!closed) { |
| 1626 | +closed = true; |
| 1627 | +scheduledFuture.cancel(false); |
| 1628 | +if (!running) { |
| 1629 | +decrementPendingClosures(1); |
| 1630 | +} |
1627 | 1631 | }
|
1628 | 1632 | }
|
1629 | 1633 | }
|
1630 | 1634 |
|
| 1635 | +boolean isClosed() { |
| 1636 | +synchronized (lock) { |
| 1637 | +return closed; |
| 1638 | +} |
| 1639 | +} |
| 1640 | + |
1631 | 1641 | // Does various pool maintenance activities.
|
1632 | 1642 | void maintainPool() {
|
1633 | 1643 | synchronized (lock) {
|
1634 |
| -if (isClosed()) { |
| 1644 | +if (SessionPool.this.isClosed()) { |
1635 | 1645 | return;
|
1636 | 1646 | }
|
1637 | 1647 | running = true;
|
@@ -1643,7 +1653,7 @@ void maintainPool() {
|
1643 | 1653 | replenishPool();
|
1644 | 1654 | synchronized (lock) {
|
1645 | 1655 | running = false;
|
1646 |
| -if (isClosed()) { |
| 1656 | +if (SessionPool.this.isClosed()) { |
1647 | 1657 | decrementPendingClosures(1);
|
1648 | 1658 | }
|
1649 | 1659 | }
|
@@ -2126,6 +2136,7 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
|
2126 | 2136 | }
|
2127 | 2137 | if (isDatabaseOrInstanceNotFound(e)) {
|
2128 | 2138 | setResourceNotFoundException((ResourceNotFoundException) e);
|
| 2139 | +poolMaintainer.close(); |
2129 | 2140 | }
|
2130 | 2141 | }
|
2131 | 2142 | }
|
@@ -2161,10 +2172,14 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
|
2161 | 2172 | }
|
2162 | 2173 | closureFuture = SettableFuture.create();
|
2163 | 2174 | retFuture = closureFuture;
|
2164 |
| -pendingClosure = |
2165 |
| -totalSessions() + numSessionsBeingCreated + 1 /* For pool maintenance thread */; |
2166 | 2175 |
|
2167 |
| -poolMaintainer.close(); |
| 2176 | +pendingClosure = totalSessions() + numSessionsBeingCreated; |
| 2177 | + |
| 2178 | +if (!poolMaintainer.isClosed()) { |
| 2179 | +pendingClosure += 1; // For pool maintenance thread |
| 2180 | +poolMaintainer.close(); |
| 2181 | +} |
| 2182 | + |
2168 | 2183 | sessions.clear();
|
2169 | 2184 | for (PooledSessionFuture session : checkedOutSessions) {
|
2170 | 2185 | if (session.Exception != null) {
|
@@ -2180,7 +2195,13 @@ ListenableFuture<Void> closeAsync(ClosedException closedException) {
|
2180 | 2195 | closeSessionAsync(session);
|
2181 | 2196 | }
|
2182 | 2197 | }
|
| 2198 | + |
| 2199 | +// Nothing to be closed, mark as complete |
| 2200 | +if (pendingClosure == 0) { |
| 2201 | +closureFuture.set(null); |
| 2202 | +} |
2183 | 2203 | }
|
| 2204 | + |
2184 | 2205 | retFuture.addListener(
|
2185 | 2206 | new Runnable() {
|
2186 | 2207 | @Override
|
|
0 commit comments