Modern C++ Kafka API
Configuration for the Kafka Producer.
#include <ProducerConfig.h>
Public Member Functions | |
| ProducerConfig (const ProducerConfig &)=default | |
| ProducerConfig (const PropertiesMap &kvMap) | |
Public Member Functions inherited from KAFKA_API::clients::Config | |
| Config (const Config &)=default | |
| Config (const PropertiesMap &kvMap) | |
Public Member Functions inherited from KAFKA_API::Properties | |
| Properties (const Properties &)=default | |
| Properties (PropertiesMap kvMap) | |
| bool | operator== (const Properties &rhs) const |
| template<class T > | |
| Properties & | put (const std::string &key, const T &value) |
| Set a property. | |
| void | remove (const std::string &key) |
| Remove the property (if one exists). | |
| bool | contains (const std::string &key) const |
| Check whether the map contains a property. | |
| template<class T > | |
| T & | get (const std::string &key) const |
| Get a property reference. | |
| Optional< std::string > | getProperty (const std::string &key) const |
| Get a property. | |
| void | eraseProperty (const std::string &key) |
| Remove a property. | |
| std::string | toString () const |
| const PropertiesMap & | map () const |
| Get all properties with a map. | |
Static Public Attributes | |
| static constexpr const char * | ACKS = "acks" |
| The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful. | |
| static constexpr const char * | QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages" |
| Maximum number of messages allowed on the producer queue. | |
| static constexpr const char * | QUEUE_BUFFERING_MAX_KBYTES = "queue.buffering.max.kbytes" |
| Maximum total message size sum allowed on the producer queue. | |
| static constexpr const char * | LINGER_MS = "linger.ms" |
| Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to brokers. | |
| static constexpr const char * | BATCH_NUM_MESSAGES = "batch.num.messages" |
| Maximum number of messages batched in one MessageSet. | |
| static constexpr const char * | BATCH_SIZE = "batch.size" |
| Maximum size (in bytes) of all messages batched in one MessageSet (including protocol framing overhead). | |
| static constexpr const char * | MESSAGE_MAX_BYTES = "message.max.bytes" |
| Maximum Kafka protocol request message size. | |
| static constexpr const char * | MESSAGE_TIMEOUT_MS = "message.timeout.ms" |
| This value is enforced locally and limits the time a produced message waits for successful delivery. | |
| static constexpr const char * | REQUEST_TIMEOUT_MS = "request.timeout.ms" |
| This value is only enforced by the brokers and relies on ACKS being non-zero. | |
| static constexpr const char * | PARTITIONER = "partitioner" |
| The default partitioner for a ProducerRecord (with no partition assigned). | |
| static constexpr const char * | MAX_IN_FLIGHT = "max.in.flight" |
| Maximum number of in-flight requests per broker connection. | |
| static constexpr const char * | ENABLE_IDEMPOTENCE = "enable.idempotence" |
| When set to true, the producer will ensure that messages are successfully sent exactly once and in the original order. | |
| static constexpr const char * | TRANSACTIONAL_ID = "transactional.id" |
| It's used to identify the same transactional producer instance across process restarts. | |
| static constexpr const char * | TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms" |
| The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. | |
Static Public Attributes inherited from KAFKA_API::clients::Config | |
| static constexpr const char * | ENABLE_MANUAL_EVENTS_POLL = "enable.manual.events.poll" |
| To poll the events manually (otherwise, it would be done with a background polling thread). | |
| static constexpr const char * | LOG_CB = "log_cb" |
| Log callback. | |
| static constexpr const char * | ERROR_CB = "error_cb" |
| Error callback. | |
| static constexpr const char * | STATS_CB = "stats_cb" |
| Statistics callback. | |
| static constexpr const char * | OAUTHBEARER_TOKEN_REFRESH_CB = "oauthbearer_token_refresh_cb" |
| OAUTHBEARER token refresh callback. | |
| static constexpr const char * | INTERCEPTORS = "interceptors" |
| Interceptors for thread start/exit, brokers' state change, etc. | |
| static constexpr const char * | BOOTSTRAP_SERVERS = "bootstrap.servers" |
| A string of host:port pairs of brokers (separated by ",") that the client uses to establish the initial connection to the Kafka cluster. | |
| static constexpr const char * | CLIENT_ID = "client.id" |
| Client identifier. | |
| static constexpr const char * | LOG_LEVEL = "log_level" |
| Log level (syslog(3) levels). | |
| static constexpr const char * | SOCKET_TIMEOUT_MS = "socket.timeout.ms" |
| Timeout for network requests. | |
| static constexpr const char * | SECURITY_PROTOCOL = "security.protocol" |
| Protocol used to communicate with brokers. | |
| static constexpr const char * | SASL_MECHANISM = "sasl.mechanisms" |
| SASL mechanism to use for authentication. | |
| static constexpr const char * | SASL_USERNAME = "sasl.username" |
| SASL username for use with the PLAIN and SASL-SCRAM mechanisms. | |
| static constexpr const char * | SASL_PASSWORD = "sasl.password" |
| SASL password for use with the PLAIN and SASL-SCRAM mechanisms. | |
| static constexpr const char * | SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd" |
| Shell command to refresh or acquire the client's Kerberos ticket. | |
| static constexpr const char * | SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name" |
| Kerberos principal name that Kafka runs as. | |
| static constexpr const char * | SASL_OAUTHBEARER_METHOD = "sasl.oauthbearer.method" |
| Set to "default" or "oidc" to control which login method is used. | |
| static constexpr const char * | SASL_OAUTHBEARER_CLIENT_ID = "sasl.oauthbearer.client.id" |
| Public identifier for the application. | |
| static constexpr const char * | SASL_OAUTHBEARER_CLIENT_SECRET = "sasl.oauthbearer.client.secret" |
| Client secret only known to the application and the authorization server. | |
| static constexpr const char * | SASL_OAUTHBEARER_EXTENSIONS = "sasl.oauthbearer.extensions" |
| Allow additional information to be provided to the broker. | |
| static constexpr const char * | SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope" |
| The client uses this to specify the scope of the access request to the broker. | |
| static constexpr const char * | SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url" |
| OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve the token. | |
| static constexpr const char * | SASL_OAUTHBEARER_CONFIG = "sasl.oauthbearer.config" |
| SASL/OAUTHBEARER configuration. | |
| static constexpr const char * | ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = "enable.sasl.oauthbearer.unsecure.jwt" |
| Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set. | |
Additional Inherited Members | |
Public Types inherited from KAFKA_API::Properties | |
| using | PropertiesMap = std::map< std::string, ValueType > |
Configuration for the Kafka Producer.
static constexpr const char * ACKS = "acks"
The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful.
1) acks=0, the producer will not wait for a reply from the broker before assuming the message was sent successfully.
2) acks=1, the producer will receive a success response from the broker the moment the leader replica receives the message.
3) acks=all, the producer will receive a success response from the broker once all in-sync replicas have received the message.
Note: if "acks=all", make sure the topic's replication factor is larger than 1. That is, if the topic is created automatically by the producer's send, the default.replication.factor property of the Kafka server should be larger than 1. The "acks=all" setting is mandatory for reliability requirements, but it increases the ack latency and reduces the throughput.
Default value: all
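The acks rules above can be sketched as a small check. This is an illustration only: `StringProps`, `isDocumentedAcks` and `acksAllNeedsMoreReplicas` are hypothetical names, and a plain `std::map` of strings stands in for the library's PropertiesMap (whose ValueType is not shown on this page) so that the snippet compiles without the library.

```cpp
#include <cassert>
#include <map>
#include <string>

// Stand-in for the library's property map (assumption: the real
// PropertiesMap uses a richer ValueType than std::string).
using StringProps = std::map<std::string, std::string>;

// Returns true if `value` is one of the three acks settings listed above.
inline bool isDocumentedAcks(const std::string& value) {
    return value == "0" || value == "1" || value == "all";
}

// Hypothetical helper for the note above: flags acks=all on a topic whose
// replication factor is not larger than 1. A missing "acks" entry falls
// back to the documented default, "all".
inline bool acksAllNeedsMoreReplicas(const StringProps& props, int replicationFactor) {
    const auto it = props.find("acks");
    const std::string acks = (it == props.end()) ? "all" : it->second;
    assert(isDocumentedAcks(acks));
    return acks == "all" && replicationFactor <= 1;
}
```

With the library itself, the same "acks" key would be set through ProducerConfig (for example via put), and the replication factor would have to come from the broker or topic configuration.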
static constexpr const char * BATCH_NUM_MESSAGES = "batch.num.messages"
Maximum number of messages batched in one MessageSet.
The total MessageSet size is also limited by MESSAGE_MAX_BYTES.
Default value: 10000
static constexpr const char * BATCH_SIZE = "batch.size"
Maximum size (in bytes) of all messages batched in one MessageSet (including protocol framing overhead).
Default value: 1000000
static constexpr const char * ENABLE_IDEMPOTENCE = "enable.idempotence"
When set to true, the producer will ensure that messages are successfully sent exactly once and in the original order.
Default value: false
static constexpr const char * LINGER_MS = "linger.ms"
Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to brokers.
Default value: 0 (KafkaSyncProducer); 0.5 (KafkaAsyncProducer)
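To show how LINGER_MS, BATCH_NUM_MESSAGES and BATCH_SIZE interact, here is a simplified model of the "is this batch ready to send?" decision. It is a sketch built from the descriptions on this page, not the client's actual batching logic; `BatchLimits` and `batchIsReady` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Limits taken from the defaults documented on this page.
struct BatchLimits {
    std::size_t maxMessages = 10000;    // batch.num.messages
    std::size_t maxBytes    = 1000000;  // batch.size
    double      lingerMs    = 0.5;      // linger.ms (value listed for KafkaAsyncProducer)
};

// Simplified model: a non-empty batch is sent once either size limit is
// reached or the oldest queued message has waited for at least linger.ms.
inline bool batchIsReady(const BatchLimits& limits,
                         std::size_t queuedMessages,
                         std::size_t queuedBytes,
                         double waitedMs) {
    if (queuedMessages == 0) {
        return false;  // nothing to send
    }
    return queuedMessages >= limits.maxMessages
        || queuedBytes >= limits.maxBytes
        || waitedMs >= limits.lingerMs;
}
```

In this model a larger linger.ms lets more messages accumulate per batch, at the cost of added latency for each message.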
static constexpr const char * MAX_IN_FLIGHT = "max.in.flight"
Maximum number of in-flight requests per broker connection.
Default value: 1000000 (while enable.idempotence=false); 5 (while enable.idempotence=true)
static constexpr const char * MESSAGE_MAX_BYTES = "message.max.bytes"
Maximum Kafka protocol request message size.
Note: This should be coordinated with the brokers' configuration. Otherwise, any larger message would be rejected!
Default value: 1000000
static constexpr const char * MESSAGE_TIMEOUT_MS = "message.timeout.ms"
This value is enforced locally and limits the time a produced message waits for successful delivery.
Note: If the ack is not received within this limit, an exception is thrown (in SyncProducer.send()), or an error code is passed to the delivery callback (AsyncProducer).
Default value: 300000
static constexpr const char * PARTITIONER = "partitioner"
The default partitioner for a ProducerRecord (with no partition assigned).
Note: It's not the same as the Java version's "partitioner.class" property.
Available options:
1) random – random distribution
2) consistent – CRC32 hash of key (ProducerRecords with empty/null key are mapped to a single partition)
3) consistent_random – CRC32 hash of key (ProducerRecords with empty/null key are randomly partitioned)
4) murmur2 – Java Producer compatible Murmur2 hash of key (ProducerRecords with null key are mapped to a single partition)
5) murmur2_random – Java Producer compatible Murmur2 hash of key (ProducerRecords with null key are randomly partitioned). It's equivalent to the Java Producer's default partitioner
6) fnv1a – FNV-1a hash of key (ProducerRecords with null key are mapped to a single partition)
7) fnv1a_random – FNV-1a hash of key (ProducerRecords with null key are randomly partitioned)
Default value: murmur2_random
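As an illustration of what a hash-of-key partitioner does, the sketch below computes the standard 32-bit FNV-1a hash and maps it to a partition with a modulo. The FNV-1a constants are the published ones; the modulo mapping and the name `partitionForKey` are assumptions made for this example, and the client's own fnv1a partitioner may post-process the hash differently.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Standard 32-bit FNV-1a: XOR each byte into the hash, then multiply by
// the FNV prime (arithmetic wraps modulo 2^32).
inline std::uint32_t fnv1a32(const std::string& key) {
    std::uint32_t hash = 2166136261u;  // FNV offset basis
    for (unsigned char byte : key) {
        hash ^= byte;
        hash *= 16777619u;             // FNV prime
    }
    return hash;
}

// Hypothetical key-to-partition mapping: hash modulo the partition count.
// The same key always lands on the same partition while the count is fixed.
inline std::int32_t partitionForKey(const std::string& key, std::int32_t partitionCount) {
    assert(partitionCount > 0);
    return static_cast<std::int32_t>(
        fnv1a32(key) % static_cast<std::uint32_t>(partitionCount));
}
```

The "_random" variants in the list above differ only in how records with a null key are handled: they pick a random partition instead of always mapping to a single one.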
static constexpr const char * QUEUE_BUFFERING_MAX_KBYTES = "queue.buffering.max.kbytes"
Maximum total message size sum allowed on the producer queue.
Default value: 0x100000 (1GB)
static constexpr const char * QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages"
Maximum number of messages allowed on the producer queue.
Default value: 100000
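The two queue limits apply together: the producer queue is bounded both by message count and by total size. The sketch below checks the unit arithmetic (0x100000 kbytes is 1 GiB) and models an admission check under those defaults. `queueHasRoom` and the constant names are hypothetical, and the client's real accounting may differ in detail.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint64_t kMaxQueuedMessages = 100000;    // queue.buffering.max.messages default
constexpr std::uint64_t kMaxQueuedKBytes   = 0x100000;  // queue.buffering.max.kbytes default
constexpr std::uint64_t kMaxQueuedBytes    = kMaxQueuedKBytes * 1024;

// 0x100000 kbytes = 1048576 * 1024 bytes = 1 GiB.
static_assert(kMaxQueuedBytes == 1073741824ULL, "0x100000 kbytes is 1 GiB");

// Simplified model: a new message fits only if both limits still hold
// after it has been added to the queue.
inline bool queueHasRoom(std::uint64_t queuedMessages,
                         std::uint64_t queuedBytes,
                         std::uint64_t nextMessageBytes) {
    return queuedMessages + 1 <= kMaxQueuedMessages
        && queuedBytes + nextMessageBytes <= kMaxQueuedBytes;
}
```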
static constexpr const char * REQUEST_TIMEOUT_MS = "request.timeout.ms"
This value is only enforced by the brokers and relies on ACKS being non-zero.
Note: The leader broker waits for in-sync replicas to acknowledge the message, and will return an error if the time elapses without the necessary acks.
Default value: 5000
static constexpr const char * TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms"
The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
Default value: 60000