File tree

1 file changed

+114
-41
lines changed

1 file changed

+114
-41
lines changed
Original file line numberDiff line numberDiff line change
@@ -1099,6 +1099,7 @@ private static enum Position {
10991099
private final ScheduledExecutorService executor;
11001100
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
11011101
private final ScheduledExecutorService prepareExecutor;
1102+
private final int prepareThreadPoolSize;
11021103
final PoolMaintainer poolMaintainer;
11031104
private final Clock clock;
11041105
private final Object lock = new Object();
@@ -1143,6 +1144,12 @@ private static enum Position {
11431144
@GuardedBy("lock")
11441145
private long numSessionsReleased = 0;
11451146

1147+
@GuardedBy("lock")
1148+
private long numSessionsInProcessPrepared = 0;
1149+
1150+
@GuardedBy("lock")
1151+
private long numSessionsAsyncPrepared = 0;
1152+
11461153
@GuardedBy("lock")
11471154
private long numIdleSessionsRemoved = 0;
11481155

@@ -1224,15 +1231,14 @@ private SessionPool(
12241231
this.options = options;
12251232
this.executorFactory = executorFactory;
12261233
this.executor = executor;
1227-
int prepareThreads;
12281234
if (executor instanceof ThreadPoolExecutor) {
1229-
prepareThreads = Math.max(((ThreadPoolExecutor) executor).getCorePoolSize(), 1);
1235+
prepareThreadPoolSize = Math.max(((ThreadPoolExecutor) executor).getCorePoolSize(), 1);
12301236
} else {
1231-
prepareThreads = 8;
1237+
prepareThreadPoolSize = 8;
12321238
}
12331239
this.prepareExecutor =
12341240
Executors.newScheduledThreadPool(
1235-
prepareThreads,
1241+
prepareThreadPoolSize,
12361242
new ThreadFactoryBuilder()
12371243
.setDaemon(true)
12381244
.setNameFormat("session-pool-prepare-%d")
@@ -1244,6 +1250,19 @@ private SessionPool(
12441250
}
12451251

12461252
@VisibleForTesting
1253+
long getNumberOfSessionsInProcessPrepared() {
1254+
synchronized (lock) {
1255+
return numSessionsInProcessPrepared;
1256+
}
1257+
}
1258+
1259+
@VisibleForTesting
1260+
long getNumberOfSessionsAsyncPrepared() {
1261+
synchronized (lock) {
1262+
return numSessionsAsyncPrepared;
1263+
}
1264+
}
1265+
12471266
void removeFromPool(PooledSession session) {
12481267
synchronized (lock) {
12491268
if (isClosed()) {
@@ -1453,46 +1472,98 @@ PooledSession getReadSession() throws SpannerException {
14531472
PooledSession getReadWriteSession() {
14541473
Span span = Tracing.getTracer().getCurrentSpan();
14551474
span.addAnnotation("Acquiring read write session");
1456-
Waiter waiter = null;
14571475
PooledSession sess = null;
1458-
synchronized (lock) {
1459-
if (closureFuture != null) {
1460-
span.addAnnotation("Pool has been closed");
1461-
throw new IllegalStateException("Pool has been closed");
1476+
// Loop to retry SessionNotFoundExceptions that might occur during in-process prepare of a
1477+
// session.
1478+
while (true) {
1479+
Waiter waiter = null;
1480+
boolean inProcessPrepare = false;
1481+
synchronized (lock) {
1482+
if (closureFuture != null) {
1483+
span.addAnnotation("Pool has been closed");
1484+
throw new IllegalStateException("Pool has been closed");
1485+
}
1486+
if (resourceNotFoundException != null) {
1487+
span.addAnnotation("Database has been deleted");
1488+
throw SpannerExceptionFactory.newSpannerException(
1489+
ErrorCode.NOT_FOUND,
1490+
String.format(
1491+
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
1492+
resourceNotFoundException.getMessage()),
1493+
resourceNotFoundException);
1494+
}
1495+
sess = writePreparedSessions.poll();
1496+
if (sess == null) {
1497+
if (numSessionsBeingPrepared <= prepareThreadPoolSize) {
1498+
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
1499+
PooledSession readSession = readSessions.poll();
1500+
if (readSession != null) {
1501+
span.addAnnotation(
1502+
"Acquired read only session. Preparing for read write transaction");
1503+
prepareSession(readSession);
1504+
} else {
1505+
span.addAnnotation("No session available");
1506+
maybeCreateSession();
1507+
}
1508+
}
1509+
} else {
1510+
inProcessPrepare = true;
1511+
numSessionsInProcessPrepared++;
1512+
PooledSession readSession = readSessions.poll();
1513+
if (readSession != null) {
1514+
// Create a read/write transaction in-process if there is already a queue for prepared
1515+
// sessions. This is more efficient than doing it asynchronously, as it scales with
1516+
// the number of user threads. The thread pool for asynchronously preparing sessions
1517+
// is fixed.
1518+
span.addAnnotation(
1519+
"Acquired read only session. Preparing in-process for read write transaction");
1520+
sess = readSession;
1521+
} else {
1522+
span.addAnnotation("No session available");
1523+
maybeCreateSession();
1524+
}
1525+
}
1526+
if (sess == null) {
1527+
waiter = new Waiter();
1528+
if (inProcessPrepare) {
1529+
// inProcessPrepare=true means that we have already determined that the queue for
1530+
// preparing read/write sessions is larger than the number of threads in the prepare
1531+
// thread pool, and that it's more efficient to do the prepare in-process. We will
1532+
// therefore create a waiter for a read-only session, even though a read/write session
1533+
// has been requested.
1534+
readWaiters.add(waiter);
1535+
} else {
1536+
readWriteWaiters.add(waiter);
1537+
}
1538+
}
1539+
} else {
1540+
span.addAnnotation("Acquired read write session");
1541+
}
14621542
}
1463-
if (resourceNotFoundException != null) {
1464-
span.addAnnotation("Database has been deleted");
1465-
throw SpannerExceptionFactory.newSpannerException(
1466-
ErrorCode.NOT_FOUND,
1467-
String.format(
1468-
"The session pool has been invalidated because a previous RPC returned 'Database not found': %s",
1469-
resourceNotFoundException.getMessage()),
1470-
resourceNotFoundException);
1543+
if (waiter != null) {
1544+
logger.log(
1545+
Level.FINE,
1546+
"No session available in the pool. Blocking for one to become available/created");
1547+
span.addAnnotation("Waiting for read write session to be available");
1548+
sess = waiter.take();
14711549
}
1472-
sess = writePreparedSessions.poll();
1473-
if (sess == null) {
1474-
if (numSessionsBeingPrepared <= readWriteWaiters.size()) {
1475-
PooledSession readSession = readSessions.poll();
1476-
if (readSession != null) {
1477-
span.addAnnotation("Acquired read only session. Preparing for read write transaction");
1478-
prepareSession(readSession);
1479-
} else {
1480-
span.addAnnotation("No session available");
1481-
maybeCreateSession();
1550+
if (inProcessPrepare) {
1551+
try {
1552+
sess.prepareReadWriteTransaction();
1553+
} catch (Throwable t) {
1554+
sess = null;
1555+
SpannerException e = newSpannerException(t);
1556+
if (!isClosed()) {
1557+
handlePrepareSessionFailure(e, sess, false);
1558+
}
1559+
if (!isSessionNotFound(e)) {
1560+
throw e;
14821561
}
14831562
}
1484-
waiter = new Waiter();
1485-
readWriteWaiters.add(waiter);
1486-
} else {
1487-
span.addAnnotation("Acquired read write session");
14881563
}
1489-
}
1490-
if (waiter != null) {
1491-
logger.log(
1492-
Level.FINE,
1493-
"No session available in the pool. Blocking for one to become available/created");
1494-
span.addAnnotation("Waiting for read write session to be available");
1495-
sess = waiter.take();
1564+
if (sess != null) {
1565+
break;
1566+
}
14961567
}
14971568
sess.markBusy();
14981569
incrementNumSessionsInUse();
@@ -1620,7 +1691,8 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
16201691
}
16211692
}
16221693

1623-
private void handlePrepareSessionFailure(SpannerException e, PooledSession session) {
1694+
private void handlePrepareSessionFailure(
1695+
SpannerException e, PooledSession session, boolean informFirstWaiter) {
16241696
synchronized (lock) {
16251697
if (isSessionNotFound(e)) {
16261698
invalidateSession(session);
@@ -1643,7 +1715,7 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
16431715
MoreObjects.firstNonNull(
16441716
this.resourceNotFoundException,
16451717
isDatabaseOrInstanceNotFound(e) ? (ResourceNotFoundException) e : null);
1646-
} else if (readWriteWaiters.size() > 0) {
1718+
} else if (informFirstWaiter && readWriteWaiters.size() > 0) {
16471719
releaseSession(session, Position.FIRST);
16481720
readWriteWaiters.poll().put(e);
16491721
} else {
@@ -1792,6 +1864,7 @@ public void run() {
17921864
sess.prepareReadWriteTransaction();
17931865
logger.log(Level.FINE, "Session prepared");
17941866
synchronized (lock) {
1867+
numSessionsAsyncPrepared++;
17951868
numSessionsBeingPrepared--;
17961869
if (!isClosed()) {
17971870
if (readWriteWaiters.size() > 0) {
@@ -1807,7 +1880,7 @@ public void run() {
18071880
synchronized (lock) {
18081881
numSessionsBeingPrepared--;
18091882
if (!isClosed()) {
1810-
handlePrepareSessionFailure(newSpannerException(t), sess);
1883+
handlePrepareSessionFailure(newSpannerException(t), sess, true);
18111884
}
18121885
}
18131886
}

0 commit comments

Comments
 (0)