Closed
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
Original file line numberDiff line numberDiff line change
Expand Up@@ -451,11 +451,6 @@ void initTransaction() {
this.tracer = builder.tracer;
}

@Override
public void setSpan(ISpan span) {
this.span = span;
}

long getSeqNo() {
return seqNo.incrementAndGet();
}
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -47,11 +47,6 @@ final class AsyncTransactionManagerImpl
this.options = Options.fromTransactionOptions(options);
}

@Override
public void setSpan(ISpan span) {
this.span = span;
}

@Override
public void close() {
closeAsync();
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -91,9 +91,6 @@ interface SessionTransaction {

/** Invalidates the transaction, generally because a new one has been started on the session. */
void invalidate();

/** Registers the current span on the transaction. */
void setSpan(ISpan span);
}

private final SpannerImpl spanner;
Expand All@@ -105,7 +102,6 @@ interface SessionTransaction {
private volatile Instant lastUseTime;
@Nullable private final Instant createTime;
private final boolean isMultiplexed;
private ISpan currentSpan;

SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
this.spanner = spanner;
Expand DownExpand Up@@ -143,14 +139,6 @@ public String getName() {
return options;
}

void setCurrentSpan(ISpan span) {
currentSpan = span;
}

ISpan getCurrentSpan() {
return currentSpan;
}

Instant getLastUseTime() {
return lastUseTime;
}
Expand DownExpand Up@@ -308,7 +296,7 @@ public ReadContext singleUse(TimestampBound bound) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setSpan(tracer.getCurrentSpan())
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
Expand All@@ -330,7 +318,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setSpan(tracer.getCurrentSpan())
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.buildSingleUseReadOnlyTransaction());
Expand All@@ -352,7 +340,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setSpan(tracer.getCurrentSpan())
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
Expand All@@ -370,12 +358,12 @@ public AsyncRunner runAsync(TransactionOption... options) {

@Override
public TransactionManager transactionManager(TransactionOption... options) {
return new TransactionManagerImpl(this, currentSpan, tracer, options);
return new TransactionManagerImpl(this, tracer.getCurrentSpan(), tracer, options);
}

@Override
public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) {
return new AsyncTransactionManagerImpl(this, currentSpan, options);
return new AsyncTransactionManagerImpl(this, tracer.getCurrentSpan(), options);
}

@Override
Expand DownExpand Up@@ -470,7 +458,7 @@ TransactionContextImpl newTransaction(Options options) {
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setSpan(currentSpan)
.setSpan(tracer.getCurrentSpan())
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(poolMaintainerClock == null ? new Clock() : poolMaintainerClock)
Expand All@@ -479,14 +467,12 @@ TransactionContextImpl newTransaction(Options options) {

<T extends SessionTransaction> T setActive(@Nullable T ctx) {
throwIfTransactionsPending();

if (activeTransaction != null) {
activeTransaction.invalidate();
}
activeTransaction = ctx;
readyTransactionId = null;
if (activeTransaction != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be an issue.

activeTransaction.setSpan(currentSpan);
if (!this.isMultiplexed) {
if (activeTransaction != null) {
activeTransaction.invalidate();
}
activeTransaction = ctx;
readyTransactionId = null;
}
return ctx;
}
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -40,15 +40,6 @@ final class TransactionManagerImpl implements TransactionManager, SessionTransac
this.options = Options.fromTransactionOptions(options);
}

ISpan getSpan() {
return span;
}

@Override
public void setSpan(ISpan span) {
this.span = span;
}

@Override
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -985,11 +985,7 @@ public TransactionRunner allowNestedTransaction() {
this.options = Options.fromTransactionOptions(options);
this.txn = session.newTransaction(this.options);
this.tracer = session.getTracer();
}

@Override
public void setSpan(ISpan span) {
this.span = span;
this.span = tracer.getCurrentSpan();
}

@Nullable
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -1455,7 +1455,6 @@ public void testSessionNotFoundReadWriteTransaction() {
when(closedSession.beginTransactionAsync(any(), eq(true))).thenThrow(sessionNotFound);
when(closedSession.getTracer()).thenReturn(tracer);
TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession);
closedTransactionRunner.setSpan(span);
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);

final SessionImpl openSession = mock(SessionImpl.class);
Expand All@@ -1470,7 +1469,6 @@ public void testSessionNotFoundReadWriteTransaction() {
.thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn")));
when(openSession.getTracer()).thenReturn(tracer);
TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession);
openTransactionRunner.setSpan(span);
when(openSession.readWriteTransaction()).thenReturn(openTransactionRunner);

ResultSet openResultSet = mock(ResultSet.class);
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -146,7 +146,6 @@ public void setUp() {
Span oTspan = mock(Span.class);
span = new OpenTelemetrySpan(oTspan);
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
transactionRunner.setSpan(span);
}

@SuppressWarnings("unchecked")
Expand DownExpand Up@@ -312,9 +311,7 @@ public void prepareReadWriteTransaction() {
throw new IllegalStateException();
}
};
session.setCurrentSpan(new OpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)));
TransactionRunnerImpl runner = new TransactionRunnerImpl(session);
runner.setSpan(span);
assertThat(usedInlinedBegin).isFalse();
runner.run(
transaction -> {
Expand DownExpand Up@@ -347,7 +344,6 @@ private long[] batchDmlException(int status) {
ApiFutures.immediateFuture(ByteString.copyFromUtf8(UUID.randomUUID().toString())));
when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName());
TransactionRunnerImpl runner = new TransactionRunnerImpl(session);
runner.setSpan(span);
ExecuteBatchDmlResponse response1 =
ExecuteBatchDmlResponse.newBuilder()
.addResultSets(
Expand Down