
Add adaptive metadata upload batch sizing#659

Open
guysmoilov wants to merge 26 commits into main from feature/adaptive-upload

Conversation

@guysmoilov
Member

Summary

  • make metadata uploads use adaptive batch sizing instead of a fixed batch size
  • add new config knobs for minimum/initial batch size and target per-batch upload time
  • shrink batch size on failures and grow it again after successful/fast uploads
  • add datasource tests for growth and retry-with-smaller-batch behavior

Testing

  • .venv/bin/pytest tests/data_engine/test_datasource.py -q


coderabbitai bot commented Mar 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds an adaptive batching system for metadata uploads: new AdaptiveBatcher and AdaptiveBatchConfig, expanded config keys for adaptive behavior, integration into Datasource upload flow, retryability detection for upload errors, and comprehensive tests for adaptive logic and Datasource interactions.

Changes

  • Configuration Extension (dagshub/common/config.py): Added new env keys and public variables for adaptive batching: max/min/initial batch sizes, target batch time, growth factor, retry backoff base/max. Preserves legacy DATAENGINE_METADATA_UPLOAD_BATCH_SIZE as a fallback.
  • Adaptive Batching Core (dagshub/common/adaptive_batching.py): New module adding AdaptiveBatchConfig (with from_values), sizing/backoff helpers, and AdaptiveBatcher implementing an adaptive run loop with growth, shrink, exponential backoff, retries, and progress handling.
  • Metadata Upload Integration (dagshub/data_engine/model/datasource.py): Replaced fixed-size batching and manual progress with AdaptiveBatcher.run(...), delegating batching, timing, retries, and progress UI to the new adaptive batcher.
  • Retryability Detection (dagshub/data_engine/model/metadata/util.py): Added is_retryable_metadata_upload_error(exc: Exception) -> bool to classify GraphQL transport, connection, and timeout-related exceptions (including wrapped originals) as retryable.
  • Datasource Upload Tests (tests/data_engine/test_datasource.py): Added tests validating adaptive growth/shrink, retry/backoff behavior, non-retryable aborts, min-batch constraints, partial retries, and interactions with client.update_metadata calls.
  • Adaptive Batching Tests (tests/common/test_adaptive_batching.py): Comprehensive tests for AdaptiveBatchConfig.from_values, helper functions, backoff calculation, and AdaptiveBatcher.run under varied timing and failure scenarios (including generator inputs and time mocks).
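The grow/shrink loop described in the table can be sketched as follows. This is a hedged approximation, not the actual AdaptiveBatcher implementation: the name run_adaptive and the halving/doubling policy are illustrative only (the real module uses midpoint binary search between known-good and known-bad sizes, plus exponential backoff).

```python
import itertools
import time
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def run_adaptive(
    items: Iterable[T],
    operation: Callable[[List[T]], None],
    initial: int = 8,
    minimum: int = 1,
    maximum: int = 1000,
    target_seconds: float = 1.0,
) -> None:
    """Toy adaptive loop: grow after fast success, shrink on failure or slowness."""
    it: Iterator[T] = iter(items)
    batch_size = initial
    pending: List[T] = []  # failed items waiting to be retried at a smaller size
    while True:
        if pending:
            batch, pending = pending[:batch_size], pending[batch_size:]
        else:
            batch = list(itertools.islice(it, batch_size))
        if not batch:
            return
        start = time.monotonic()
        try:
            operation(batch)
        except TimeoutError:  # stand-in for the retryable-error classification
            if batch_size <= minimum:
                raise  # nowhere left to shrink: abort
            batch_size = max(minimum, batch_size // 2)
            pending = batch + pending  # re-queue the failed items
            continue
        elapsed = time.monotonic() - start
        if elapsed <= target_seconds:
            batch_size = min(maximum, batch_size * 2)  # fast success: grow
        else:
            batch_size = max(minimum, batch_size // 2)  # slow success: shrink
```

A flaky operation that fails once on an 8-item batch would then see batch sizes like 8 (failed), 4, 4, 2: the failed items are retried in smaller batches before the iterator resumes.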

Sequence Diagram(s)

sequenceDiagram
  participant DS as Datasource._upload_metadata
  participant AB as AdaptiveBatcher
  participant CL as DataEngineClient.update_metadata
  participant SV as Server/API
  participant UT as is_retryable_metadata_upload_error

  DS->>AB: construct config & run(items, operation=lambda batch: CL(batch))
  AB->>CL: update_metadata(batch)
  CL->>SV: send GraphQL/HTTP request
  SV-->>CL: response / error
  alt success
    CL-->>AB: success
    AB->>AB: measure time (≤/>) target → adjust batch size (grow/shrink)
    AB->>DS: continue with remaining items
  else error
    CL-->>AB: raise Exception
    AB->>UT: classify error
    alt retryable
      AB->>AB: compute backoff & shrink batch
      AB->>AB: sleep(backoff)
      AB->>CL: retry batch
    else non-retryable
      AB-->>DS: raise abort to caller
    end
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

  • DagsHub/metarepo#10: Implements count-based adaptive batching and retry/backoff; related to discussions about adaptive metadata batching but does not add byte-size–based batching requested there.

Poem

🐰 I nibble bytes in nimble tries,
I grow my loads when uploads fly,
When servers hiccup, I shrink and wait,
Backoff, retry — I steer my fate,
Hopping metadata to the sky!

🚥 Pre-merge checks: 2 passed

  • Title check ✅ Passed: The title 'Add adaptive metadata upload batch sizing' accurately and concisely summarizes the main change: introducing adaptive batch sizing for metadata uploads.
  • Description check ✅ Passed: The description is clearly related to the changeset, covering the main objectives including adaptive batching, config changes, retry behavior, and testing.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/data_engine/test_datasource.py (1)

150-188: Add one test for the slow-success downshift path.

These tests validate grow-on-success and shrink-on-exception, but they do not cover the elapsed > target_batch_time branch in dagshub/data_engine/model/datasource.py (Lines 850-855). A dedicated test here would protect the time-based adaptation behavior from regressions.


ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65e9f6a and 84632f2.

📒 Files selected for processing (3)
  • dagshub/common/config.py
  • dagshub/data_engine/model/datasource.py
  • tests/data_engine/test_datasource.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: build (3.12)
  • GitHub Check: build (3.13)
  • GitHub Check: build (3.11)
  • GitHub Check: build (3.10)
  • GitHub Check: build (3.9)
🧰 Additional context used
🧬 Code graph analysis (3)
dagshub/common/config.py (2)
dagshub/data_engine/model/datapoint.py (1)
  • get (325-334)
dagshub/data_engine/client/loaders/base.py (1)
  • get (102-124)
tests/data_engine/test_datasource.py (1)
dagshub/data_engine/model/datasource.py (3)
  • source (180-181)
  • update_metadata (1822-1857)
  • _upload_metadata (751-859)
dagshub/data_engine/model/datasource.py (4)
dagshub/data_engine/model/query_result.py (2)
  • entries (110-114)
  • entries (117-119)
dagshub/data_engine/client/data_client.py (1)
  • update_metadata (228-248)
dagshub/data_engine/client/gql_mutations.py (1)
  • update_metadata (32-47)
dagshub/data_engine/voxel_plugin_server/routes/datasource.py (1)
  • update_metadata (32-45)
🪛 Ruff (0.15.2)
tests/data_engine/test_datasource.py

[warning] 180-180: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (3)
dagshub/common/config.py (1)

63-74: Config knobs are integrated cleanly.

The new min/initial/target metadata batch settings are wired consistently for adaptive upload behavior.

tests/data_engine/test_datasource.py (1)

11-24: Test scaffolding looks good.

The direct config import plus _uploaded_batch_sizes helper make the adaptive-batching assertions clear and robust.

dagshub/data_engine/model/datasource.py (1)

758-805: Adaptive sizing setup is well-structured.

The min/initial/max normalization and bounded growth/shrink helpers are clean and easy to reason about.

Copilot AI (Contributor) left a comment

Pull request overview

This PR updates Data Engine metadata uploads to use an adaptive batch size strategy (instead of a fixed batch size), with new configuration knobs and tests validating the growth and failure-retry behavior.

Changes:

  • Added adaptive batch sizing logic to Datasource._upload_metadata, growing/shrinking based on success/failure and per-batch elapsed time.
  • Introduced new config/env knobs for min/initial batch size and a target upload time per batch.
  • Added tests covering batch-size growth and retry-with-smaller-batch on failure.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

  • dagshub/data_engine/model/datasource.py: Implements adaptive upload batching with growth/shrink heuristics and time-based targeting.
  • dagshub/common/config.py: Adds config variables/env keys for min/initial batch sizes and target batch time.
  • tests/data_engine/test_datasource.py: Adds tests asserting batch growth and failure retry behavior with reduced batch sizes.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
tests/data_engine/test_datasource.py (2)

155-158: Factor repeated config patch setup into a helper/fixture.

The same patch block is repeated in multiple tests, which makes future tuning harder. A small fixture (e.g., set_upload_batch_config(...)) would reduce drift.

Also applies to: 170-174, 195-199, 211-214, 228-231
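One possible shape for the suggested helper, shown as a context manager rather than a pytest fixture for illustration; the name set_upload_batch_config comes from the comment above, and the attribute names on the stand-in config object are placeholders:

```python
import contextlib
from types import SimpleNamespace


@contextlib.contextmanager
def set_upload_batch_config(config, **overrides):
    """Temporarily override config attributes, restoring them on exit."""
    saved = {name: getattr(config, name) for name in overrides}
    for name, value in overrides.items():
        setattr(config, name, value)
    try:
        yield config
    finally:
        for name, value in saved.items():
            setattr(config, name, value)


# Usage with a stand-in config object (attribute names are hypothetical):
cfg = SimpleNamespace(min_batch_size=1, initial_batch_size=8)
with set_upload_batch_config(cfg, min_batch_size=2, initial_batch_size=4):
    assert cfg.min_batch_size == 2 and cfg.initial_batch_size == 4
assert cfg.min_batch_size == 1 and cfg.initial_batch_size == 8
```

In the real test suite this could equally be a pytest fixture wrapping mocker.patch.object calls; the point is a single place to tune the knobs.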


199-199: Make monotonic mocking less brittle.

Line 199 hardcodes exactly four time.monotonic() values; minor internal instrumentation changes can break this test without behavior changes. Consider a generator/helper that safely serves extra values.
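A sketch of such a helper, assuming the test feeds time.monotonic through a side-effect callable; the class below is illustrative and simply keeps returning the last timestamp once the scripted deltas run out, so extra instrumentation calls never raise StopIteration:

```python
class FakeMonotonic:
    """Serves scripted time deltas, then repeats the last timestamp forever."""

    def __init__(self, deltas):
        self._deltas = iter(deltas)
        self._now = 0.0

    def __call__(self) -> float:
        # Once deltas are exhausted, advance by 0.0 instead of raising.
        self._now += next(self._deltas, 0.0)
        return self._now
```

A test could then patch the clock with something like `mocker.patch("time.monotonic", side_effect=FakeMonotonic([0.0, 2.0]))` (hypothetical patch target) and stay robust to extra internal timing calls.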


ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between ded5007 and 567c830.

📒 Files selected for processing (2)
  • dagshub/data_engine/model/datasource.py
  • tests/data_engine/test_datasource.py
🧰 Additional context used
🧬 Code graph analysis (1)
tests/data_engine/test_datasource.py (4)
tests/data_engine/conftest.py (1)
  • ds (18-19)
dagshub/data_engine/client/data_client.py (1)
  • update_metadata (228-248)
dagshub/data_engine/model/metadata/dtypes.py (1)
  • DatapointMetadataUpdateEntry (23-31)
dagshub/data_engine/dtypes.py (1)
  • MetadataFieldType (20-36)
🪛 Ruff (0.15.2)
tests/data_engine/test_datasource.py

[warning] 180-180: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 238-238: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (2)
tests/data_engine/test_datasource.py (1)

150-246: Great adaptive-batching test coverage.

These cases validate the key behavior transitions (grow, shrink-on-failure, slow-success downshift, retryability classification, and partial-batch edge handling) and are well-targeted.

dagshub/data_engine/model/datasource.py (1)

807-821: Retryability classification is a solid improvement.

The explicit non-retryable fail-fast path and narrowed retryable exception handling reduce noisy retries and surface real failures sooner.

Also applies to: 847-850

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/data_engine/test_datasource.py (1)

223-246: Add a regression test for failure at exact min_batch_size.

Current coverage validates partial-batch-below-min retries, but it doesn’t assert behavior when a retryable error occurs at exactly the configured minimum. Please add that case so min-floor behavior is locked in.
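The requested case could look roughly like this, shown against a toy upload loop rather than the real Datasource. The assumed behavior, per the surrounding review, is that a retryable error occurring at exactly min_batch_size aborts instead of shrinking further:

```python
def upload_all(items, operation, batch_size, min_batch_size):
    """Toy loop: shrink on TimeoutError, abort once at the minimum size."""
    i = 0
    while i < len(items):
        batch = items[i : i + batch_size]
        try:
            operation(batch)
        except TimeoutError:
            if batch_size <= min_batch_size:
                raise  # failure at the configured minimum: propagate
            batch_size = max(min_batch_size, batch_size // 2)
            continue
        i += len(batch)


def always_times_out(_batch):
    raise TimeoutError("simulated")


# A failure at exactly min_batch_size should abort, not retry forever.
try:
    upload_all(list(range(4)), always_times_out, batch_size=2, min_batch_size=2)
    raised = False
except TimeoutError:
    raised = True
assert raised
```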


ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between 567c830 and 4a767ab.

📒 Files selected for processing (2)
  • dagshub/data_engine/model/datasource.py
  • tests/data_engine/test_datasource.py
🧰 Additional context used
🧬 Code graph analysis (1)
dagshub/data_engine/model/datasource.py (1)
dagshub/data_engine/model/errors.py (1)
  • DataEngineGqlError (35-45)
🪛 Ruff (0.15.2)
tests/data_engine/test_datasource.py

[warning] 180-180: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 238-238: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 264-264: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (2)
dagshub/data_engine/model/datasource.py (1)

807-821: Retryable error classification + bounded backoff integration looks solid.

Good call separating retryable/non-retryable failures and resetting backoff after successful uploads.

Also applies to: 860-863, 875-875

tests/data_engine/test_datasource.py (1)

150-271: Great adaptive batching test coverage expansion.

The added scenarios (growth, retry downsizing, slow-batch reduction, non-retryable abort, and backoff reset) materially improve confidence in the new upload loop.

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
dagshub/data_engine/model/datasource.py (1)

860-862: Backoff cap is effectively 4s, not 5s.

The current exponent clamp prevents the expression from ever reaching the configured 5s ceiling. Consider adjusting the exponent cap so the 5s max is reachable.

Patch
-                    retry_delay_sec = min(5.0, 0.25 * (2 ** min(consecutive_retryable_failures - 1, 4)))
+                    retry_delay_sec = min(5.0, 0.25 * (2 ** min(consecutive_retryable_failures - 1, 5)))
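The arithmetic behind this comment can be checked directly; retry_delay below just re-expresses the line from the diff as a function of the failure count and the exponent clamp:

```python
def retry_delay(consecutive_failures: int, exponent_cap: int) -> float:
    # Same expression as in the diff, with the exponent clamp parameterized.
    return min(5.0, 0.25 * (2 ** min(consecutive_failures - 1, exponent_cap)))


# With the exponent clamped at 4, the delay tops out at 0.25 * 2**4 = 4.0s,
# so the min(5.0, ...) ceiling is unreachable; a clamp of 5 makes it reachable.
assert max(retry_delay(n, 4) for n in range(1, 20)) == 4.0
assert max(retry_delay(n, 5) for n in range(1, 20)) == 5.0
```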
tests/data_engine/test_datasource.py (1)

165-188: Avoid real backoff sleeps in retry tests.

These retry-path tests can incur actual sleep delays, which slows the suite. Mocking time.sleep here keeps tests fast and deterministic.

Patch
 def test_upload_metadata_retries_with_smaller_batch_after_failure(ds, mocker):
+    mocker.patch("dagshub.data_engine.model.datasource.time.sleep")
     entries = [
         DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(10)
     ]
@@
 def test_upload_metadata_retries_partial_batch_below_min(ds, mocker):
+    mocker.patch("dagshub.data_engine.model.datasource.time.sleep")
     entries = [
         DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(10)
     ]

Also applies to: 223-246


ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between 4a767ab and 0f5e9d7.

📒 Files selected for processing (2)
  • dagshub/data_engine/model/datasource.py
  • tests/data_engine/test_datasource.py
🧰 Additional context used
🧬 Code graph analysis (1)
dagshub/data_engine/model/datasource.py (3)
dagshub/data_engine/model/errors.py (1)
  • DataEngineGqlError (35-45)
dagshub/data_engine/client/data_client.py (1)
  • update_metadata (228-248)
dagshub/data_engine/client/gql_mutations.py (1)
  • update_metadata (32-47)
🪛 Ruff (0.15.2)
tests/data_engine/test_datasource.py

[warning] 180-180: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 238-238: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 264-264: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (2)
dagshub/data_engine/model/datasource.py (1)

807-821: Good retryability split and failure handling path.

The non-retryable fast-fail branch plus targeted retryable exception filtering is a strong reliability improvement here.

Also applies to: 848-872

tests/data_engine/test_datasource.py (1)

150-290: Excellent adaptive-batching test coverage.

These scenarios exercise core growth/shrink behavior, retry classification, min-bound handling, and backoff reset semantics well.

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.



Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.



@guysmoilov guysmoilov requested a review from Copilot March 3, 2026 23:00
@guysmoilov guysmoilov self-assigned this Mar 3, 2026
@guysmoilov guysmoilov added the 'enhancement' (New feature or request) label Mar 3, 2026
Move generic adaptive batching logic (config, size adaptation, retry,
progress) out of data_engine.model.metadata into dagshub.common.adaptive_batching.
Config defaults come from common.config so callers don't need boilerplate.
Domain-specific is_retryable_metadata_upload_error stays in metadata.util.

Accept Iterable[T] instead of List[T], consuming via itertools.islice.
When total is unknown (no __len__), progress shows a counter instead of
a bar. Failed batches are re-prepended to the iterator for retry.

- Pass None (not float('inf')) as progress total for unbounded iterables
  to avoid OverflowError in MofNCompleteColumn
- Respect min_batch_size on failure path: floor at config.min_batch_size
  instead of hardcoded 1, and abort when batch_size <= min_batch_size
- Replace nested itertools.chain retry with a pending list to avoid
  unbounded recursion depth under repeated failures

Add batch_growth_factor, retry_backoff_base_seconds, retry_backoff_max_seconds
to AdaptiveBatchConfig with env var defaults in common.config. Add 34 unit
tests covering config normalization, batch sizing functions, retry delay,
and full AdaptiveBatcher.run integration (lists, generators, retries).

_next_batch_after_success could return the same batch_size forever when
batch_size == bad_batch_size - 1. Fix by guaranteeing +1 progress in
the stall guard. Also clear last_bad_batch_size when a fast success
occurs at or above it, since the bound is stale. Add regression tests
for convergence and the stall scenario, plus from_values() defaults.

Replace the convoluted _midpoint + multi-conditional logic in
_next_batch_after_success and _next_batch_after_retryable_failure with
straightforward (a+b)//2 midpoints, a _clamp helper, and clear
docstrings explaining each function's strategy. Remove _midpoint's
defensive max(1,...) hack: callers now handle boundaries directly.

When last_bad_batch_size drops below last_good_batch_size (from slow
batches or errors), clear the good bound since it is no longer useful
for binary search. Add integration test for the slow-batch path.

Use (min + ceiling) // 2 instead of batch_size // 2 in the failure
fallback so we binary-search within the valid range rather than jumping
to the ceiling when bad_batch_size is much smaller than batch_size.
Add integration test verifying batch size grows past a cleared bad bound.

When a DataEngineGqlError wraps a TimeoutError, ConnectionError, or
requests exception, the old code only checked for TransportServerError
and TransportConnectionFailed, causing retryable errors to be
misclassified as fatal. Recurse into the wrapped exception instead.
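The recursive classification described in the last commit message might look like the following sketch. WrappedError stands in for DataEngineGqlError, and the retryable set is abbreviated; the real helper also checks gql transport errors and requests exceptions:

```python
class WrappedError(Exception):
    """Stand-in for DataEngineGqlError: carries the original exception."""

    def __init__(self, original: Exception):
        super().__init__(str(original))
        self.original = original


def is_retryable(exc: Exception) -> bool:
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return True
    if isinstance(exc, WrappedError):
        # Recurse into the wrapped exception instead of only checking the
        # outer type, so wrapped transient failures stay retryable.
        return is_retryable(exc.original)
    return False


assert is_retryable(TimeoutError())
assert is_retryable(WrappedError(ConnectionError("reset")))
assert is_retryable(WrappedError(WrappedError(TimeoutError())))
assert not is_retryable(WrappedError(ValueError("bad field type")))
```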
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
dagshub/common/config.py (1)

60-67: Make the Python config name match the new “max” semantics.

DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MAX_KEY is explicit, but the exported variable is still dataengine_metadata_upload_batch_size. After this PR, that name is easy to misread as the current/fixed batch size when it actually feeds AdaptiveBatchConfig.max_batch_size. Adding dataengine_metadata_upload_batch_size_max and keeping the old name as a compatibility alias would make the new API much clearer.
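A hedged sketch of the suggested alias; the env key string and default value below are placeholders, and only the two variable names come from the comment above:

```python
import os

# Explicit name matching the new "max" semantics (env key and default are
# placeholders, not the project's actual values).
dataengine_metadata_upload_batch_size_max = int(
    os.environ.get("DAGSHUB_UPLOAD_BATCH_SIZE_MAX_PLACEHOLDER", "1000")
)

# Compatibility alias so existing imports of the old name keep working.
dataengine_metadata_upload_batch_size = dataengine_metadata_upload_batch_size_max
```

Keeping both names lets downstream code migrate at its own pace while new code reads the intent directly.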


ℹ️ Review info

Run ID: fba7f8fd-fdd9-4174-96a3-615b138ab08d

📥 Commits

Reviewing files that changed from the base of the PR and between 6ae0a58 and 232ebbf.

📒 Files selected for processing (5)
  • dagshub/common/adaptive_batching.py
  • dagshub/common/config.py
  • dagshub/data_engine/model/datasource.py
  • dagshub/data_engine/model/metadata/util.py
  • tests/common/test_adaptive_batching.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • dagshub/data_engine/model/datasource.py
🧰 Additional context used
🧬 Code graph analysis (2)
tests/common/test_adaptive_batching.py (1)
dagshub/common/adaptive_batching.py (8)
  • AdaptiveBatchConfig (21-73)
  • AdaptiveBatcher (148-259)
  • _clamp (76-77)
  • _get_retry_delay_seconds (136-145)
  • _next_batch_after_retryable_failure (105-133)
  • _next_batch_after_success (80-102)
  • from_values (31-73)
  • run (161-259)
dagshub/common/adaptive_batching.py (1)
dagshub/common/rich_util.py (1)
  • get_rich_progress (6-15)
🪛 Ruff (0.15.5)
tests/common/test_adaptive_batching.py

[warning] 288-288: Unused lambda argument: batch

(ARG005)


[warning] 294-294: Unused lambda argument: batch

(ARG005)


[warning] 306-306: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 340-340: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (1)
tests/common/test_adaptive_batching.py (1)

45-216: Nice helper-level coverage.

The normalization, clamp, sizing, and backoff assertions here should make future tuning changes much safer.

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
tests/data_engine/test_datasource.py (2)

178-183: Add one datasource-level case for DataEngineGqlError-wrapped transient failures.

These tests only simulate raw TimeoutError/ValueError, but the retryability change in dagshub.data_engine.model.metadata.util.is_retryable_metadata_upload_error is specifically about wrapped DataEngineGqlErrors. Without one end-to-end case here, the regression can slip back in even if the helper still has unit coverage.

Example adjustment
+from dagshub.data_engine.model.errors import DataEngineGqlError
+
 def test_upload_metadata_retries_with_smaller_batch_after_failure(ds, mocker):
@@
     def _flaky_upload(_ds, upload_entries):
         if len(upload_entries) == 8 and not has_failed["value"]:
             has_failed["value"] = True
-            raise TimeoutError("simulated timeout")
+            raise DataEngineGqlError(TimeoutError("simulated timeout"), support_id="test-support-id")

Also applies to: 204-209, 242-245, 263-270, 288-295, 310-313


162-162: Prefer behavioral assertions over exact batch-size sequences.

These assertions couple the datasource tests to the current AdaptiveBatcher heuristics ([8, 4, 6, 7, 8, 7], etc.). Since the adaptive math is already covered by the dedicated batching tests in this PR, these integration tests will be noisy on future tuning changes that do not break datasource behavior.

Also applies to: 188-188, 214-214, 230-230, 247-247, 272-272, 297-297, 315-316
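For example, instead of pinning the exact sequence, the assertions could target the invariants; the helper name and the sequence below are taken from the comment above and shown as a literal list for illustration:

```python
sizes = [8, 4, 6, 7, 8, 7]  # e.g. what _uploaded_batch_sizes(...) returned

assert sum(sizes) >= 10             # every entry was (re)uploaded, retries included
assert sizes[1] < sizes[0]          # the batch shrank after the injected failure
assert all(s >= 1 for s in sizes)   # never dropped below the configured minimum
```

Assertions like these survive heuristic tuning (different midpoints, growth factors) as long as the observable contract holds.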


ℹ️ Review info

Run ID: 404b21ce-2d3a-4cb2-8a4e-c536709a8d6d

📥 Commits

Reviewing files that changed from the base of the PR and between 232ebbf and 0453a33.

📒 Files selected for processing (2)
  • dagshub/data_engine/model/metadata/util.py
  • tests/data_engine/test_datasource.py
🧰 Additional context used
🧬 Code graph analysis (1)
dagshub/data_engine/model/metadata/util.py (1)
dagshub/data_engine/model/errors.py (1)
  • DataEngineGqlError (35-45)
🪛 Ruff (0.15.5)
tests/data_engine/test_datasource.py

[warning] 181-181: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 207-207: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 265-265: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 291-291: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (1)
dagshub/data_engine/model/metadata/util.py (1)

28-42: Good fix for wrapped retryable errors.

This keeps the retry decision consistent when a transient transport/timeout failure is wrapped in DataEngineGqlError.

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.



@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
tests/data_engine/test_datasource.py (1)

150-162: Consider explicitly mocking adaptive_batch_growth_factor for test robustness.

This test doesn't mock adaptive_batch_growth_factor, so it relies on the default value. The expected [2, 12] requires a growth factor of at least 6 (2 × 6 = 12). If the default changes, this test would break without any change to the code under test.

📋 Proposed fix
     mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 2)
     mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0)
+    mocker.patch.object(dagshub.common.config, "adaptive_batch_growth_factor", 8)
 
     ds._upload_metadata(entries)

ℹ️ Review info

Run ID: dcb603da-1805-4f65-b4f8-317ae4664644

📥 Commits

Reviewing files that changed from the base of the PR and between 0453a33 and 00294fe.

📒 Files selected for processing (2)
  • dagshub/common/adaptive_batching.py
  • tests/data_engine/test_datasource.py
🧰 Additional context used
🧬 Code graph analysis (2)
tests/data_engine/test_datasource.py (5)
tests/data_engine/conftest.py (1)
  • ds (18-19)
dagshub/data_engine/model/datasource.py (4)
  • Datasource (158-1713)
  • source (181-182)
  • update_metadata (1727-1762)
  • _upload_metadata (752-764)
dagshub/data_engine/client/data_client.py (1)
  • update_metadata (228-248)
dagshub/data_engine/model/metadata/dtypes.py (1)
  • DatapointMetadataUpdateEntry (23-31)
dagshub/data_engine/dtypes.py (1)
  • MetadataFieldType (20-36)
dagshub/common/adaptive_batching.py (1)
dagshub/common/rich_util.py (1)
  • get_rich_progress (6-15)
🪛 Ruff (0.15.5)
tests/data_engine/test_datasource.py

[warning] 181-181: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 207-207: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 264-264: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 291-291: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (15)
tests/data_engine/test_datasource.py (8)

23-25: LGTM!

Clean helper function that extracts batch sizes from mock call history. Good use of list comprehension.


165-188: LGTM!

Good coverage of the retry-with-smaller-batch scenario. The test correctly verifies that after a retryable failure, the batch size shrinks via binary search and the remaining items (from pending + iterator) are processed.


191-214: LGTM!

Excellent test for the binary search convergence behavior. The expected [8, 4, 6, 7, 8, 7] correctly demonstrates the batcher avoiding the known-bad size of 8 until conditions change, then successfully processing it.


217-230: LGTM!

Good test for the slow-success path. The time mocking correctly simulates a slow first batch (2s > 1s target) triggering a batch size reduction.


233-247: LGTM!

Correctly verifies that non-retryable errors (like ValueError) abort immediately without retry attempts.


250-272: LGTM!

Good edge case coverage. The test correctly verifies that tail batches below the minimum configured size get exactly one retry attempt before aborting, and validates the backoff delay.


275-297: LGTM!

Properly verifies that the exponential backoff resets after each successful batch, rather than accumulating across all failures.


300-316: LGTM!

Correctly verifies that when starting at the minimum batch size, retryable failures abort immediately without retry (since there's nowhere to shrink to).

dagshub/common/adaptive_batching.py (7)

20-73: LGTM!

Well-designed immutable config class. The from_values factory method handles missing values with sensible defaults and properly normalizes edge cases (e.g., ensuring min ≤ initial ≤ max).
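The normalization behavior described above can be sketched in a few lines. This is an illustrative stand-in, not the actual from_values implementation; the names BatchConfig and normalize are hypothetical, and only the invariant 1 ≤ min ≤ initial ≤ max is taken from the review:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BatchConfig:
    """Illustrative immutable config, mirroring the frozen-dataclass pattern."""
    min_size: int
    initial_size: int
    max_size: int


def normalize(min_size: int, initial_size: int, max_size: int) -> BatchConfig:
    # Clamp so that 1 <= min <= initial <= max always holds,
    # no matter what values the user configured.
    min_size = max(1, min_size)
    max_size = max(min_size, max_size)
    initial_size = min(max(min_size, initial_size), max_size)
    return BatchConfig(min_size, initial_size, max_size)
```

For example, normalize(0, 100, 10) clamps min up to 1 and initial down to the max of 10, so inconsistent inputs still yield a valid ordering.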


80-102: LGTM!

Good growth strategy. The binary search toward known-bad bounds enables faster convergence, and the max(candidate, batch_size + 1) guarantee prevents infinite stalling at a given size.


105-134: LGTM!

Correct shrink strategy with guaranteed convergence (ceiling = batch_size - 1 ensures strict decrease). The binary search toward known-good bounds is efficient.
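The grow and shrink strategies the two comments above describe can be sketched together. This is a simplified model under stated assumptions, not the library's code: the function names and signatures are hypothetical, and only the key properties from the review are reproduced (binary search toward known-bad/known-good bounds, max(candidate, batch_size + 1) for forward progress, and a strict decrease via batch_size - 1 on failure):

```python
from typing import Optional


def next_after_success(size: int, growth_factor: int, max_size: int,
                       known_bad: Optional[int] = None) -> int:
    # No known-bad ceiling: grow geometrically. Otherwise binary-search
    # toward the known-bad size without ever reaching it.
    if known_bad is None:
        candidate = size * growth_factor
        ceiling = max_size
    else:
        candidate = (size + known_bad) // 2
        ceiling = known_bad - 1
    # Guarantee forward progress even when the midpoint rounds down to `size`.
    candidate = max(candidate, size + 1)
    return min(candidate, ceiling)


def next_after_failure(size: int, min_size: int,
                       known_good: Optional[int] = None) -> int:
    # Binary-search down toward the last known-good size (or the minimum).
    floor = min_size if known_good is None else known_good
    candidate = (floor + size) // 2
    # `size - 1` excludes the size that just failed, guaranteeing strict shrinkage.
    return max(min(candidate, size - 1), floor)
```

Under this model a failure at 8 with minimum 1 retries at 4, and subsequent fast successes walk back up through 6 and 7 while 8 remains a known-bad ceiling, which matches the sequence the test above exercises.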


162-166: LGTM!

Good handling of both Sized and unsized iterables. For unsized inputs, total=None correctly enables indeterminate progress display, and empty iterables are handled gracefully (the loop breaks when batch is empty).


200-241: LGTM!

Robust error handling:

  • Non-retryable errors abort immediately
  • The exhausted_shrink condition correctly distinguishes between "shrunk to minimum" vs "short tail batch"
  • Failed batch items are correctly re-queued for retry with smaller batches

243-266: LGTM!

Well-designed success path:

  • Fast success clears stale bad bounds and grows
  • Slow success triggers shrinking (same as retryable failure), treating slow batches as a signal to reduce size
  • Final progress update at Line 266 ensures accurate completion display even for unbounded iterables

137-146: The duck-typed SimpleNamespace approach is safe and correct. The wait_exponential implementation only reads the attempt_number attribute, so providing just this attribute via SimpleNamespace avoids unnecessary RetryCallState construction without compatibility issues. The existing comment and type: ignore suppression appropriately document the approach.
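The duck-typing idea can be shown with a stdlib-only sketch. The WaitExponential class below is a simplified imitation of a tenacity-style wait strategy (the delay formula and defaults here are assumptions, not tenacity's actual code); the point it illustrates is that any object exposing .attempt_number satisfies the strategy, so a SimpleNamespace can stand in for a full RetryCallState:

```python
from types import SimpleNamespace


class WaitExponential:
    """Simplified stand-in for an exponential wait strategy: the only thing
    it reads from its state argument is `.attempt_number`."""

    def __init__(self, multiplier: float = 0.25, max_delay: float = 4.0):
        self.multiplier = multiplier
        self.max_delay = max_delay

    def __call__(self, retry_state) -> float:
        delay = self.multiplier * (2 ** (retry_state.attempt_number - 1))
        return min(delay, self.max_delay)


def get_retry_delay(consecutive_failures: int) -> float:
    strategy = WaitExponential()
    # Duck-type the state object: providing just `.attempt_number` avoids
    # constructing the heavier real state object.
    return strategy(SimpleNamespace(attempt_number=consecutive_failures))
```

With these illustrative defaults, attempt 1 waits 0.25s, attempt 2 waits 0.5s, and the delay is capped at 4.0s from attempt 6 onward.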

@guysmoilov guysmoilov requested a review from kbolashev March 11, 2026 07:25
@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
dagshub/common/adaptive_batching.py (1)

137-146: Consider documenting the duck-typing approach.

The SimpleNamespace trick to duck-type tenacity's RetryCallState is clever but may confuse future maintainers. A brief inline comment explaining why this works would help.

📝 Suggested documentation improvement
 def _get_retry_delay_seconds(consecutive_retryable_failures: int, config: AdaptiveBatchConfig) -> float:
-    # SimpleNamespace duck-types the .attempt_number attribute that tenacity's
-    # wait strategies read, avoiding the heavier RetryCallState constructor.
+    # tenacity's wait strategies only read .attempt_number from their state object.
+    # Using SimpleNamespace avoids constructing the full RetryCallState which requires
+    # a retry decorator context. This duck-typing is intentional and tested.
     strategy = wait_exponential(

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 29872a54-175c-48b1-b5f5-d5c64275b893

📥 Commits

Reviewing files that changed from the base of the PR and between 00294fe and dbf7a68.

📒 Files selected for processing (3)
  • dagshub/common/adaptive_batching.py
  • dagshub/common/config.py
  • tests/data_engine/test_datasource.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Agent
  • GitHub Check: build (3.9)
  • GitHub Check: build (3.12)
  • GitHub Check: build (3.13)
  • GitHub Check: build (3.10)
  • GitHub Check: build (3.11)
🧰 Additional context used
🧬 Code graph analysis (2)
dagshub/common/adaptive_batching.py (1)
dagshub/common/rich_util.py (1)
  • get_rich_progress (6-15)
tests/data_engine/test_datasource.py (4)
tests/data_engine/conftest.py (1)
  • ds (18-19)
dagshub/data_engine/model/datasource.py (4)
  • Datasource (158-1713)
  • source (181-182)
  • update_metadata (1727-1762)
  • _upload_metadata (752-764)
dagshub/data_engine/model/metadata/dtypes.py (1)
  • DatapointMetadataUpdateEntry (23-31)
dagshub/data_engine/dtypes.py (1)
  • MetadataFieldType (20-36)
🪛 Ruff (0.15.5)
tests/data_engine/test_datasource.py

[warning] 181-181: Avoid specifying long messages outside the exception class (TRY003)

[warning] 207-207: Avoid specifying long messages outside the exception class (TRY003)

[warning] 264-264: Avoid specifying long messages outside the exception class (TRY003)

[warning] 291-291: Avoid specifying long messages outside the exception class (TRY003)

🔇 Additional comments (20)
tests/data_engine/test_datasource.py (10)

11-11: LGTM!

The import of dagshub.common.config is necessary for patching configuration values in the adaptive batching tests.


23-25: LGTM!

Clean helper function that extracts batch sizes from mock call history. The list comprehension is readable and correctly accesses call.args[1] which corresponds to the batch parameter in update_metadata(ds, batch).


150-162: LGTM!

Good test coverage for the growth behavior. The test correctly verifies that batching starts at the configured initial size (2) and grows to consume the remaining entries (12).


165-188: LGTM!

Comprehensive test for retry-with-smaller-batch behavior. The test correctly verifies that after a failure at batch size 8, the system retries with a smaller batch (4) and then grows appropriately (6) for remaining entries.


191-214: LGTM!

Good test that verifies the binary search behavior around known-bad batch sizes. The progression [8, 4, 6, 7, 8, 7] correctly demonstrates that after the initial failure at 8, the system avoids that size until it's cleared by a fast success.


217-230: LGTM!

Good test for the "slow success" path that reduces batch size when uploads succeed but exceed the target time. The time mocking correctly simulates a 2-second elapsed time against a 1-second target.


233-247: LGTM!

Important test ensuring non-retryable errors propagate immediately without retry attempts. The test correctly verifies that ValueError (a non-retryable error) causes immediate failure with only one batch attempted.


250-272: LGTM!

Critical test for edge case handling of tail batches below the configured minimum. Verifies that such batches get exactly one retry before aborting, preventing infinite retry loops.


275-297: LGTM!

Good test ensuring the exponential backoff resets after a successful operation. The test correctly verifies that both sleep calls use the base delay (0.25) rather than escalating, confirming the counter reset.
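The reset behavior this test verifies can be sketched minimally. This is not the actual upload code: the ConnectionError-as-retryable choice, the function name, and the delay formula are illustrative assumptions; only the "consecutive failure counter resets to zero after each success" behavior is taken from the review:

```python
import time
from typing import Callable, Iterable, List


def run_with_backoff(batches: Iterable[list], send: Callable[[list], None],
                     base: float = 0.25, max_delay: float = 4.0,
                     sleep: Callable[[float], None] = time.sleep) -> None:
    failures = 0  # consecutive retryable failures since the last success
    for batch in batches:
        while True:
            try:
                send(batch)
            except ConnectionError:  # stand-in for "retryable" errors
                failures += 1
                sleep(min(base * (2 ** (failures - 1)), max_delay))
                continue
            failures = 0  # success: the next failure starts back at the base delay
            break
```

If each batch fails exactly once, every retry sleeps for the base delay (0.25s) rather than escalating, because the counter is reset between batches.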


300-316: LGTM!

Important edge case test for when the system starts at the minimum batch size and encounters a persistent failure. Verifies that it aborts immediately without unnecessary retries or sleep calls.

dagshub/common/config.py (3)

61-70: LGTM!

Good backward compatibility handling. The explicit MAX key takes precedence, then falls back to the legacy key, then the default. This ensures existing environment-based overrides continue to work.
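The precedence chain can be sketched as a nested os.environ.get lookup. The full key strings and the default value below are placeholders, not the project's actual constants; only the ordering (explicit MAX key, then legacy key, then default) comes from the review:

```python
import os

# Hypothetical key names and default, for illustration only.
MAX_KEY = "EXAMPLE_METADATA_UPLOAD_BATCH_SIZE_MAX"
LEGACY_KEY = "EXAMPLE_METADATA_UPLOAD_BATCH_SIZE"
DEFAULT_MAX = "5000"


def resolve_max_batch_size() -> int:
    # Precedence: explicit MAX key, then the legacy key, then the default.
    return int(os.environ.get(MAX_KEY, os.environ.get(LEGACY_KEY, DEFAULT_MAX)))
```

Setting only the legacy variable still takes effect, while setting the new MAX variable overrides it, which is what keeps existing environment-based overrides working.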


72-78: LGTM!

The configuration for min and initial batch sizes looks good. Setting min=1 addresses the previous review concern about allowing shrinkage for large datapoints. Using min as the default for initial is a conservative choice that prioritizes stability over speed.


80-92: LGTM!

The target batch time and backoff configuration keys follow good naming conventions with _SECONDS suffix to indicate units. The defaults (5.0s target time, 0.25s-4.0s backoff range) are reasonable for network operations.

dagshub/common/adaptive_batching.py (7)

1-16: LGTM!

Clean imports with appropriate type hints. The TypeVar("T") enables generic typing for the batch items.


20-73: LGTM!

Well-designed configuration class with proper normalization. The from_values classmethod provides sensible defaults from the config module and ensures all values are valid (e.g., batch_growth_factor >= 2 to prevent stalling, target_batch_time >= 0.01 to prevent edge cases).


76-103: LGTM!

The _next_batch_after_success function implements a sound binary search strategy. Key insight: the max(candidate, batch_size + 1) on line 100 guarantees forward progress, preventing stalls when the binary search midpoint equals the current size.


105-134: LGTM!

Solid shrinking logic with binary search toward known-good sizes. The ceiling = batch_size - 1 on line 123 ensures strict shrinkage, preventing infinite retry loops at the same size.


149-165: LGTM!

Clean class initialization with sensible defaults. The early return for empty input (lines 164-165) is correct - when total is 0 (sized empty collection), we return immediately. For unsized iterables, total is None, so we proceed to the loop which will exit on the first empty batch.


183-241: LGTM!

The main batching loop is well-structured:

  • Pending items (from failed batches) are processed before new items
  • The exhausted_shrink logic (line 211) correctly handles both "at minimum" and "tail batch" cases
  • Re-queuing failed items with pending = batch + pending (line 240) preserves order
  • Stale bounds are cleared appropriately (lines 225-226) to prevent stuck binary searches
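The pending-first, re-queue-on-failure shape of that loop can be sketched as follows. This is a stripped-down model, not the actual batcher: backoff, bounds tracking, and progress reporting are omitted, and the callback names (send, size_after_failure, is_retryable) are hypothetical; it shows only how a failed batch is put back at the head of pending so order is preserved:

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def adaptive_upload(items: Iterable[T], send: Callable[[List[T]], None],
                    size_after_failure: Callable[[int], int],
                    initial_size: int,
                    is_retryable: Callable[[Exception], bool]) -> None:
    it = iter(items)
    pending: List[T] = []  # items from failed batches, retried before new ones
    size = initial_size
    while True:
        batch = pending[:size]
        pending = pending[size:]
        while len(batch) < size:  # top up from the iterator
            try:
                batch.append(next(it))
            except StopIteration:
                break
        if not batch:
            return
        try:
            send(batch)
        except Exception as e:
            if not is_retryable(e):
                raise  # non-retryable errors abort immediately
            # Re-queue the failed batch ahead of anything already pending,
            # preserving the original item order, then shrink.
            pending = batch + pending
            size = size_after_failure(size)
```

For instance, with ten items, an initial size of 8, and one retryable failure at size 8, a halving shrink policy replays the same eight items in two batches of 4 before draining the tail.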

243-265: LGTM!

The success path correctly:

  • Resets the consecutive failure counter (line 244)
  • Updates progress (line 246)
  • Distinguishes between fast success (grow batch) and slow success (shrink batch)
  • Clears stale bounds when appropriate (lines 253-254, 260-261)

The final progress update (line 266) ensures the progress bar shows the correct completed count for both sized and unsized inputs.

Contributor

Copilot AI left a comment
Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.



DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MAX_KEY,
os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_KEY, DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_DEFAULT_MAX),
)
)
Copilot AI Mar 11, 2026
dataengine_metadata_upload_batch_size was removed. Since dagshub.common.config is a public import surface, this is a breaking change for any external code that referenced the old attribute. Consider keeping a backwards-compatible alias (e.g., dataengine_metadata_upload_batch_size = dataengine_metadata_upload_batch_size_max or = dataengine_metadata_upload_batch_size_initial) and documenting/deprecating it instead of removing outright.

Suggested change
)
)
# Backwards-compatible alias for legacy code. Prefer using the more specific
# *_min / *_initial / *_max settings instead.
dataengine_metadata_upload_batch_size = dataengine_metadata_upload_batch_size_max

Member Author
Not sure if this is worth doing or not


Labels: auto-review-done (Review loop by AI finished), enhancement (New feature or request)
