Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
### Documentation

### Internal Changes
* Add retry with backoff to `CachedTokenSource` async refresh so that a failed background refresh no longer disables async until a blocking call succeeds.

### API Changes
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,23 @@ private enum TokenState {
// monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes) while increasing the likelihood
// that the token is refreshed asynchronously if the auth server is down.
private static final Duration MAX_STALE_DURATION = Duration.ofMinutes(20);
// Delay before another async refresh may be attempted after an async refresh failure.
private static final Duration ASYNC_REFRESH_RETRY_BACKOFF = Duration.ofMinutes(1);
// Default additional buffer before expiry to consider a token as expired.
// This is 40 seconds by default since Azure Databricks rejects tokens that are within 30 seconds
// of expiry.
private static final Duration DEFAULT_EXPIRY_BUFFER = Duration.ofSeconds(40);

// The token source to use for refreshing the token.
// Underlying token source used to fetch replacement tokens.
private final TokenSource tokenSource;
// Whether asynchronous refresh is enabled.
private boolean asyncDisabled = false;
// The legacy duration before expiry to consider a token as 'stale'.
private final Duration staticStaleDuration;
// Whether to use the dynamic stale duration computation or defer to the legacy duration.
private final boolean useDynamicStaleDuration;
// The dynamically computed duration before expiry to consider a token as 'stale'.
private volatile Duration dynamicStaleDuration;
private final boolean useLegacyStaleDuration;
// The earliest time at which the cached token should be considered stale.
private volatile Instant staleAfter;
// Additional buffer before expiry to consider a token as expired.
private final Duration expiryBuffer;
// Clock supplier for current time.
Expand All @@ -62,23 +64,16 @@ private enum TokenState {
protected volatile Token token;
// Whether a refresh is currently in progress (for async refresh).
private boolean refreshInProgress = false;
// Whether the last refresh attempt succeeded.
private boolean lastRefreshSucceeded = true;

private CachedTokenSource(Builder builder) {
this.tokenSource = builder.tokenSource;
this.asyncDisabled = builder.asyncDisabled;
this.staticStaleDuration = builder.staleDuration;
this.useDynamicStaleDuration = builder.useDynamicStaleDuration;
this.useLegacyStaleDuration = builder.useLegacyStaleDuration;
this.expiryBuffer = builder.expiryBuffer;
this.clockSupplier = builder.clockSupplier;
this.token = builder.token;

if (this.useDynamicStaleDuration && this.token != null) {
this.dynamicStaleDuration = computeStaleDuration(this.token);
} else {
this.dynamicStaleDuration = Duration.ofMinutes(0);
}
this.updateToken(builder.token);
}

/**
Expand All @@ -91,7 +86,7 @@ public static class Builder {
private final TokenSource tokenSource;
private boolean asyncDisabled = false;
private Duration staleDuration = DEFAULT_STALE_DURATION;
private boolean useDynamicStaleDuration = true;
private boolean useLegacyStaleDuration = false;
private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER;
private ClockSupplier clockSupplier = new UtcClockSupplier();
private Token token;
Expand Down Expand Up @@ -139,15 +134,18 @@ public Builder setAsyncDisabled(boolean asyncDisabled) {
* Sets the duration before token expiry at which the token is considered stale.
*
* <p>When asynchronous refresh is enabled, tokens that are stale but not yet expired will
* trigger a background refresh while continuing to serve the current token.
* trigger a background refresh while continuing to serve the current token. Calling this method
* opts into the legacy fixed stale-duration behavior instead of the default dynamic stale
* computation, preserving backward compatibility for callers that already provide a custom
* stale duration.
*
* @param staleDuration The duration before expiry to consider a token stale. Must be greater
* than the expiry buffer duration.
* @return This builder instance for method chaining.
*/
public Builder setStaleDuration(Duration staleDuration) {
this.staleDuration = staleDuration;
this.useDynamicStaleDuration = false;
this.useLegacyStaleDuration = true;
return this;
}

Expand Down Expand Up @@ -190,6 +188,71 @@ public CachedTokenSource build() {
}
}

/**
* Replaces the cached token and recomputes the time after which it should be treated as stale.
*
* <p>Legacy mode uses the configured fixed stale duration. Dynamic mode derives the stale window
* from the token's remaining TTL and caps it at {@link #MAX_STALE_DURATION}. The stale threshold
* is written before the volatile token write so readers that observe the new token also observe
* the matching {@code staleAfter} value.
*
* @param t The token to cache. May be null.
*/
private void updateToken(Token t) {
if (t == null || t.getExpiry() == null) {
this.staleAfter = null;
this.token = t;
return;
}

if (this.useLegacyStaleDuration) {
this.staleAfter = t.getExpiry().minus(staticStaleDuration);
} else {
Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());
Duration staleDuration = ttl.dividedBy(2);
if (staleDuration.compareTo(MAX_STALE_DURATION) > 0) {
staleDuration = MAX_STALE_DURATION;
}
if (staleDuration.compareTo(Duration.ZERO) <= 0) {
staleDuration = Duration.ZERO;
}

this.staleAfter = t.getExpiry().minus(staleDuration);
}

// Publish the token after staleAfter so readers that observe the new token also observe the
// stale threshold computed for that token. Note: handleFailedAsyncRefresh writes staleAfter
// without a subsequent volatile token write, so a concurrent reader may briefly see a stale
// staleAfter value; the only consequence is one extra async trigger, which is harmless.
this.token = t;
}

/**
* Delays the next async refresh attempt after an async refresh failure.
*
* <p>The cached token remains usable until it becomes expired. Moving {@code staleAfter} into the
* future prevents callers from immediately retrying async refresh on every stale read while the
* auth service is unhealthy.
*/
private void handleFailedAsyncRefresh() {
if (this.staleAfter != null) {
Instant now = Instant.now(clockSupplier.getClock());
this.staleAfter = now.plus(ASYNC_REFRESH_RETRY_BACKOFF);
}
}

/**
* Returns {@code true} when the currently cached token has a later expiry than {@code candidate},
* meaning the candidate should be discarded. This prevents an async refresh that was started
* before a blocking refresh from overwriting the newer token obtained by the blocking path.
*/
private boolean cachedTokenIsNewer(Token candidate) {
return token != null
&& token.getExpiry() != null
&& candidate.getExpiry() != null
&& token.getExpiry().isAfter(candidate.getExpiry());
}

/**
* Gets the current token, refreshing if necessary. If async refresh is enabled, may return a
* stale token while a refresh is in progress.
Expand All @@ -206,21 +269,6 @@ public Token getToken() {
return getTokenAsync();
}

private Duration computeStaleDuration(Token t) {
if (t.getExpiry() == null) {
return Duration.ZERO; // Tokens with no expiry are considered permanent.
}

Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());

if (ttl.compareTo(Duration.ZERO) <= 0) {
return Duration.ZERO;
}

Duration halfTtl = ttl.dividedBy(2);
return halfTtl.compareTo(MAX_STALE_DURATION) > 0 ? MAX_STALE_DURATION : halfTtl;
}

/**
* Determine the state of the current token (fresh, stale, or expired).
*
Expand All @@ -234,12 +282,11 @@ protected TokenState getTokenState(Token t) {
return TokenState.FRESH; // Tokens with no expiry are considered permanent.
}

Duration lifeTime = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());
if (lifeTime.compareTo(expiryBuffer) <= 0) {
Instant now = Instant.now(clockSupplier.getClock());
if (now.isAfter(t.getExpiry().minus(expiryBuffer))) {
return TokenState.EXPIRED;
}
Duration staleDuration = useDynamicStaleDuration ? dynamicStaleDuration : staticStaleDuration;
if (lifeTime.compareTo(staleDuration) <= 0) {
if (now.isAfter(staleAfter)) {
return TokenState.STALE;
}
return TokenState.FRESH;
Expand All @@ -265,23 +312,15 @@ protected Token getTokenBlocking() {
if (getTokenState(token) != TokenState.EXPIRED) {
return token;
}
lastRefreshSucceeded = false;
Token newToken;
try {
newToken = tokenSource.getToken();
} catch (Exception e) {
logger.error("Failed to refresh token synchronously", e);
throw e;
}
lastRefreshSucceeded = true;

// Write dynamicStaleDuration before publishing the new token via the volatile write,
// so unsynchronized readers that see the new token are guaranteed to also see the
// updated dynamicStaleDuration.
if (useDynamicStaleDuration && newToken != null) {
dynamicStaleDuration = computeStaleDuration(newToken);
}
token = newToken;
updateToken(newToken);
return token;
}
}
Expand Down Expand Up @@ -316,33 +355,33 @@ protected Token getTokenAsync() {
* succeeded.
*/
private synchronized void triggerAsyncRefresh() {
// Check token state again inside the synchronized block to avoid triggering a refresh if
// another thread updated the token in the meantime.
if (!refreshInProgress && lastRefreshSucceeded && getTokenState(token) != TokenState.FRESH) {
refreshInProgress = true;
CompletableFuture.runAsync(
() -> {
try {
// Attempt to refresh the token in the background.
Token newToken = tokenSource.getToken();
synchronized (this) {
// Write dynamicStaleDuration before publishing the new token via the volatile
// write, so unsynchronized readers that see the new token are guaranteed to also
// see the updated dynamicStaleDuration.
if (useDynamicStaleDuration && newToken != null) {
dynamicStaleDuration = computeStaleDuration(newToken);
}
token = newToken;
refreshInProgress = false;
}
} catch (Exception e) {
synchronized (this) {
lastRefreshSucceeded = false;
refreshInProgress = false;
logger.error("Asynchronous token refresh failed", e);
// Re-check inside the synchronized block: another thread may have updated the token.
// Only STALE triggers async refresh; EXPIRED tokens are handled by getTokenBlocking, so
// an async attempt is unnecessary and would race with the blocking path.
if (refreshInProgress || getTokenState(token) != TokenState.STALE) {
return;
}

refreshInProgress = true;
CompletableFuture.runAsync(
() -> {
try {
Token newToken = tokenSource.getToken();
synchronized (this) {
if (newToken != null && !cachedTokenIsNewer(newToken)) {
updateToken(newToken);
}
}
});
}
} catch (Exception e) {
synchronized (this) {
handleFailedAsyncRefresh();
logger.error("Asynchronous token refresh failed", e);
}
} finally {
synchronized (this) {
refreshInProgress = false;
}
}
});
}
}
Loading
Loading