File tree

8 files changed

+263
-147
lines changed

8 files changed

+263
-147
lines changed
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,10 @@ SessionImpl createSession() {
215215
spanner.getOptions().getDatabaseRole(),
216216
spanner.getOptions().getSessionLabels(),
217217
options);
218-
return new SessionImpl(
219-
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
218+
SessionReference sessionReference =
219+
new SessionReference(
220+
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
221+
return new SessionImpl(spanner, sessionReference);
220222
} catch (RuntimeException e) {
221223
span.setStatus(e);
222224
throw e;
@@ -248,7 +250,9 @@ void createMultiplexedSession(SessionConsumer consumer) {
248250
true);
249251
SessionImpl sessionImpl =
250252
new SessionImpl(
251-
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), null);
253+
spanner,
254+
new SessionReference(
255+
session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
252256
consumer.onSessionReady(sessionImpl);
253257
} catch (Throwable t) {
254258
span.setStatus(t);
@@ -348,10 +352,11 @@ private List<SessionImpl> internalBatchCreateSessions(
348352
res.add(
349353
new SessionImpl(
350354
spanner,
351-
session.getName(),
352-
session.getCreateTime(),
353-
session.getMultiplexed(),
354-
options));
355+
new SessionReference(
356+
session.getName(),
357+
session.getCreateTime(),
358+
session.getMultiplexed(),
359+
options)));
355360
}
356361
return res;
357362
} catch (RuntimeException e) {
@@ -367,6 +372,6 @@ SessionImpl sessionWithId(String name) {
367372
synchronized (this) {
368373
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
369374
}
370-
return new SessionImpl(spanner, name, options);
375+
return new SessionImpl(spanner, new SessionReference(name, options));
371376
}
372377
}
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.spanner;
1818

1919
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
20-
import static com.google.common.base.Preconditions.checkNotNull;
2120

2221
import com.google.api.core.ApiFuture;
2322
import com.google.api.core.SettableApiFuture;
@@ -28,7 +27,6 @@
2827
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
2928
import com.google.cloud.spanner.Options.TransactionOption;
3029
import com.google.cloud.spanner.Options.UpdateOption;
31-
import com.google.cloud.spanner.SessionClient.SessionId;
3230
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
3331
import com.google.cloud.spanner.spi.v1.SpannerRpc;
3432
import com.google.common.base.Ticker;
@@ -97,49 +95,23 @@ interface SessionTransaction {
9795
}
9896

9997
private final SpannerImpl spanner;
100-
private final String name;
101-
private final DatabaseId databaseId;
98+
private final SessionReference sessionReference;
10299
private SessionTransaction activeTransaction;
103-
private final Map<SpannerRpc.Option, ?> options;
104-
private volatile Instant lastUseTime;
105-
@Nullable private final Instant createTime;
106-
private final boolean isMultiplexed;
107100
private ISpan currentSpan;
108101

109-
SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
102+
SessionImpl(SpannerImpl spanner, SessionReference sessionReference) {
110103
this.spanner = spanner;
111104
this.tracer = spanner.getTracer();
112-
this.options = options;
113-
this.name = checkNotNull(name);
114-
this.databaseId = SessionId.of(name).getDatabaseId();
115-
this.lastUseTime = Instant.now();
116-
this.createTime = null;
117-
this.isMultiplexed = false;
118-
}
119-
120-
SessionImpl(
121-
SpannerImpl spanner,
122-
String name,
123-
com.google.protobuf.Timestamp createTime,
124-
boolean isMultiplexed,
125-
Map<SpannerRpc.Option, ?> options) {
126-
this.spanner = spanner;
127-
this.tracer = spanner.getTracer();
128-
this.options = options;
129-
this.name = checkNotNull(name);
130-
this.databaseId = SessionId.of(name).getDatabaseId();
131-
this.lastUseTime = Instant.now();
132-
this.createTime = convert(createTime);
133-
this.isMultiplexed = isMultiplexed;
105+
this.sessionReference = sessionReference;
134106
}
135107

136108
@Override
137109
public String getName() {
138-
return name;
110+
return sessionReference.getName();
139111
}
140112

141113
Map<SpannerRpc.Option, ?> getOptions() {
142-
return options;
114+
return sessionReference.getOptions();
143115
}
144116

145117
void setCurrentSpan(ISpan span) {
@@ -151,19 +123,27 @@ ISpan getCurrentSpan() {
151123
}
152124

153125
Instant getLastUseTime() {
154-
return lastUseTime;
126+
return sessionReference.getLastUseTime();
155127
}
156128

157129
Instant getCreateTime() {
158-
return createTime;
130+
return sessionReference.getCreateTime();
159131
}
160132

161133
boolean getIsMultiplexed() {
162-
return isMultiplexed;
134+
return sessionReference.getIsMultiplexed();
135+
}
136+
137+
SessionReference getSessionReference() {
138+
return sessionReference;
163139
}
164140

165141
void markUsed(Instant instant) {
166-
lastUseTime = instant;
142+
sessionReference.markUsed(instant);
143+
}
144+
145+
public DatabaseId getDatabaseId() {
146+
return sessionReference.getDatabaseId();
167147
}
168148

169149
@Override
@@ -211,7 +191,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
211191
Options options = Options.fromTransactionOptions(transactionOptions);
212192
final CommitRequest.Builder requestBuilder =
213193
CommitRequest.newBuilder()
214-
.setSession(name)
194+
.setSession(getName())
215195
.setReturnCommitStats(options.withCommitStats())
216196
.addAllMutations(mutationsProto);
217197

@@ -239,7 +219,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
239219
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
240220
try (IScope s = tracer.withSpan(span)) {
241221
return SpannerRetryHelper.runTxWithRetriesOnAborted(
242-
() -> new CommitResponse(spanner.getRpc().commit(request, this.options)));
222+
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
243223
} catch (RuntimeException e) {
244224
span.setStatus(e);
245225
throw e;
@@ -271,7 +251,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
271251
List<BatchWriteRequest.MutationGroup> mutationGroupsProto =
272252
MutationGroup.toListProto(mutationGroups);
273253
final BatchWriteRequest.Builder requestBuilder =
274-
BatchWriteRequest.newBuilder().setSession(name).addAllMutationGroups(mutationGroupsProto);
254+
BatchWriteRequest.newBuilder()
255+
.setSession(getName())
256+
.addAllMutationGroups(mutationGroupsProto);
275257
RequestOptions batchWriteRequestOptions = getRequestOptions(transactionOptions);
276258
if (batchWriteRequestOptions != null) {
277259
requestBuilder.setRequestOptions(batchWriteRequestOptions);
@@ -282,7 +264,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
282264
}
283265
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
284266
try (IScope s = tracer.withSpan(span)) {
285-
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
267+
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), getOptions());
286268
} catch (Throwable e) {
287269
span.setStatus(e);
288270
throw SpannerExceptionFactory.newSpannerException(e);
@@ -303,7 +285,7 @@ public ReadContext singleUse(TimestampBound bound) {
303285
.setSession(this)
304286
.setTimestampBound(bound)
305287
.setRpc(spanner.getRpc())
306-
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
288+
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId()))
307289
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
308290
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
309291
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
@@ -325,7 +307,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
325307
.setSession(this)
326308
.setTimestampBound(bound)
327309
.setRpc(spanner.getRpc())
328-
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
310+
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId()))
329311
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
330312
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
331313
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
@@ -347,7 +329,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
347329
.setSession(this)
348330
.setTimestampBound(bound)
349331
.setRpc(spanner.getRpc())
350-
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
332+
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId()))
351333
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
352334
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
353335
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
@@ -379,14 +361,14 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
379361

380362
@Override
381363
public ApiFuture<Empty> asyncClose() {
382-
return spanner.getRpc().asyncDeleteSession(name, options);
364+
return spanner.getRpc().asyncDeleteSession(getName(), getOptions());
383365
}
384366

385367
@Override
386368
public void close() {
387369
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
388370
try (IScope s = tracer.withSpan(span)) {
389-
spanner.getRpc().deleteSession(name, options);
371+
spanner.getRpc().deleteSession(getName(), getOptions());
390372
} catch (RuntimeException e) {
391373
span.setStatus(e);
392374
throw e;
@@ -400,11 +382,11 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
400382
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
401383
final BeginTransactionRequest request =
402384
BeginTransactionRequest.newBuilder()
403-
.setSession(name)
385+
.setSession(getName())
404386
.setOptions(createReadWriteTransactionOptions(transactionOptions))
405387
.build();
406388
final ApiFuture<Transaction> requestFuture =
407-
spanner.getRpc().beginTransactionAsync(request, options, routeToLeader);
389+
spanner.getRpc().beginTransactionAsync(request, getOptions(), routeToLeader);
408390
requestFuture.addListener(
409391
() -> {
410392
try (IScope s = tracer.withSpan(span)) {
@@ -446,7 +428,7 @@ TransactionContextImpl newTransaction(Options options) {
446428
.setOptions(options)
447429
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
448430
.setRpc(spanner.getRpc())
449-
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
431+
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId()))
450432
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
451433
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
452434
.setSpan(currentSpan)
@@ -458,9 +440,11 @@ TransactionContextImpl newTransaction(Options options) {
458440

459441
<T extends SessionTransaction> T setActive(@Nullable T ctx) {
460442
throwIfTransactionsPending();
461-
462-
if (activeTransaction != null) {
463-
activeTransaction.invalidate();
443+
// multiplexed sessions support running concurrent transactions
444+
if (!getIsMultiplexed()) {
445+
if (activeTransaction != null) {
446+
activeTransaction.invalidate();
447+
}
464448
}
465449
activeTransaction = ctx;
466450
if (activeTransaction != null) {
@@ -472,11 +456,4 @@ <T extends SessionTransaction> T setActive(@Nullable T ctx) {
472456
TraceWrapper getTracer() {
473457
return tracer;
474458
}
475-
476-
private Instant convert(com.google.protobuf.Timestamp timestamp) {
477-
if (timestamp == null) {
478-
return null;
479-
}
480-
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
481-
}
482459
}

0 commit comments

Comments
 (0)