Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions src/iceberg/catalog/rest/auth/auth_managers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

#include <algorithm>
#include <cctype>
#include <unordered_set>

#include "iceberg/catalog/rest/auth/auth_properties.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
#include "iceberg/util/string_util.h"

namespace iceberg::rest::auth {
Expand All @@ -33,6 +35,17 @@ namespace {
using AuthManagerRegistry =
std::unordered_map<std::string, AuthManagerFactory, StringHash, StringEqual>;

/// \brief Known authentication types that are defined in the Iceberg spec.
const std::unordered_set<std::string, StringHash, StringEqual>& KnownAuthTypes() {
static const std::unordered_set<std::string, StringHash, StringEqual> types = {
AuthProperties::kAuthTypeNone,
AuthProperties::kAuthTypeBasic,
AuthProperties::kAuthTypeOAuth2,
AuthProperties::kAuthTypeSigV4,
};
return types;
}

// Infer the authentication type from properties.
std::string InferAuthType(
const std::unordered_map<std::string, std::string>& properties) {
Expand All @@ -51,9 +64,29 @@ std::string InferAuthType(
return AuthProperties::kAuthTypeNone;
}

/// \brief Authentication manager that performs no authentication.
class NoopAuthManager : public AuthManager {
public:
Result<std::shared_ptr<AuthSession>> CatalogSession(
[[maybe_unused]] HttpClient& shared_client,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties)
override {
return AuthSession::MakeDefault({});
}
};

// Get the global registry of auth manager factories.
AuthManagerRegistry& GetRegistry() {
static AuthManagerRegistry registry;
static AuthManagerRegistry registry = [] {
AuthManagerRegistry r;
r[AuthProperties::kAuthTypeNone] =
[]([[maybe_unused]] std::string_view name,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& props)
-> Result<std::unique_ptr<AuthManager>> {
return std::make_unique<NoopAuthManager>();
};
return r;
}();
return registry;
}

Expand All @@ -71,8 +104,10 @@ Result<std::unique_ptr<AuthManager>> AuthManagers::Load(
auto& registry = GetRegistry();
auto it = registry.find(auth_type);
if (it == registry.end()) {
// TODO(Li Shuxu): Fallback to default auth manager implementations
return NotImplemented("Authentication type '{}' is not supported", auth_type);
if (KnownAuthTypes().contains(auth_type)) {
return NotImplemented("Authentication type '{}' is not yet supported", auth_type);
}
return InvalidArgument("Unknown authentication type: '{}'", auth_type);
}

return it->second(name, properties);
Expand Down
108 changes: 82 additions & 26 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

#include <nlohmann/json.hpp>

#include "iceberg/catalog/rest/auth/auth_manager.h"
#include "iceberg/catalog/rest/auth/auth_managers.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/catalog/rest/constant.h"
#include "iceberg/catalog/rest/endpoint.h"
Expand Down Expand Up @@ -65,13 +68,19 @@ std::unordered_set<Endpoint> GetDefaultEndpoints() {
};
}

/// \brief Fetch server config and merge it with client config
Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
const RestCatalogProperties& current_config) {
/// \brief Fetch server configuration from the REST catalog server.
Result<CatalogConfig> FetchServerConfig(
const ResourcePaths& paths, const RestCatalogProperties& current_config,
const std::shared_ptr<auth::AuthSession>& session) {
ICEBERG_ASSIGN_OR_RAISE(auto config_path, paths.Config());
HttpClient client(current_config.ExtractHeaders());

// Get authentication headers
std::unordered_map<std::string, std::string> auth_headers;
ICEBERG_RETURN_UNEXPECTED(session->Authenticate(auth_headers));

ICEBERG_ASSIGN_OR_RAISE(const auto response,
client.Get(config_path, /*params=*/{}, /*headers=*/{},
client.Get(config_path, /*params=*/{}, auth_headers,
*DefaultErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
return CatalogConfigFromJson(json);
Expand Down Expand Up @@ -114,10 +123,21 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
if (!file_io) {
return InvalidArgument("FileIO is required to create RestCatalog");
}

std::string catalog_name = config.Get(RestCatalogProperties::kName);
ICEBERG_ASSIGN_OR_RAISE(auto auth_manager,
auth::AuthManagers::Load(catalog_name, config.configs()));

ICEBERG_ASSIGN_OR_RAISE(
auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)),
config.Get(RestCatalogProperties::kPrefix)));
ICEBERG_ASSIGN_OR_RAISE(auto server_config, FetchServerConfig(*paths, config));

// Create init session for fetching server configuration
HttpClient init_client(config.ExtractHeaders());
ICEBERG_ASSIGN_OR_RAISE(auto init_session,
auth_manager->InitSession(init_client, config.configs()));
ICEBERG_ASSIGN_OR_RAISE(auto server_config,
FetchServerConfig(*paths, config, init_session));

std::unique_ptr<RestCatalogProperties> final_config = RestCatalogProperties::FromMap(
MergeConfigs(server_config.defaults, config.configs(), server_config.overrides));
Expand All @@ -139,27 +159,43 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
final_config->Get(RestCatalogProperties::kPrefix)));

return std::shared_ptr<RestCatalog>(
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),
std::move(endpoints)));
auto client = std::make_unique<HttpClient>(final_config->ExtractHeaders());
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
auth_manager->CatalogSession(*client, final_config->configs()));
return std::shared_ptr<RestCatalog>(new RestCatalog(
std::move(final_config), std::move(file_io), std::move(client), std::move(paths),
std::move(endpoints), std::move(auth_manager), std::move(catalog_session)));
}

RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::shared_ptr<FileIO> file_io,
std::unique_ptr<HttpClient> client,
std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints)
std::unordered_set<Endpoint> endpoints,
std::unique_ptr<auth::AuthManager> auth_manager,
std::shared_ptr<auth::AuthSession> catalog_session)
: config_(std::move(config)),
file_io_(std::move(file_io)),
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
client_(std::move(client)),
paths_(std::move(paths)),
name_(config_->Get(RestCatalogProperties::kName)),
supported_endpoints_(std::move(endpoints)) {}
supported_endpoints_(std::move(endpoints)),
auth_manager_(std::move(auth_manager)),
catalog_session_(std::move(catalog_session)) {}

std::string_view RestCatalog::name() const { return name_; }

Result<std::unordered_map<std::string, std::string>> RestCatalog::AuthHeaders() const {
std::unordered_map<std::string, std::string> headers;
ICEBERG_RETURN_UNEXPECTED(catalog_session_->Authenticate(headers));
return headers;
}

Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

std::vector<Namespace> result;
std::string next_token;
while (true) {
Expand All @@ -172,7 +208,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, params, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
client_->Get(path, params, auth_headers, *NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListNamespacesResponseFromJson(json));
result.insert(result.end(), list_response.namespaces.begin(),
Expand All @@ -189,10 +225,12 @@ Status RestCatalog::CreateNamespace(
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

CreateNamespaceRequest request{.namespace_ = ns, .properties = properties};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(path, json_request, /*headers=*/{},
client_->Post(path, json_request, auth_headers,
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto create_response, CreateNamespaceResponseFromJson(json));
Expand All @@ -203,8 +241,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::GetNamespaceProperties());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{},
client_->Get(path, /*params=*/{}, auth_headers,
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto get_response, GetNamespaceResponseFromJson(json));
Expand All @@ -214,8 +254,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
Status RestCatalog::DropNamespace(const Namespace& ns) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DropNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Delete(path, /*params=*/{}, /*headers=*/{},
client_->Delete(path, /*params=*/{}, auth_headers,
*DropNamespaceErrorHandler::Instance()));
return {};
}
Expand All @@ -227,21 +269,25 @@ Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

return CaptureNoSuchNamespace(
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
client_->Head(path, auth_headers, *NamespaceErrorHandler::Instance()));
}

Status RestCatalog::UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->NamespaceProperties(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

UpdateNamespacePropertiesRequest request{
.removals = std::vector<std::string>(removals.begin(), removals.end()),
.updates = updates};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(path, json_request, /*headers=*/{},
client_->Post(path, json_request, auth_headers,
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto update_response,
Expand All @@ -251,8 +297,9 @@ Status RestCatalog::UpdateNamespaceProperties(

Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListTables());

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

std::vector<TableIdentifier> result;
std::string next_token;
while (true) {
Expand All @@ -262,7 +309,7 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Get(path, params, auth_headers, *TableErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListTablesResponseFromJson(json));
result.insert(result.end(), list_response.identifiers.begin(),
Expand All @@ -282,6 +329,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

CreateTableRequest request{
.name = identifier.name,
Expand All @@ -296,7 +344,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
return LoadTableResultFromJson(json);
Expand All @@ -320,6 +368,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

CommitTableRequest request{.identifier = identifier};
request.requirements.reserve(requirements.size());
Expand All @@ -334,7 +383,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto commit_response, CommitTableResponseFromJson(json));
Expand Down Expand Up @@ -363,14 +412,15 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DeleteTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

std::unordered_map<std::string, std::string> params;
if (purge) {
params["purgeRequested"] = "true";
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Delete(path, params, auth_headers, *TableErrorHandler::Instance()));
return {};
}

Expand All @@ -381,19 +431,22 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

return CaptureNoSuchTable(
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Head(path, auth_headers, *TableErrorHandler::Instance()));
}

Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RenameTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Rename());
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

RenameTableRequest request{.source = from, .destination = to};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

return {};
}
Expand All @@ -402,9 +455,11 @@ Result<std::string> RestCatalog::LoadTableInternal(
const TableIdentifier& identifier) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Get(path, /*params=*/{}, auth_headers, *TableErrorHandler::Instance()));
return response.body();
}

Expand All @@ -422,6 +477,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

RegisterTableRequest request{
.name = identifier.name,
Expand All @@ -431,7 +487,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
Expand Down
Loading
Loading