File tree

6 files changed

+240
-5
lines changed

6 files changed

+240
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,9 @@
2626
<method>com.google.cloud.storage.BucketInfo$Builder setUpdateTime(java.lang.Long)</method>
2727
<differenceType>7013</differenceType>
2828
</difference>
29+
<difference>
30+
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
31+
<method>long getCurrentUploadOffset(java.lang.String)</method>
32+
<differenceType>7012</differenceType>
33+
</difference>
2934
</differences>
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
5151
// Contains metadata of the updated object or null if upload is not completed.
5252
private StorageObject storageObject;
5353

54+
// Detect if flushBuffer() is being retried or not.
55+
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
56+
// occuring.
57+
private boolean retrying = false;
58+
59+
boolean isRetrying() {
60+
return retrying;
61+
}
62+
5463
StorageObject getStorageObject() {
5564
return storageObject;
5665
}
@@ -63,11 +72,105 @@ protected void flushBuffer(final int length, final boolean last) {
6372
new Runnable() {
6473
@Override
6574
public void run() {
66-
storageObject =
67-
getOptions()
68-
.getStorageRpcV1()
69-
.writeWithResponse(
70-
getUploadId(), getBuffer(), 0, getPosition(), length, last);
75+
if (!isRetrying()) {
76+
// Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
77+
retrying = true;
78+
storageObject =
79+
getOptions()
80+
.getStorageRpcV1()
81+
.writeWithResponse(
82+
getUploadId(), getBuffer(), 0, getPosition(), length, last);
83+
} else {
84+
// Retriable interruption occurred.
85+
// Variables:
86+
// chunk = getBuffer()
87+
// localNextByteOffset == getPosition()
88+
// chunkSize = getChunkSize()
89+
//
90+
// Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
91+
// we are retrying from first chunk start from 0 offset.
92+
//
93+
// Case 2: localNextByteOffset == remoteNextByteOffset:
94+
// Special case of Case 1 when a chunk is retried.
95+
//
96+
// Case 3: localNextByteOffset < remoteNextByteOffset
97+
// && driftOffset < chunkSize:
98+
// Upload progressed and localNextByteOffset is not in-sync with
99+
// remoteNextByteOffset and driftOffset is less than chunkSize.
100+
// driftOffset must be less than chunkSize for it to retry using
101+
// chunk maintained in memory.
102+
// Find the driftOffset by subtracting localNextByteOffset from
103+
// remoteNextByteOffset.
104+
// Use driftOffset to determine where to restart from using the chunk in
105+
// memory.
106+
//
107+
// Case 4: localNextByteOffset < remoteNextByteOffset
108+
// && driftOffset == chunkSize:
109+
// Special case of Case 3.
110+
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
111+
// to the next chunk.
112+
//
113+
// Case 5: localNextByteOffset < remoteNextByteOffset
114+
// && driftOffset > chunkSize:
115+
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
116+
// chunk maintained in memory. This is not possible unless there's multiple
117+
// clients uploading to the same resumable upload session.
118+
//
119+
// Case 6: localNextByteOffset > remoteNextByteOffset:
120+
// For completeness, this case is not possible because it would require retrying
121+
// a 400 status code which is not allowed.
122+
//
123+
// Get remote offset from API
124+
long remoteNextByteOffset =
125+
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
126+
long localNextByteOffset = getPosition();
127+
int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
128+
int retryChunkLength = length - driftOffset;
129+
130+
if (localNextByteOffset == 0 && remoteNextByteOffset == 0
131+
|| localNextByteOffset == remoteNextByteOffset) {
132+
// Case 1 and 2
133+
storageObject =
134+
getOptions()
135+
.getStorageRpcV1()
136+
.writeWithResponse(
137+
getUploadId(), getBuffer(), 0, getPosition(), length, last);
138+
} else if (localNextByteOffset < remoteNextByteOffset
139+
&& driftOffset < getChunkSize()) {
140+
// Case 3
141+
storageObject =
142+
getOptions()
143+
.getStorageRpcV1()
144+
.writeWithResponse(
145+
getUploadId(),
146+
getBuffer(),
147+
driftOffset,
148+
remoteNextByteOffset,
149+
retryChunkLength,
150+
last);
151+
} else if (localNextByteOffset < remoteNextByteOffset
152+
&& driftOffset == getChunkSize()) {
153+
// Case 4
154+
// Continue to next chunk
155+
retrying = false;
156+
return;
157+
} else {
158+
// Case 5
159+
StringBuilder sb = new StringBuilder();
160+
sb.append(
161+
"Remote offset has progressed beyond starting byte offset of next chunk.");
162+
sb.append(
163+
"This may be a symptom of multiple clients uploading to the same upload session.\n\n");
164+
sb.append("For debugging purposes:\n");
165+
sb.append("uploadId: ").append(getUploadId()).append('\n');
166+
sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
167+
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
168+
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
169+
throw new StorageException(0, sb.toString());
170+
}
171+
}
172+
// Request was successful and retrying state is now disabled.
173+
retrying = false;
71174
}
72175
}),
73176
getOptions().getRetrySettings(),
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,51 @@ public void write(
747747
writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last);
748748
}
749749

750+
@Override
751+
public long getCurrentUploadOffset(String uploadId) {
752+
try {
753+
GenericUrl url = new GenericUrl(uploadId);
754+
HttpRequest httpRequest = storage.getRequestFactory().buildPutRequest(url, null);
755+
756+
httpRequest.getHeaders().setContentRange("bytes */*");
757+
// Turn off automatic redirects.
758+
// HTTP 308 are returned if upload is incomplete.
759+
// See: https://cloud.google.com/storage/docs/performing-resumable-uploads
760+
httpRequest.setFollowRedirects(false);
761+
762+
HttpResponse response = null;
763+
try {
764+
response = httpRequest.execute();
765+
int code = response.getStatusCode();
766+
String message = response.getStatusMessage();
767+
if (code == 201 || code == 200) {
768+
throw new StorageException(0, "Resumable upload is already complete.");
769+
}
770+
StringBuilder sb = new StringBuilder();
771+
sb.append("Not sure what occurred. Here's debugging information:\n");
772+
sb.append("Response:\n").append(response.toString()).append("\n\n");
773+
throw new StorageException(0, sb.toString());
774+
} catch (HttpResponseException ex) {
775+
int code = ex.getStatusCode();
776+
if (code == 308 && ex.getHeaders().getRange() == null) {
777+
// No progress has been made.
778+
return 0;
779+
} else {
780+
// API returns last byte received offset
781+
String range = ex.getHeaders().getRange();
782+
// Return next byte offset by adding 1 to last byte received offset
783+
return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
784+
}
785+
} finally {
786+
if (response != null) {
787+
response.disconnect();
788+
}
789+
}
790+
} catch (IOException ex) {
791+
throw translate(ex);
792+
}
793+
}
794+
750795
@Override
751796
public StorageObject writeWithResponse(
752797
String uploadId,
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,15 @@ void write(
328328
int length,
329329
boolean last);
330330

331+
/**
332+
* Requests current byte offset from Cloud Storage API. Used to recover from a failure in some
333+
* bytes were committed successfully to the open resumable session.
334+
*
335+
* @param uploadId resumable upload ID URL
336+
* @throws StorageException upon failure
337+
*/
338+
long getCurrentUploadOffset(String uploadId);
339+
331340
/**
332341
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
333342
* returns metadata of the updated object, otherwise returns null.
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ public void write(
139139
throw new UnsupportedOperationException("Not implemented yet");
140140
}
141141

142+
@Override
143+
public long getCurrentUploadOffset(String uploadId) {
144+
throw new UnsupportedOperationException("Not implemented yet");
145+
}
146+
142147
@Override
143148
public StorageObject writeWithResponse(
144149
String uploadId,
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,74 @@ public void testWriteWithoutFlush() throws IOException {
134134
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
135135
}
136136

137+
@Test
138+
public void testWriteWithFlushRetryChunk() throws IOException {
139+
StorageException exception = new StorageException(new SocketException("Socket closed"));
140+
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
141+
Capture<byte[]> capturedBuffer = Capture.newInstance();
142+
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
143+
expect(
144+
storageRpcMock.writeWithResponse(
145+
eq(UPLOAD_ID),
146+
capture(capturedBuffer),
147+
eq(0),
148+
eq(0L),
149+
eq(MIN_CHUNK_SIZE),
150+
eq(false)))
151+
.andThrow(exception);
152+
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
153+
expect(
154+
storageRpcMock.writeWithResponse(
155+
eq(UPLOAD_ID),
156+
capture(capturedBuffer),
157+
eq(0),
158+
eq(0L),
159+
eq(MIN_CHUNK_SIZE),
160+
eq(false)))
161+
.andReturn(null);
162+
replay(storageRpcMock);
163+
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
164+
writer.setChunkSize(MIN_CHUNK_SIZE);
165+
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
166+
assertTrue(writer.isOpen());
167+
assertNull(writer.getStorageObject());
168+
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
169+
}
170+
171+
@Test
172+
public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
173+
StorageException exception = new StorageException(new SocketException("Socket closed"));
174+
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
175+
Capture<byte[]> capturedBuffer = Capture.newInstance();
176+
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
177+
expect(
178+
storageRpcMock.writeWithResponse(
179+
eq(UPLOAD_ID),
180+
capture(capturedBuffer),
181+
eq(0),
182+
eq(0L),
183+
eq(MIN_CHUNK_SIZE),
184+
eq(false)))
185+
.andThrow(exception);
186+
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L);
187+
expect(
188+
storageRpcMock.writeWithResponse(
189+
eq(UPLOAD_ID),
190+
capture(capturedBuffer),
191+
eq(10),
192+
eq(10L),
193+
eq(MIN_CHUNK_SIZE - 10),
194+
eq(false)))
195+
.andReturn(null);
196+
replay(storageRpcMock);
197+
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
198+
writer.setChunkSize(MIN_CHUNK_SIZE);
199+
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
200+
assertTrue(writer.isOpen());
201+
assertNull(writer.getStorageObject());
202+
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
203+
}
204+
137205
@Test
138206
public void testWriteWithFlush() throws IOException {
139207
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);

0 commit comments

Comments
 (0)