Original file line numberDiff line numberDiff line change
Expand Up@@ -42,12 +42,16 @@ class MetricRegistryConstants {
static final String COUNT = "1";

// The Metric name and description
static final String MAX_IN_USE_SESSIONS = "cloud.google.com/java/spanner/max_in_use_session";
static final String MAX_IN_USE_SESSIONS = "cloud.google.com/java/spanner/max_in_use_sessions";
static final String MAX_ALLOWED_SESSIONS = "cloud.google.com/java/spanner/max_allowed_sessions";
static final String IN_USE_SESSIONS = "cloud.google.com/java/spanner/in_use_sessions";
static final String GET_SESSION_TIMEOUTS = "cloud.google.com/java/spanner/get_session_timeouts";

static final String MAX_IN_USE_SESSIONS_DESCRIPTION =
"The maximum number of sessions in use during the last 10 minute interval.";
static final String MAX_ALLOWED_SESSIONS_DESCRIPTION =
"The maximum number of sessions allowed. Configurable by the user.";
static final String IN_USE_SESSIONS_DESCRIPTION = "The number of sessions currently in use.";
static final String SESSIONS_TIMEOUTS_DESCRIPTION =
"The number of get sessions timeouts due to pool exhaustion";
}
Original file line numberDiff line numberDiff line change
Expand Up@@ -96,6 +96,32 @@ public void removeTimeSeries(List<LabelValue> list) {}
public void clear() {}
}

public static final class FakeDerivedLongCumulative extends DerivedLongCumulative {
private final MetricsRecord record;
private final String name;
private final List<LabelKey> labelKeys;

private FakeDerivedLongCumulative(
FakeMetricRegistry metricRegistry, String name, List<LabelKey> labelKeys) {
this.record = metricRegistry.record;
this.labelKeys = labelKeys;
this.name = name;
}

@Override
public <T> void createTimeSeries(
List<LabelValue> labelValues, T t, ToLongFunction<T> toLongFunction) {
this.record.metrics.put(this.name, new PointWithFunction(t, toLongFunction));
this.record.labels.put(this.labelKeys, labelValues);
}

@Override
public void removeTimeSeries(List<LabelValue> list) {}

@Override
public void clear() {}
}

/**
* A {@link MetricRegistry} implementation that saves metrics records to be accessible from {@link
* #pollRecord()}.
Expand DownExpand Up@@ -144,7 +170,7 @@ public DoubleCumulative addDoubleCumulative(String s, MetricOptions metricOption

@Override
public DerivedLongCumulative addDerivedLongCumulative(String s, MetricOptions metricOptions) {
throw new UnsupportedOperationException();
return new FakeDerivedLongCumulative(this, s, metricOptions.getLabelKeys());
}

@Override
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -1561,12 +1561,14 @@ public void run() {
}

@Test
public void testSessionMetrics() {
public void testSessionMetrics() throws Exception {
// Create a session pool with max 2 session and a low timeout for waiting for a session.
options =
SessionPoolOptions.newBuilder()
.setMinSessions(1)
.setMaxSessions(3)
.setMaxSessions(2)
.setMaxIdleSessions(0)
.setInitialWaitForSessionTimeoutMillis(20L)
.build();
FakeClock clock = new FakeClock();
clock.currentTimeMillis = System.currentTimeMillis();
Expand All@@ -1583,16 +1585,46 @@ public void testSessionMetrics() {
Session session2 = pool.getReadSession();

MetricsRecord record = metricRegistry.pollRecord();
assertThat(record.getMetrics().size()).isEqualTo(4);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.IN_USE_SESSIONS, 2L);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.MAX_IN_USE_SESSIONS, 2L);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.GET_SESSION_TIMEOUTS, 0L);
assertThat(record.getMetrics())
.containsEntry(
MetricRegistryConstants.MAX_ALLOWED_SESSIONS, (long) options.getMaxSessions());
assertThat(record.getLabels()).containsEntry(SPANNER_LABEL_KEYS, labelValues);

final CountDownLatch latch = new CountDownLatch(1);
// Try asynchronously to take another session. This attempt should time out.
Future<Void> fut =
executor.submit(
new Callable<Void>() {
@Override
public Void call() {
latch.countDown();
Session session = pool.getReadSession();
session.close();
return null;
}
});
// Wait until the background thread is actually waiting for a session.
latch.await();
// Wait until the request has timed out.
int waitCount = 0;
while (pool.getNumWaiterTimeouts() == 0L && waitCount < 1000) {
Thread.sleep(5L);
waitCount++;
}
// Return the checked out session to the pool so the async request will get a session and
// finish.
session2.close();
session1.close();
// Verify that the async request also succeeds.
fut.get(10L, TimeUnit.SECONDS);
executor.shutdown();

session1.close();
assertThat(record.getMetrics().get(MetricRegistryConstants.GET_SESSION_TIMEOUTS).longValue())
.isAtLeast(1L);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.IN_USE_SESSIONS, 0L);
assertThat(record.getMetrics()).containsEntry(MetricRegistryConstants.MAX_IN_USE_SESSIONS, 2L);
}
Expand Down