RATIS-2408. Add configurable exponential backoff reconnection for Netty DataStream client. #1349
slfan1989 wants to merge 3 commits into apache:master
@slfan1989, thanks a lot for working on this! It is a great idea to use exponential backoff. We use a similar idea for How about adding a new conf
@szetszwo Thanks for the suggestion and for pointing out the existing retry policy! Yeah, I agree, since we already have
@szetszwo Thank you for the great suggestion! I took a closer look at I noticed that Ratis already has an Then in a follow-up PR, I can add a configurable Does this approach sound reasonable?
szetszwo
left a comment
@slfan1989, thanks for the update!
Please see the inline comments and also https://issues.apache.org/jira/secure/attachment/13080856/1349_review.patch (the test still needs to be updated).
Preconditions.assertTrue(minReconnectMillis > 0, () -> "minReconnectMillis = " + minReconnectMillis + " <= 0");
Preconditions.assertTrue(maxReconnectMillis >= minReconnectMillis,
    () -> "maxReconnectMillis = " + maxReconnectMillis + " < minReconnectMillis = " + minReconnectMillis);
Preconditions.assertTrue(maxReconnectAttempts >= 0,
    () -> "maxReconnectAttempts = " + maxReconnectAttempts + " < 0");
Let's move the preconditions to the ExponentialBackoffRetry constructor:
private ExponentialBackoffRetry(TimeDuration baseSleepTime, TimeDuration maxSleepTime, int maxAttempts) {
  Objects.requireNonNull(baseSleepTime, "baseSleepTime == null");
  Preconditions.assertTrue(baseSleepTime.isPositive(), () -> "baseSleepTime = " + baseSleepTime + " <= 0");
  Objects.requireNonNull(maxSleepTime, "maxSleepTime == null");
  Preconditions.assertTrue(maxSleepTime.compareTo(baseSleepTime) >= 0,
      () -> "maxSleepTime = " + maxSleepTime + " < baseSleepTime = " + baseSleepTime);
  Preconditions.assertTrue(maxAttempts >= 0, () -> "maxAttempts = " + maxAttempts + " < 0");
  this.baseSleepTime = baseSleepTime;
  this.maxSleepTime = maxSleepTime;
  this.maxAttempts = maxAttempts;
}

/**
 * Initial delay for reconnect attempts.
 * The delay doubles on each failure with 0.5x-1.5x jitter.
 */
String RECONNECT_DELAY_KEY = PREFIX + ".reconnect.delay";
TimeDuration RECONNECT_DELAY_DEFAULT = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
static TimeDuration reconnectDelay(RaftProperties properties) {
  return getTimeDuration(properties.getTimeDuration(RECONNECT_DELAY_DEFAULT.getUnit()),
      RECONNECT_DELAY_KEY, RECONNECT_DELAY_DEFAULT, getDefaultLog());
}
static void setReconnectDelay(RaftProperties properties, TimeDuration delay) {
  setTimeDuration(properties::setTimeDuration, RECONNECT_DELAY_KEY, delay);
}

/**
 * Maximum delay for reconnect attempts.
 * The backoff increases until this upper bound.
 */
String RECONNECT_MAX_DELAY_KEY = PREFIX + ".reconnect.max-delay";
TimeDuration RECONNECT_MAX_DELAY_DEFAULT = TimeDuration.valueOf(5, TimeUnit.SECONDS);
static TimeDuration reconnectMaxDelay(RaftProperties properties) {
  return getTimeDuration(properties.getTimeDuration(RECONNECT_MAX_DELAY_DEFAULT.getUnit()),
      RECONNECT_MAX_DELAY_KEY, RECONNECT_MAX_DELAY_DEFAULT, getDefaultLog());
}
static void setReconnectMaxDelay(RaftProperties properties, TimeDuration delay) {
  setTimeDuration(properties::setTimeDuration, RECONNECT_MAX_DELAY_KEY, delay);
}

/**
 * Maximum number of reconnect attempts.
 * Use {@link Integer#MAX_VALUE} for unlimited attempts.
 */
String RECONNECT_MAX_ATTEMPTS_KEY = PREFIX + ".reconnect.max-attempts";
int RECONNECT_MAX_ATTEMPTS_DEFAULT = Integer.MAX_VALUE;
static int reconnectMaxAttempts(RaftProperties properties) {
  return getInt(properties::getInt, RECONNECT_MAX_ATTEMPTS_KEY, RECONNECT_MAX_ATTEMPTS_DEFAULT, getDefaultLog());
}
static void setReconnectMaxAttempts(RaftProperties properties, int maxAttempts) {
  setInt(properties::setInt, RECONNECT_MAX_ATTEMPTS_KEY, maxAttempts);
}
Let's use a single conf so that we may support other RetryPolicy later.
/** A retry policy specified in comma separated format. */
String RECONNECT_POLICY_KEY = PREFIX + ".reconnect.policy";
/** ExponentialBackoffRetry with base sleep 100ms, max sleep 5s and max attempts 100. */
String RECONNECT_POLICY_DEFAULT = "ExponentialBackoffRetry,100ms,5s,100";
static String reconnectPolicy(RaftProperties properties) {
  return properties.get(RECONNECT_POLICY_KEY, RECONNECT_POLICY_DEFAULT);
}
static void setReconnectPolicy(RaftProperties properties, String retryPolicy) {
  properties.set(RECONNECT_POLICY_KEY, retryPolicy);
}

this.reconnectPolicy = ExponentialBackoffRetry.newBuilder()
    .setBaseSleepTime(reconnectDelay)
    .setMaxSleepTime(reconnectMaxDelay)
    .setMaxAttempts(maxReconnectAttempts)
    .build();
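As a rough illustration of the single-conf format suggested above, the sketch below parses a string such as `ExponentialBackoffRetry,100ms,5s,100` into its three parameters and applies the same checks as the suggested constructor preconditions. The class `ReconnectPolicySpec` and its `parse` and `parseMillis` methods are hypothetical and not part of Ratis; the actual change would presumably build a `RetryPolicy` instead, and only the `ms` and `s` suffixes are handled here.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for a parsed reconnect policy string; not a Ratis class. */
final class ReconnectPolicySpec {
  final long baseSleepMillis;
  final long maxSleepMillis;
  final int maxAttempts;

  private ReconnectPolicySpec(long baseSleepMillis, long maxSleepMillis, int maxAttempts) {
    // Same conditions as the preconditions proposed for the ExponentialBackoffRetry constructor.
    if (baseSleepMillis <= 0) {
      throw new IllegalArgumentException("baseSleepTime = " + baseSleepMillis + "ms <= 0");
    }
    if (maxSleepMillis < baseSleepMillis) {
      throw new IllegalArgumentException("maxSleepTime = " + maxSleepMillis
          + "ms < baseSleepTime = " + baseSleepMillis + "ms");
    }
    if (maxAttempts < 0) {
      throw new IllegalArgumentException("maxAttempts = " + maxAttempts + " < 0");
    }
    this.baseSleepMillis = baseSleepMillis;
    this.maxSleepMillis = maxSleepMillis;
    this.maxAttempts = maxAttempts;
  }

  /** Parse "ExponentialBackoffRetry,<base>,<max>,<attempts>". */
  static ReconnectPolicySpec parse(String commaSeparated) {
    final String[] parts = commaSeparated.split(",");
    if (parts.length != 4) {
      throw new IllegalArgumentException("Expected 4 fields but got " + parts.length + ": " + commaSeparated);
    }
    final String name = parts[0].trim();
    if (!"ExponentialBackoffRetry".equals(name)) {
      throw new IllegalArgumentException("Unsupported policy: " + name);
    }
    return new ReconnectPolicySpec(
        parseMillis(parts[1]), parseMillis(parts[2]), Integer.parseInt(parts[3].trim()));
  }

  /** Parse a duration with an "ms" or "s" suffix into milliseconds. */
  static long parseMillis(String duration) {
    final String s = duration.trim();
    if (s.endsWith("ms")) { // must be checked before "s", since "ms" also ends with "s"
      return Long.parseLong(s.substring(0, s.length() - 2).trim());
    }
    if (s.endsWith("s")) {
      return TimeUnit.SECONDS.toMillis(Long.parseLong(s.substring(0, s.length() - 1).trim()));
    }
    throw new IllegalArgumentException("Unsupported duration: " + duration);
  }
}
```

With this sketch, a misconfigured string such as `ExponentialBackoffRetry,5s,100ms,100` (max sleep below base sleep) is rejected at parse time rather than at the first reconnect.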
Let's pass reconnectPolicy. Then, we may support other RetryPolicy in the future.
Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
    Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier, RetryPolicy reconnectPolicy) {
  this.address = address;
  this.workerGroup = workerGroup;
  this.channelInitializerSupplier = channelInitializerSupplier;
  this.reconnectPolicy = reconnectPolicy;
  this.ref = new AtomicReference<>(MemoizedSupplier.valueOf(this::connect));
}

if (!future.isSuccess()) {
  scheduleReconnect(Connection.this + " failed", future.cause());
} else {
  resetReconnectAttempts();
Let's just set it here and remove the method.
reconnectAttempts.set(0);

final long delayMillis = Math.max(1L, action.getSleepTime().toLong(TimeUnit.MILLISECONDS));
final TimeDuration delay = TimeDuration.valueOf(delayMillis, TimeUnit.MILLISECONDS);
Let's get the delay directly from the action:
final TimeDuration delay = action.getSleepTime();

}
if (cause != null) {
  LOG.warn("", cause);
}
Also print the attempt:
if (cause != null) {
  LOG.warn("{}: {}; reconnect to {} in {} for attempt {}",
      this, message, address, delay, attempt, cause);
} else if (delay.compareTo(FIVE_HUNDRED_MS) < 0) {
  LOG.info("{}: {}; reconnect to {} in {} for attempt {}", this, message, address, delay, attempt);
} else {
  LOG.warn("{}: {}; reconnect to {} in {} for attempt {}", this, message, address, delay, attempt);
}
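The attempt bookkeeping discussed in the comments above (reset the counter to 0 on a successful connect, count and report the attempt on each failure) might look roughly like the following. `ReconnectAttemptTracker` is a hypothetical, self-contained helper for illustration only; the PR itself keeps an atomic `reconnectAttempts` counter inside `Connection`, and how it decides to give up is not shown in this conversation.

```java
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not part of Ratis: counts consecutive reconnect attempts. */
final class ReconnectAttemptTracker {
  private final int maxAttempts;
  private final AtomicInteger attempts = new AtomicInteger();

  ReconnectAttemptTracker(int maxAttempts) {
    if (maxAttempts < 0) {
      throw new IllegalArgumentException("maxAttempts = " + maxAttempts + " < 0");
    }
    this.maxAttempts = maxAttempts;
  }

  /**
   * Record a failed connect.
   * @return the 1-based attempt number to use for the next reconnect (and for logging),
   *         or empty if the attempt limit has been exceeded.
   */
  OptionalInt onFailure() {
    // Saturate at Integer.MAX_VALUE so that an "unlimited" limit never overflows.
    final int attempt = attempts.updateAndGet(a -> a == Integer.MAX_VALUE ? a : a + 1);
    return attempt <= maxAttempts ? OptionalInt.of(attempt) : OptionalInt.empty();
  }

  /** Record a successful connect: the next failure starts again at attempt 1. */
  void onSuccess() {
    attempts.set(0);
  }
}
```

The value returned by `onFailure()` is what a log line such as the one suggested above would print as `attempt`.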
What changes were proposed in this pull request?
Implement configurable exponential backoff with jitter for Netty DataStream client reconnections to improve reliability and reduce resource usage during connection failures.
Problem
Currently, the Netty DataStream client uses a fixed 100ms delay for reconnection attempts when the connection fails. This approach has several limitations:
Solution
Implement configurable exponential backoff with jitter for DataStream client reconnections:
Configuration Support:
raft.client.datastream.reconnect.delay - Initial reconnection delay (default: 100ms)
raft.client.datastream.reconnect.max-delay - Maximum backoff delay (default: 5s)
Exponential Backoff:
Jitter (0.5x-1.5x):
Concurrent Safety:
Adaptive Logging:
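For readers unfamiliar with the technique, the backoff described in this section (initial delay 100ms, doubling on each failure up to a 5s cap, with 0.5x-1.5x jitter) can be sketched as below. `BackoffSketch` and its methods are hypothetical and not the PR's code; in particular, applying the jitter after the cap is an assumption, which means a jittered delay may exceed the configured maximum by up to 50%.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper, not part of Ratis: exponential backoff with 0.5x-1.5x jitter. */
final class BackoffSketch {
  private BackoffSketch() {}

  /** Delay without jitter for a 0-based attempt: base * 2^attempt, capped at max. */
  static long cappedDelayMillis(long baseMillis, long maxMillis, int attempt) {
    if (baseMillis <= 0 || maxMillis < baseMillis || attempt < 0) {
      throw new IllegalArgumentException("base = " + baseMillis + ", max = " + maxMillis
          + ", attempt = " + attempt);
    }
    long delay = baseMillis;
    // Double step by step and stop at the cap, so the multiplication never overflows.
    for (int i = 0; i < attempt && delay < maxMillis; i++) {
      delay = delay > maxMillis / 2 ? maxMillis : delay * 2;
    }
    return delay;
  }

  /** Capped delay multiplied by a random factor in [0.5, 1.5); assumption: jitter after the cap. */
  static long jitteredDelayMillis(long baseMillis, long maxMillis, int attempt) {
    final long delay = cappedDelayMillis(baseMillis, maxMillis, attempt);
    final double factor = 0.5 + ThreadLocalRandom.current().nextDouble();
    return Math.max(1L, (long) (delay * factor));
  }

  public static void main(String[] args) {
    // Print the schedule for the defaults named in this description (100ms, 5s).
    for (int attempt = 0; attempt < 8; attempt++) {
      System.out.println("attempt " + attempt
          + ": capped = " + cappedDelayMillis(100, 5000, attempt) + "ms"
          + ", jittered = " + jitteredDelayMillis(100, 5000, attempt) + "ms");
    }
  }
}
```

The jitter spreads out reconnects from many clients that lost the same server at the same moment, so they do not all retry in lockstep.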
What is the link to the Apache JIRA
RATIS-2408. Add configurable exponential backoff reconnection for Netty DataStream client.
How was this patch tested?
Unit Test.