Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,24 @@ Result<std::unique_ptr<NameMapping>> NameMappingFromJson(const nlohmann::json& j
return NameMapping::Make(std::move(mapped_fields));
}

std::optional<std::string> UpdateMappingFromJsonString(
std::string_view mapping_json, const std::map<int32_t, SchemaField>& updates,
const std::multimap<int32_t, int32_t>& adds) {
auto json_result = FromJsonString(std::string(mapping_json));
if (!json_result) return std::nullopt;

auto current_mapping = NameMappingFromJson(*json_result);
if (!current_mapping) return std::nullopt;

auto updated_mapping = UpdateMapping(**current_mapping, updates, adds);
if (!updated_mapping) return std::nullopt;

auto json_str = ToJsonString(ToJson(**updated_mapping));
if (!json_str) return std::nullopt;

return std::move(*json_str);
}

nlohmann::json ToJson(const TableIdentifier& identifier) {
nlohmann::json json;
json[kNamespace] = identifier.ns.levels;
Expand Down
12 changes: 12 additions & 0 deletions src/iceberg/json_serde_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

#pragma once

#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

#include <nlohmann/json_fwd.hpp>

Expand Down Expand Up @@ -347,6 +351,14 @@ ICEBERG_EXPORT nlohmann::json ToJson(const NameMapping& name_mapping);
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> NameMappingFromJson(
const nlohmann::json& json);

/// \brief Update a name mapping given as a JSON string and return the updated JSON.
///
/// Parses `mapping_json` into a name mapping, applies `UpdateMapping` with the given
/// `updates` and `adds`, and serializes the updated mapping back to a JSON string.
///
/// \param mapping_json The JSON text of the name mapping to update.
/// \param updates Passed through unchanged as the `updates` argument of `UpdateMapping`.
/// \param adds Passed through unchanged as the `adds` argument of `UpdateMapping`.
/// \return The updated name mapping as a JSON string, or `std::nullopt` if parsing,
/// updating, or serialization fails. The underlying error is not reported.
ICEBERG_EXPORT std::optional<std::string> UpdateMappingFromJsonString(
std::string_view mapping_json, const std::map<int32_t, SchemaField>& updates,
const std::multimap<int32_t, int32_t>& adds);

/// \brief Serializes a `TableIdentifier` object to JSON.
///
/// \param identifier The `TableIdentifier` object to be serialized.
Expand Down
161 changes: 161 additions & 0 deletions src/iceberg/name_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,158 @@ class CreateMappingVisitor {
}
};

// Rewrites an existing name mapping so that it reflects a set of schema changes.
//
// `updates` is keyed by field ID and holds the new definition of every changed or added
// field. `adds` maps a parent field ID to the IDs of the fields added beneath it;
// top-level additions are looked up under parent ID -1. Both containers are stored by
// reference, so they must outlive the visitor.
class UpdateMappingVisitor {
 public:
  UpdateMappingVisitor(const std::map<int32_t, SchemaField>& updates,
                       const std::multimap<int32_t, int32_t>& adds)
      : updates_(updates), adds_(adds) {}

  // Entry point: rewrites all existing fields of `mapping`, then appends the fields
  // that were added at the top level.
  Result<std::unique_ptr<MappedFields>> VisitMapping(const NameMapping& mapping) {
    auto top_level = VisitFields(mapping.AsMappedFields());
    ICEBERG_RETURN_UNEXPECTED(top_level);
    constexpr int32_t kTopLevelParentId = -1;
    return AddNewFields(std::move(*top_level), kTopLevelParentId);
  }

 private:
  // Rewrites every field of one nesting level, then strips from each sibling any name
  // that an update has assigned to a different field of the same level.
  Result<std::unique_ptr<MappedFields>> VisitFields(const MappedFields& fields) {
    std::vector<MappedField> visited;
    visited.reserve(fields.Size());
    for (const auto& child : fields.fields()) {
      auto child_result = VisitField(child);
      ICEBERG_RETURN_UNEXPECTED(child_result);
      visited.push_back(std::move(*child_result));
    }

    // New name -> ID of the field that now owns it, for fields present in `updates_`.
    std::unordered_map<std::string, int32_t> renamed_to;
    for (const auto& entry : visited) {
      if (!entry.field_id.has_value()) {
        continue;
      }
      const int32_t id = entry.field_id.value();
      if (const auto found = updates_.find(id); found != updates_.end()) {
        renamed_to.emplace(std::string(found->second.name()), id);
      }
    }

    for (auto& entry : visited) {
      entry = RemoveReassignedNames(entry, renamed_to);
    }

    return MappedFields::Make(std::move(visited));
  }

  // Rewrites a single field: adds the updated name (if any) to its name set, recurses
  // into its nested mapping, and appends any fields added beneath it.
  Result<MappedField> VisitField(const MappedField& field) {
    std::unordered_set<std::string> names = field.names;
    if (field.field_id.has_value()) {
      if (const auto found = updates_.find(field.field_id.value());
          found != updates_.end()) {
        names.insert(std::string(found->second.name()));
      }
    }

    std::unique_ptr<MappedFields> children;
    if (field.nested_mapping != nullptr) {
      auto rewritten = VisitFields(*field.nested_mapping);
      ICEBERG_RETURN_UNEXPECTED(rewritten);
      children = std::move(*rewritten);
    }

    // Additions are keyed by parent ID, so a field without an ID cannot receive any.
    if (field.field_id.has_value()) {
      auto extended = AddNewFields(std::move(children), field.field_id.value());
      ICEBERG_RETURN_UNEXPECTED(extended);
      children = std::move(*extended);
    }

    return MappedField{
        .names = std::move(names),
        .field_id = field.field_id,
        .nested_mapping = std::move(children),
    };
  }

  // Appends mappings for the fields registered in `adds_` under `parent_id`.
  //
  // Added IDs that have no entry in `updates_` are skipped. When nothing is to be
  // added, `mapping` is handed back untouched (it may be null). Otherwise existing
  // fields lose any name now taken by an added field, and the new fields are appended
  // after them.
  Result<std::unique_ptr<MappedFields>> AddNewFields(
      std::unique_ptr<MappedFields> mapping, int32_t parent_id) {
    std::vector<const SchemaField*> pending;
    const auto [first, last] = adds_.equal_range(parent_id);
    for (auto add = first; add != last; ++add) {
      if (const auto definition = updates_.find(add->second);
          definition != updates_.end()) {
        pending.push_back(&definition->second);
      }
    }

    if (pending.empty()) {
      return std::move(mapping);
    }

    // Build a fresh mapping (including nested structure) for each added field.
    CreateMappingVisitor creator;
    std::vector<MappedField> created;
    for (const SchemaField* added : pending) {
      auto nested = VisitType(
          *added->type(), [&creator](const auto& type) { return creator.Visit(type); });
      ICEBERG_RETURN_UNEXPECTED(nested);

      created.push_back(MappedField{
          .names = {std::string(added->name())},
          .field_id = added->field_id(),
          .nested_mapping = std::move(*nested),
      });
    }

    if (mapping == nullptr || mapping->Size() == 0) {
      return MappedFields::Make(std::move(created));
    }

    // Name -> ID of the added field that now owns it.
    std::unordered_map<std::string, int32_t> claimed_names;
    for (const SchemaField* added : pending) {
      claimed_names.emplace(std::string(added->name()), added->field_id());
    }

    // Existing fields first (minus reassigned names), then the new fields.
    std::vector<MappedField> merged;
    merged.reserve(mapping->Size() + created.size());
    for (const auto& existing : mapping->fields()) {
      merged.push_back(RemoveReassignedNames(existing, claimed_names));
    }
    for (auto& fresh : created) {
      merged.push_back(std::move(fresh));
    }

    return MappedFields::Make(std::move(merged));
  }

  // Returns a copy of `field` whose name set no longer contains any name that
  // `assignments` gives to a different field ID. A field without an ID loses every
  // name that appears in `assignments`.
  static MappedField RemoveReassignedNames(
      const MappedField& field,
      const std::unordered_map<std::string, int32_t>& assignments) {
    const auto claimed_by_other = [&](const std::string& name) {
      const auto owner = assignments.find(name);
      if (owner == assignments.end()) {
        return false;
      }
      return !field.field_id.has_value() || owner->second != field.field_id.value();
    };

    std::unordered_set<std::string> kept_names = field.names;
    std::erase_if(kept_names, claimed_by_other);

    return MappedField{
        .names = std::move(kept_names),
        .field_id = field.field_id,
        .nested_mapping = field.nested_mapping,
    };
  }

  const std::map<int32_t, SchemaField>& updates_;
  const std::multimap<int32_t, int32_t>& adds_;
};

} // namespace

Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema) {
Expand All @@ -335,4 +487,13 @@ Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema) {
return NameMapping::Make(std::move(*result));
}

Result<std::unique_ptr<NameMapping>> UpdateMapping(
    const NameMapping& mapping, const std::map<int32_t, SchemaField>& updates,
    const std::multimap<int32_t, int32_t>& adds) {
  // The visitor produces the rewritten top-level fields; any error it reports is
  // returned as-is before a new NameMapping is built from them.
  auto updated_fields = UpdateMappingVisitor(updates, adds).VisitMapping(mapping);
  ICEBERG_RETURN_UNEXPECTED(updated_fields);
  return NameMapping::Make(std::move(*updated_fields));
}

} // namespace iceberg
9 changes: 4 additions & 5 deletions src/iceberg/name_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once

#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <span>
Expand Down Expand Up @@ -143,16 +144,14 @@ ICEBERG_EXPORT std::string ToString(const NameMapping& mapping);
/// \return A new NameMapping instance initialized with the schema's fields and names.
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema);

/// \brief Update a name-based mapping using changes to a schema.
/// \param mapping a name-based mapping
/// \param updates a map from field ID to updated field definitions
/// \param adds a map from parent field ID to the IDs of nested fields to be added;
/// fields added at the top level are registered under parent ID -1
/// \return an updated mapping with names added to renamed fields and the mapping extended
/// for new fields, or an error if the updated mapping could not be built
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> UpdateMapping(
const NameMapping& mapping, const std::map<int32_t, SchemaField>& updates,
const std::multimap<int32_t, int32_t>& adds);

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ if(ICEBERG_BUILD_BUNDLE)
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
name_mapping_update_test.cc
set_snapshot_test.cc
transaction_test.cc
update_location_test.cc
Expand Down
Loading
Loading