feat: add leader election for Discovery and Engine components #1423
thunguo wants to merge 4 commits into apache:develop from
Conversation
Pull request overview
Adds a DB-backed leader election mechanism so that, in multi-replica deployments, only the elected leader runs Discovery/Engine “business logic” (e.g., list-watch and DB writes), improving consistency (Issue #1425).
Changes:
- Introduces pkg/core/leader with a GORM leader_leases model and a LeaderElection loop (plus unit tests).
- Wires leader election into the Discovery and Engine components (conditional on a non-memory store).
- Exposes DB access plumbing from the Store component (and adds a Pool() accessor on GormStore) to support leader-election DB usage.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| pkg/store/dbcommon/gorm_store.go | Adds Pool() accessor intended to expose the shared DB connection pool. |
| pkg/core/store/component.go | Adds a leader.DBSource implementation to expose a shared *gorm.DB from the store layer. |
| pkg/core/leader/model.go | Adds the LeaderLease GORM model for leader_leases. |
| pkg/core/leader/leader.go | Implements DB-based leader election (acquire/renew/release + run loop). |
| pkg/core/leader/db_source.go | Adds DBSource interface for components that can provide a *gorm.DB. |
| pkg/core/leader/leader_test.go | Adds unit tests for leader election behavior using in-memory SQLite. |
| pkg/core/engine/component.go | Integrates leader election into Engine startup flow. |
| pkg/core/discovery/component.go | Integrates leader election into Discovery startup flow; fixes a log message typo. |
| go.mod / go.sum | Dependency graph changes (tidy/reclassification + zookeeper direct dep). |
```go
// LeaderLease is the GORM model for the leader_leases table
// It uses optimistic locking via the Version field to ensure atomic leader elections
type LeaderLease struct {
	ID         uint      `gorm:"primaryKey;autoIncrement"`
	Component  string    `gorm:"uniqueIndex;size:64;not null"`
	HolderID   string    `gorm:"size:255;not null"`
	AcquiredAt time.Time `gorm:"not null"`
	ExpiresAt  time.Time `gorm:"not null"`
	Version    int64     `gorm:"not null;default:0"`
}
```
The struct/doc comment says the Version field is used for optimistic locking to ensure atomic leader elections, but TryAcquire does not include version in its acquisition UPDATE predicate (it only uses it for Renew). Either update the documentation to match the actual behavior, or extend acquisition to use Version as part of the atomicity guarantee.
```go
// RunLeaderElection runs the leader election loop
// It blocks and runs onStartLeading/onStopLeading callbacks as leadership changes
// This is designed to be run in a separate goroutine
func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan struct{},
	onStartLeading func(), onStopLeading func()) {

	ticker := time.NewTicker(le.acquireRetry)
	defer ticker.Stop()

	renewTicker := time.NewTicker(le.renewInterval)
	renewTicker.Stop() // Don't start renewal ticker yet

	isLeader := false

	for {
		select {
		case <-ctx.Done():
			if isLeader {
				le.Release(context.Background())
				onStopLeading()
			}
			return
		case <-stopCh:
			if isLeader {
				le.Release(context.Background())
				onStopLeading()
			}
			return
		case <-ticker.C:
			// Try to acquire leadership if not already leader
			if !isLeader {
				if le.TryAcquire(ctx) {
					logger.Infof("leader election: component %s acquired leadership (holder: %s)", le.component, le.holderID)
					isLeader = true
					renewTicker.Reset(le.renewInterval)
					onStartLeading()
				}
```
RunLeaderElection waits for the first ticker.C before the initial TryAcquire, which can delay leader startup by up to acquireRetry (default 5s). Consider attempting TryAcquire once before starting the ticker loop so a leader can start work immediately on boot.
```go
// poolProvider is an internal interface for stores that provide DB access
// This avoids circular imports by not referencing dbcommon directly
type poolProvider interface {
	Pool() interface{} // Returns *ConnectionPool, but we don't type it to avoid import
}
```
poolProvider is defined as Pool() interface{}, but GormStore.Pool() returns *ConnectionPool. Go method return types are not covariant, so store.(poolProvider) will never succeed and GetDB() will always return (nil, false), effectively disabling leader election for DB stores. Consider exposing a small, import-cycle-safe interface on DB-backed stores (e.g. a GetDB() *gorm.DB method on the store itself) or adjust the Pool() signature to exactly match the interface and then type-assert to a GetDB() *gorm.DB interface instead of using reflection.
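The failure is easy to reproduce in isolation. The following self-contained snippet mirrors the two declarations (with stand-in types, not the PR's actual packages) and shows that Go interface satisfaction requires exact method signatures, so `Pool() *ConnectionPool` never satisfies `Pool() interface{}`:

```go
package main

import "fmt"

// ConnectionPool and GormStore are minimal stand-ins for the PR's types.
type ConnectionPool struct{}

type GormStore struct{ pool *ConnectionPool }

// Concrete return type, as in the PR's accessor.
func (gs *GormStore) Pool() *ConnectionPool { return gs.pool }

// Interface return type, as in the PR's poolProvider.
type poolProvider interface {
	Pool() interface{}
}

func main() {
	var store interface{} = &GormStore{pool: &ConnectionPool{}}
	_, ok := store.(poolProvider)
	fmt.Println(ok) // false: method return types are not covariant in Go
}
```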
pkg/core/store/component.go (Outdated)
```go
// GetDB returns the shared DB connection if the underlying store is DB-backed
// Implements the leader.DBSource interface
func (sc *storeComponent) GetDB() (*gorm.DB, bool) {
	// Try to get DB from any store that has a Pool() method (all GormStores share the same ConnectionPool)
	for _, store := range sc.stores {
		if pp, ok := store.(poolProvider); ok {
			pool := pp.Pool()
			if pool == nil {
				continue
			}
			// Use reflection to call GetDB() on the pool to avoid importing dbcommon
			poolVal := reflect.ValueOf(pool)
			getDBMethod := poolVal.MethodByName("GetDB")
			if getDBMethod.IsValid() {
				result := getDBMethod.Call(nil)
				if len(result) > 0 {
					if db, ok := result[0].Interface().(*gorm.DB); ok {
						return db, true
					}
				}
```
GetDB() relies on the poolProvider type assertion + reflection. Given the current poolProvider signature mismatch, this loop will never find a DB and leader election will never be initialized. Even after fixing the signature, prefer a direct type assertion to a minimal interface (e.g. interface{ GetDB() *gorm.DB }) instead of reflection to avoid runtime surprises and make this easier to test.
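The direct-assertion approach could look like the following sketch. `DB` stands in for `*gorm.DB` so the example carries no external dependency, and the names (`dbProvider`, `getDB`, `memoryStore`) are illustrative, not from the PR:

```go
package main

import "fmt"

// DB stands in for *gorm.DB so this sketch is dependency-free.
type DB struct{}

// dbProvider is a minimal, import-cycle-safe interface: core/store can
// declare it locally and assert against it, and any DB-backed store that
// implements GetDB() satisfies it without core/store importing dbcommon.
type dbProvider interface{ GetDB() *DB }

type GormStore struct{ db *DB }

func (gs *GormStore) GetDB() *DB { return gs.db }

type memoryStore struct{}

// getDB walks the stores and returns the first shared DB it finds,
// using a plain type assertion instead of reflection.
func getDB(stores []interface{}) (*DB, bool) {
	for _, s := range stores {
		if p, ok := s.(dbProvider); ok {
			if db := p.GetDB(); db != nil {
				return db, true
			}
		}
	}
	return nil, false
}

func main() {
	_, ok := getDB([]interface{}{memoryStore{}})
	fmt.Println(ok) // false: memory store provides no DB
	_, ok = getDB([]interface{}{memoryStore{}, &GormStore{db: &DB{}}})
	fmt.Println(ok) // true: found via direct type assertion
}
```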
pkg/core/engine/component.go (Outdated)
```go
// Run leader election with callbacks for starting/stopping leadership
e.leaderElection.RunLeaderElection(ctx, ch,
	func() { // onStartLeading callback
		logger.Infof("engine: became leader, starting business logic")
		if err := e.startBusinessLogic(ch); err != nil {
			logger.Errorf("engine: failed to start business logic: %v", err)
		}
	},
	func() { // onStopLeading callback
		logger.Warnf("engine: lost leadership, stopping business logic")
	},
)
```
When leadership is acquired, startBusinessLogic(ch) starts informers using the global stop channel ch, but on leadership loss the onStopLeading callback only logs. This means informers/subscribers will continue running (and can be started multiple times on leadership flaps), violating the follower contract and potentially duplicating list-watch + DB writes. Use a per-leadership stop channel/context for informers, and in onStopLeading close it and Unsubscribe any subscribers (or otherwise ensure business logic is idempotent and fully stops on leadership loss).
pkg/core/discovery/component.go (Outdated)
```go
// Run leader election with callbacks for starting/stopping leadership
d.leaderElection.RunLeaderElection(ctx, ch,
	func() { // onStartLeading callback
		logger.Infof("discovery: became leader, starting business logic")
		if err := d.startBusinessLogic(ch); err != nil {
			logger.Errorf("discovery: failed to start business logic: %v", err)
		}
	},
	func() { // onStopLeading callback
		logger.Warnf("discovery: lost leadership, stopping business logic")
	},
)
```
Same issue as engine: startBusinessLogic(ch) launches informers with the global stop channel, but onStopLeading doesn’t stop them or unsubscribe, so a node that loses leadership will keep running leader-only behavior and repeated elections can start duplicate goroutines / duplicate subscription attempts. Use a dedicated stop channel/context per leadership term and stop/unsubscribe in onStopLeading to ensure followers do not execute list-watch or writes.
```go
if result.Error != nil {
	// If insertion fails, it means another replica just created it
	// This is expected in concurrent scenarios
	logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error)
	le.isLeader.Store(false)
	return false
```
TryAcquire treats any Create error as a benign race ("probably created by another replica") and logs it at debug level. That will also hide real DB failures (connectivity, permission, schema issues) and can leave the component stuck as follower without a visible signal. Consider checking specifically for unique-constraint violations and logging/handling other errors as warnings (or returning an error).
```go
acquireRetry   time.Duration
isLeader       atomic.Bool
currentVersion int64
stopCh         chan struct{}
```
LeaderElection has a stopCh field that is never used (the stop channel is passed into RunLeaderElection instead). Removing the unused field will simplify the struct and avoid confusion about which stop mechanism is authoritative.
Suggested change:
```diff
-	stopCh         chan struct{}
```
```go
// Pool returns the connection pool for this store
// Used by other components (e.g., leader election) that need direct DB access
func (gs *GormStore) Pool() *ConnectionPool {
```
This new Pool() accessor returns *ConnectionPool, but the only consumer (storeComponent.GetDB) currently looks for a Pool() interface{} method, so GormStore will not satisfy that interface and the accessor won’t be used. Align the method signature with the consumer approach (or replace this with a GetDB() *gorm.DB method on GormStore to avoid exposing the pool at all).
Suggested change:
```diff
-func (gs *GormStore) Pool() *ConnectionPool {
+func (gs *GormStore) Pool() interface{} {
```
```go
ticker := time.NewTicker(le.acquireRetry)
defer ticker.Stop()

renewTicker := time.NewTicker(le.renewInterval)
```
renewTicker is created but not defer-stopped on all exit paths (only stopped in some state transitions). If the function returns while renewTicker is active, the ticker goroutine can leak. Add defer renewTicker.Stop() right after creation (similar to ticker).
Suggested change:
```diff
 renewTicker := time.NewTicker(le.renewInterval)
+defer renewTicker.Stop()
```
robocanic left a comment:
Great work! I left some comments and hope you can discuss them with me.
robocanic left a comment:
Great work! The logic of LeaderElection looks solid. There's another component, Counter, which also needs leader election. Please add leader election for Counter (pkg/console/counter/component.go).
Pull request overview
Copilot reviewed 9 out of 10 changed files in this pull request and generated 6 comments.
```go
// Pool returns the connection pool for this store
// Used by other components (e.g., leader election) that need direct DB access
func (gs *GormStore) Pool() *ConnectionPool {
	return gs.pool
```
This new Pool() accessor returns a concrete *ConnectionPool, but the consumer in pkg/core/store currently uses an interface-based type assertion to avoid importing dbcommon. As implemented, the signatures don't match, so DB discovery fails. Consider returning a minimal interface type (e.g., an interface that only has GetDB() *gorm.DB) so core/store can depend on that interface without importing dbcommon, and avoid reflection there.
```go
func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
	if !e.needsLeaderElection {
		return e.startBusinessLogic(ch)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		<-ch
		cancel()
	}()

	var leaderStopCh chan struct{}

	e.leaderElection.RunLeaderElection(ctx, ch,
		func() { // onStartLeading: create a fresh stopCh for this leadership term
			leaderStopCh = make(chan struct{})
			logger.Infof("engine: became leader, starting business logic")
			if err := e.startBusinessLogic(leaderStopCh); err != nil {
				logger.Errorf("engine: failed to start business logic: %v", err)
			}
		},
		func() { // onStopLeading: stop informers from the current term
			logger.Warnf("engine: lost leadership, stopping business logic")
			if leaderStopCh != nil {
				close(leaderStopCh)
				leaderStopCh = nil
			}
		},
	)

	return nil
```
With leader election enabled, Start blocks inside RunLeaderElection and only returns when the process is stopping. The runtime logs "component started successfully" only after Start returns (see pkg/core/runtime/runtime.go), so this component will never be reported as started during normal operation, and startup errors from startBusinessLogic can no longer be surfaced via the Start return value. Consider running the leader-election loop in a goroutine and returning from Start after successful initialization, while still respecting ch for shutdown.
```diff
@@ -154,7 +224,7 @@ func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
 	}
 	// 2. start informers
 	for _, informer := range e.informers {
-		go informer.Run(ch)
+		go informer.Run(stopCh)
 	}
 	logger.Infof("resource engine %s has started successfully", e.name)
 	return nil
```
startBusinessLogic is invoked every time leadership is acquired, but it (1) calls Subscribe again without unsubscribing on leadership loss (EventBus rejects duplicate subscriber names), and (2) attempts to Run the same informer instances again. The informer implementation explicitly disallows restarting an informer once it has been stopped (HasStarted() check in pkg/core/controller/informer.go), so after the first leadership loss/failover the new leader term will not actually restart list-watch. To support leader failover, unsubscribe subscribers on onStopLeading and recreate new informer instances (and list-watchers) for each leadership term, or refactor to keep a single informer instance but gate event processing without stopping it.
```go
	defer cancel()

	go func() {
		<-ch
		cancel()
	}()

	var leaderStopCh chan struct{}

	d.leaderElection.RunLeaderElection(ctx, ch,
		func() { // onStartLeading: create a fresh stopCh for this leadership term
			leaderStopCh = make(chan struct{})
			logger.Infof("discovery: became leader, starting business logic")
			if err := d.startBusinessLogic(leaderStopCh); err != nil {
				logger.Errorf("discovery: failed to start business logic: %v", err)
			}
		},
		func() { // onStopLeading: stop informers from the current term
			logger.Warnf("discovery: lost leadership, stopping business logic")
			if leaderStopCh != nil {
				close(leaderStopCh)
				leaderStopCh = nil
			}
		},
	)
```
With leader election enabled, Start blocks inside RunLeaderElection and only returns when the process is stopping. The runtime reports components as started only after Start returns (see pkg/core/runtime/runtime.go), so this component won't be marked started during normal operation, and failures in startBusinessLogic are only logged (not returned). Consider running the election loop asynchronously and returning from Start once setup is complete, while still stopping the loop when ch is closed.
Suggested change (run the election loop asynchronously):
```go
	// Tie the leader election context to the component's stop channel.
	go func() {
		<-ch
		cancel()
	}()
	// Run the leader election loop asynchronously so that Start can return
	// and the runtime can mark this component as started.
	go func() {
		var leaderStopCh chan struct{}
		d.leaderElection.RunLeaderElection(ctx, ch,
			func() { // onStartLeading: create a fresh stopCh for this leadership term
				leaderStopCh = make(chan struct{})
				logger.Infof("discovery: became leader, starting business logic")
				if err := d.startBusinessLogic(leaderStopCh); err != nil {
					logger.Errorf("discovery: failed to start business logic: %v", err)
				}
			},
			func() { // onStopLeading: stop informers from the current term
				logger.Warnf("discovery: lost leadership, stopping business logic")
				if leaderStopCh != nil {
					close(leaderStopCh)
					leaderStopCh = nil
				}
			},
		)
	}()
```
```diff
@@ -123,11 +192,12 @@ func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error
 				fmt.Sprintf("subscriber %s can not subscribe resource changed events", sub.Name()))
 		}
 	}
 	// 2. start informers
 	for name, informers := range d.informers {
 		for _, informer := range informers {
-			go informer.Run(ch)
+			go informer.Run(stopCh)
 		}
-		logger.Infof("resource discvoery %s has started succesfully", name)
+		logger.Infof("resource discovery %s has started successfully", name)
 	}
```
startBusinessLogic is called on every leadership acquisition, but it re-subscribes all subscribers without unsubscribing on leadership loss (EventBus enforces unique subscriber names) and it tries to restart the same informer instances. The informer implementation does not allow restarting once stopped (HasStarted() guard in pkg/core/controller/informer.go), so after a leader failover the new leader term will not restart list-watch and business logic will be partially/fully disabled. Unsubscribe subscribers in onStopLeading and recreate informers per leadership term (or redesign to keep informers running but prevent follower-side writes).
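The "recreate informers per leadership term" option can be sketched as follows. This is an illustrative skeleton with stand-in names (`Informer`, `component`, `startTerm` are not the PR's identifiers): the component stores constructors rather than instances, so every leadership term builds fresh informers instead of trying to restart stopped ones:

```go
package main

import "fmt"

// Informer mirrors the run-once contract described above: the real
// implementation refuses to restart after being stopped.
type Informer interface {
	Run(stop <-chan struct{})
}

type fakeInformer struct{}

func (f *fakeInformer) Run(stop <-chan struct{}) {} // no-op for the sketch

// component stores constructors rather than instances, so each
// leadership term gets fresh informers. Names are illustrative.
type component struct {
	informerFactories []func() Informer
	built             int // counts constructions, for demonstration
}

// startTerm is the per-term analogue of startBusinessLogic: it builds
// new informer instances and runs them against this term's stop channel.
func (c *component) startTerm(stop <-chan struct{}) {
	for _, newInformer := range c.informerFactories {
		inf := newInformer()
		c.built++
		go inf.Run(stop)
	}
}

func main() {
	c := &component{}
	c.informerFactories = append(c.informerFactories, func() Informer {
		return &fakeInformer{}
	})

	// Two leadership terms, each with its own stop channel and fresh informer.
	term1 := make(chan struct{})
	c.startTerm(term1)
	close(term1)

	term2 := make(chan struct{})
	c.startTerm(term2)
	close(term2)

	fmt.Println(c.built) // 2: a new instance per term, never a restart
}
```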
```go
// Only take over an expired lease; never pre-empt an active holder.
result := le.db.WithContext(ctx).Model(&LeaderLease{}).
	Where("component = ? AND expires_at < ?", le.component, now).
	Updates(map[string]interface{}{
		"holder_id":   le.holderID,
		"acquired_at": now,
		"expires_at":  expiresAt,
		"version":     gorm.Expr("version + 1"),
	})

if result.Error != nil {
	logger.Warnf("leader election: failed to update lease for component %s: %v", le.component, result.Error)
	le.isLeader.Store(false)
	return false
}

// If the update succeeded (found a row to update)
if result.RowsAffected > 0 {
	// Fetch the updated version
	var lease LeaderLease
	err := le.db.WithContext(ctx).
		Where("component = ?", le.component).
		First(&lease).Error
	if err == nil {
		le.currentVersion = lease.Version
	}
	le.isLeader.Store(true)
	return true
}

// No row was updated, try to insert a new record (lease doesn't exist)
result = le.db.WithContext(ctx).Create(&LeaderLease{
	Component:  le.component,
	HolderID:   le.holderID,
	AcquiredAt: now,
	ExpiresAt:  expiresAt,
	Version:    1,
})

if result.Error != nil {
	// If insertion fails, it means another replica just created it
	// This is expected in concurrent scenarios
	logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error)
	le.isLeader.Store(false)
	return false
}
```
TryAcquire falls back to Create whenever the expired-lease UPDATE affects 0 rows. That includes the common case where a non-expired lease already exists, which will trigger a unique constraint error on every retry interval and produce avoidable DB work/log noise. Consider first checking whether a lease row exists (and is unexpired) and returning false early, or use an upsert/"insert ... on conflict do nothing" pattern and treat conflict as a normal non-leader outcome without logging it as an error.
|



Please provide a description of this PR:
Add leader election for Discovery and Engine components.
Issue #1425
To help us figure out who should review this PR, please put an X in all the areas that this PR affects.
Please check any characteristics that apply to this pull request.