Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7862234
Initial commit
BewareMyPower Mar 5, 2026
a1746a9
implemented
BewareMyPower Mar 5, 2026
15908f9
fix auth not cleared if it's not set and improve tests
BewareMyPower Mar 5, 2026
8b0899f
refactor and add a getServiceInfo method
BewareMyPower Mar 5, 2026
7ca8136
fix format
BewareMyPower Mar 5, 2026
e70a0a4
use read-write lock instead
BewareMyPower Mar 5, 2026
342505e
pass by value + std::move for better performance
BewareMyPower Mar 5, 2026
477fb94
fix thread safety due to read lock for write operation
BewareMyPower Mar 5, 2026
9114edf
fix stale lookup service might be refered
BewareMyPower Mar 5, 2026
c6de067
fix the reader cannot consume after switching to a new cluster
BewareMyPower Mar 5, 2026
a209112
address comments from copilot
BewareMyPower Mar 5, 2026
fc04394
fix libstdc++ header include error
BewareMyPower Mar 5, 2026
f7e35ea
Use ServiceInfo as the single source of truth for ClientConnection
BewareMyPower Mar 6, 2026
a01842d
revert changes on ClientConfiguration
BewareMyPower Mar 6, 2026
38a6111
simplify implementations
BewareMyPower Mar 6, 2026
792fede
Pass ServiceInfo to lookup service and fix AuthPluginTest.testInvalid…
BewareMyPower Mar 6, 2026
529fa96
remove ambiguous methods from ClientConfiguration
BewareMyPower Mar 6, 2026
4adb04a
replace the updateServiceInfo method with the ServiceInfoProvider int…
BewareMyPower Mar 6, 2026
a238057
revert unnecessary change
BewareMyPower Mar 6, 2026
d8c5b27
fix perf build
BewareMyPower Mar 6, 2026
e5f9cca
fix clang-tidy checks and remove Client from the ServiceInfoProvider …
BewareMyPower Mar 7, 2026
cdb2128
fix LookupServiceTest.testAfterClientShutdown
BewareMyPower Mar 8, 2026
b2bc7d7
fix other tests
BewareMyPower Mar 9, 2026
b91860a
revert unnecessary changes
BewareMyPower Mar 9, 2026
1144c27
clean up implementation
BewareMyPower Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
#include <pulsar/Reader.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/ServiceInfo.h>
#include <pulsar/ServiceInfoProvider.h>
#include <pulsar/TableView.h>
#include <pulsar/defines.h>

#include <memory>
#include <string>

namespace pulsar {
Expand Down Expand Up @@ -68,6 +71,19 @@ class PULSAR_PUBLIC Client {
*/
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);

/**
* Create a Pulsar client object using the specified ServiceInfoProvider.
*
* The ServiceInfoProvider is responsible for providing the service information (such as service URL)
* dynamically. For example, if it detects a primary Pulsar service is down, it can switch to a secondary
* service and update the client with the new service information.
*
* When `close` is called, the client will call `ServiceInfoProvider::close` to guarantee the lifetime of
* the provider is properly managed.
*/
static Client create(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
const ClientConfiguration& clientConfiguration);

/**
* Create a producer with default configuration
*
Expand Down Expand Up @@ -414,6 +430,13 @@ class PULSAR_PUBLIC Client {
void getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback);

/**
* Get the current service information of the client.
*
* @return the current service information
*/
ServiceInfo getServiceInfo() const;

private:
Client(const std::shared_ptr<ClientImpl>&);

Expand Down
29 changes: 5 additions & 24 deletions include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,12 @@ class PULSAR_PUBLIC ClientConfiguration {
/**
* Set the authentication method to be used with the broker
*
* You can get the configured authentication data in `ServiceInfo` returned by `Client::getServiceInfo`.
*
* @param authentication the authentication data to use
*/
ClientConfiguration& setAuth(const AuthenticationPtr& authentication);

/**
* @return the authentication data
*/
Authentication& getAuth() const;

/**
* Set timeout on client operations (subscribe, create producer, close, unsubscribe)
* Default is 30 seconds.
Expand Down Expand Up @@ -202,20 +199,6 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
ClientConfiguration& setLogger(LoggerFactory* loggerFactory);

/**
* Configure whether to use the TLS encryption on the connections.
*
* The default value is false.
*
* @param useTls
*/
ClientConfiguration& setUseTls(bool useTls);

/**
* @return whether the TLS encryption is used on the connections
*/
bool isUseTls() const;

/**
* Set the path to the TLS private key file.
*
Expand Down Expand Up @@ -243,15 +226,13 @@ class PULSAR_PUBLIC ClientConfiguration {
/**
* Set the path to the trusted TLS certificate file.
*
* You can get the configured trusted TLS certificate file path in `ServiceInfo` returned by
* `Client::getServiceInfo`.
*
* @param tlsTrustCertsFilePath
*/
ClientConfiguration& setTlsTrustCertsFilePath(const std::string& tlsTrustCertsFilePath);

/**
* @return the path to the trusted TLS certificate file
*/
const std::string& getTlsTrustCertsFilePath() const;

/**
* Configure whether the Pulsar client accepts untrusted TLS certificates from brokers.
*
Expand Down
57 changes: 57 additions & 0 deletions include/pulsar/ServiceInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_SERVICE_INFO_H_
#define PULSAR_SERVICE_INFO_H_

#include <pulsar/Authentication.h>

#include <optional>
#include <string>

namespace pulsar {

/**
* ServiceInfo encapsulates the information of a Pulsar service, which is used by the client to connect to the
* service. It includes the service URL, authentication information, and TLS configuration.
*/
class PULSAR_PUBLIC ServiceInfo final {
public:
ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = AuthFactory::Disabled(),
std::optional<std::string> tlsTrustCertsFilePath = std::nullopt);

auto& serviceUrl() const noexcept { return serviceUrl_; }
auto useTls() const noexcept { return useTls_; }
auto& authentication() const noexcept { return authentication_; }
auto& tlsTrustCertsFilePath() const noexcept { return tlsTrustCertsFilePath_; }

bool operator==(const ServiceInfo& other) const noexcept {
return serviceUrl_ == other.serviceUrl_ && useTls_ == other.useTls_ &&
authentication_ == other.authentication_ &&
tlsTrustCertsFilePath_ == other.tlsTrustCertsFilePath_;
}

private:
std::string serviceUrl_;
bool useTls_;
AuthenticationPtr authentication_;
std::optional<std::string> tlsTrustCertsFilePath_;
};

} // namespace pulsar
#endif
56 changes: 56 additions & 0 deletions include/pulsar/ServiceInfoProvider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_
#define PULSAR_SERVICE_INFO_PROVIDER_H_

#include <pulsar/ClientConfiguration.h>
#include <pulsar/ServiceInfo.h>

namespace pulsar {

class PULSAR_PUBLIC ServiceInfoProvider {
public:
/**
* The destructor will be called when `Client::close()` is invoked, and the provider should stop any
* ongoing work and release the resources in the destructor.
*/
virtual ~ServiceInfoProvider() = default;

/**
* Get the initial `ServiceInfo` connection for the client.
* This method is called **only once** internally in `Client::create()` to get the initial `ServiceInfo`
* for the client to connect to the Pulsar service. Since it's only called once, it's legal to return a
* moved `ServiceInfo` object to avoid unnecessary copying.
*/
virtual ServiceInfo initialServiceInfo() = 0;

/**
* Initialize the ServiceInfoProvider.
*
* @param onServiceInfoUpdate the callback to update `client` with the new `ServiceInfo`
*
* Note: the implementation is responsible to invoke `onServiceInfoUpdate` at least once to provide the
* initial `ServiceInfo` for the client.
*/
virtual void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) = 0;
};

}; // namespace pulsar

#endif
7 changes: 0 additions & 7 deletions include/pulsar/c/client_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,9 @@ PULSAR_PUBLIC void pulsar_client_configuration_set_logger(pulsar_client_configur
PULSAR_PUBLIC void pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *conf,
pulsar_logger_t logger);

PULSAR_PUBLIC void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls);

PULSAR_PUBLIC int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf);

PULSAR_PUBLIC void pulsar_client_configuration_set_tls_trust_certs_file_path(
pulsar_client_configuration_t *conf, const char *tlsTrustCertsFilePath);

PULSAR_PUBLIC const char *pulsar_client_configuration_get_tls_trust_certs_file_path(
pulsar_client_configuration_t *conf);

PULSAR_PUBLIC void pulsar_client_configuration_set_tls_allow_insecure_connection(
pulsar_client_configuration_t *conf, int allowInsecure);

Expand Down
41 changes: 41 additions & 0 deletions lib/AtomicSharedPtr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <memory>
namespace pulsar {

// C++17 does not have std::atomic<std::shared_ptr<T>>, so we have to manually implement it.
template <typename T>
class AtomicSharedPtr {
public:
using Pointer = std::shared_ptr<const T>;

AtomicSharedPtr() = default;
explicit AtomicSharedPtr(T value) : ptr_(std::make_shared<const T>(std::move(value))) {}

auto load() const { return std::atomic_load(&ptr_); }

void store(Pointer&& newPtr) { std::atomic_store(&ptr_, std::move(newPtr)); }

private:
std::shared_ptr<const T> ptr_;
};

} // namespace pulsar
5 changes: 3 additions & 2 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pulsar/Authentication.h>
#include <pulsar/ClientConfiguration.h>
#include <pulsar/Schema.h>
#include <pulsar/ServiceInfo.h>

#include <mutex>

Expand All @@ -38,9 +39,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool,
BinaryProtoLookupService(const ServiceInfo& serviceInfo, ConnectionPool& pool,
const ClientConfiguration& clientConfiguration)
: serviceNameResolver_(serviceUrl),
: serviceNameResolver_(serviceInfo.serviceUrl()),
cnxPool_(pool),
listenerName_(clientConfiguration.getListenerName()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
Expand Down
19 changes: 13 additions & 6 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/
#include <pulsar/Client.h>
#include <pulsar/ServiceInfoProvider.h>

#include <iostream>
#include <memory>
Expand All @@ -33,13 +34,17 @@ DECLARE_LOG_OBJECT()

namespace pulsar {

Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) {}
Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) { impl_->initialize(); }

Client::Client(const std::string& serviceUrl)
: impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration())) {}
Client::Client(const std::string& serviceUrl) : Client(serviceUrl, ClientConfiguration()) {}

Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
: impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
: Client(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}

Client Client::create(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
const ClientConfiguration& clientConfiguration) {
return Client(std::make_shared<ClientImpl>(std::move(serviceInfoProvider), clientConfiguration));
}

Result Client::createProducer(const std::string& topic, Producer& producer) {
return createProducer(topic, ProducerConfiguration(), producer);
Expand Down Expand Up @@ -193,8 +198,10 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers();

void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback) {
impl_->getLookup()
->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
.addListener(std::move(callback));
}

ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }

} // namespace pulsar
13 changes: 0 additions & 13 deletions lib/ClientConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authe
return *this;
}

Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; }

const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; }

ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) {
Expand Down Expand Up @@ -94,13 +92,6 @@ ClientConfiguration& ClientConfiguration::setMessageListenerThreads(int threads)

int ClientConfiguration::getMessageListenerThreads() const { return impl_->messageListenerThreads; }

ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) {
impl_->useTls = useTls;
return *this;
}

bool ClientConfiguration::isUseTls() const { return impl_->useTls; }

ClientConfiguration& ClientConfiguration::setValidateHostName(bool validateHostName) {
impl_->validateHostName = validateHostName;
return *this;
Expand Down Expand Up @@ -131,10 +122,6 @@ ClientConfiguration& ClientConfiguration::setTlsTrustCertsFilePath(const std::st
return *this;
}

const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const {
return impl_->tlsTrustCertsFilePath;
}

ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool allowInsecure) {
impl_->tlsAllowInsecureConnection = allowInsecure;
return *this;
Expand Down
7 changes: 7 additions & 0 deletions lib/ClientConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#define LIB_CLIENTCONFIGURATIONIMPL_H_

#include <pulsar/ClientConfiguration.h>
#include <pulsar/ServiceInfo.h>

#include <chrono>
#include <optional>

namespace pulsar {

Expand Down Expand Up @@ -53,6 +55,11 @@ struct ClientConfigurationImpl {
ClientConfiguration::ProxyProtocol proxyProtocol;

std::unique_ptr<LoggerFactory> takeLogger() { return std::move(loggerFactory); }

ServiceInfo toServiceInfo(const std::string& serviceUrl) const {
return {serviceUrl, authenticationPtr,
tlsTrustCertsFilePath.empty() ? std::nullopt : std::make_optional(tlsTrustCertsFilePath)};
}
};
} // namespace pulsar

Expand Down
Loading
Loading