public abstract class Producer<P>
extends java.lang.Object

Type Parameters:
P - the payload's type
| Constructor and Description |
|---|
| Producer() |
| Modifier and Type | Method and Description |
|---|---|
| void | abortTransaction(): Aborts the ongoing transaction. |
| void | beginTransaction(): Should be called before the start of each new transaction. |
| void | close(): Close this producer. |
| void | close(long timeout, java.util.concurrent.TimeUnit timeUnit): Waits up to timeout for the producer to complete the sending of all incomplete requests. |
| void | commitTransaction(): Commits the ongoing transaction. |
| void | flush(): Makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. |
| java.util.Map<java.lang.String,java.lang.Object> | getConfiguration(): Gets the configuration of a producer instance. |
| void | initTransactions(): Needs to be called before any other methods when the transactional.id is set in the configuration. |
| java.util.Map<MetricName,? extends org.apache.kafka.common.Metric> | metrics(): Gets the full set of internal metrics maintained by the producer. |
| java.util.List<PartitionInfo> | partitionsFor(java.lang.String topic): Gets the partition metadata for the given topic. |
| ProducerMetric | recordBatchSizeAvgMetric(): The average size of the batches processed by the connector per clientId. |
| ProducerMetric | recordBatchSizeMaxMetric(): The maximum size of the batches processed by the connector per clientId. |
| ProducerMetric | recordByteRatePerTopicMetric(java.lang.String topic): The average number of bytes sent per second for a topic. |
| ProducerMetric | recordByteTotalPerTopicMetric(java.lang.String topic): The total number of bytes sent for a topic. |
| ProducerMetric | recordErrorRateMetric(): The average per-second number of record sends that resulted in errors per clientId. |
| ProducerMetric | recordErrorRatePerTopicMetric(java.lang.String topic): The average per-second number of record sends that resulted in errors for a topic. |
| ProducerMetric | recordErrorTotalMetric(): The total number of record sends that resulted in errors per clientId. |
| ProducerMetric | recordErrorTotalPerTopicMetric(java.lang.String topic): The total number of record sends that resulted in errors for a topic. |
| ProducerMetric | recordSendRateMetric(): The average number of records sent per second per clientId. |
| ProducerMetric | recordSendRatePerTopicMetric(java.lang.String topic): The average number of records sent per second for a topic. |
| ProducerMetric | recordSendTotalMetric(): The total number of records sent per clientId. |
| ProducerMetric | recordSendTotalPerTopicMetric(java.lang.String topic): The total number of records sent for a topic. |
| ProducerMetric | recordSizeAvgMetric(): The average record size per clientId. |
| ProducerMetric | recordSizeMaxMetric(): The maximum record size per clientId. |
| void | send(ProducerRecord record): Asynchronously sends a record to a topic. |
| void | send(ProducerRecord<P> producerRecord, Callback callback): Asynchronously sends a record to a topic and invokes the provided callback when the send has been acknowledged. |
| void | sendOffsetsToTransaction(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId): Sends a list of specified offsets to the consumer group coordinator and marks those offsets as part of the current transaction. |
| void | setConfiguration(java.util.Map<java.lang.String,java.lang.Object> configuration): Sets the configuration of a producer instance. |
public java.util.Map<java.lang.String,java.lang.Object> getConfiguration()
Gets the configuration of a producer instance.
public void send(ProducerRecord record)
Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.

Parameters:
record - The record to send

Throws:
java.lang.IllegalArgumentException - If the record argument is null
DatabusClientRuntimeException - If the send method fails. The original cause could be any of these exceptions:
SerializationException if the key or value are not valid objects given the configured serializers
BufferExhaustedException if block.on.buffer.full=false and the buffer is full
InterruptException if the thread is interrupted while blocked
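A minimal usage sketch of this overload (routingData, headers and payload are placeholder objects built with this SDK's helper types, and a configured producer instance is assumed):

```java
// Fire-and-forget send: no callback is invoked on completion.
// Equivalent to send(record, null), as noted above.
ProducerRecord record = new ProducerRecord(routingData, headers, payload);
try {
    producer.send(record);
} catch (DatabusClientRuntimeException e) {
    // The original cause may be a SerializationException,
    // BufferExhaustedException or InterruptException, as listed above.
    e.printStackTrace();
}
```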
public void send(ProducerRecord<P> producerRecord, Callback callback)
The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one.
Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.
ProducerRecord myRecord = new ProducerRecord(routingData, headers, payload);
producer.send(myRecord,
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            }
        }
    });
Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the following example callback1 is guaranteed to execute before callback2:
producer.send(new ProducerRecord(routingData, headers, payload), callback1);
producer.send(new ProducerRecord(routingData, headers, payload), callback2);
Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
they will delay the sending of messages from other threads. If you want to execute blocking or computationally
expensive callbacks it is recommended to use your own Executor
in the callback body
to parallelize processing.
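The hand-off recommended above can be sketched as follows (the pool size and the expensivePostProcessing method are illustrative assumptions):

```java
// Application-owned executor for heavy callback work, so the producer's
// I/O thread only performs a cheap task hand-off.
ExecutorService callbackExecutor = Executors.newFixedThreadPool(4);

producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // Runs on the producer I/O thread: keep this part fast.
        callbackExecutor.submit(() -> {
            if (e != null) {
                e.printStackTrace();
            } else {
                expensivePostProcessing(metadata); // hypothetical application method
            }
        });
    }
});
```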
Parameters:
producerRecord - The non-null record to send
callback - A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback)

Throws:
java.lang.IllegalArgumentException - If the record argument is null
DatabusClientRuntimeException - If the send method fails. The original cause could be any of these exceptions:
SerializationException if the key or value are not valid objects given the configured serializers
BufferExhaustedException if block.on.buffer.full=false and the buffer is full
InterruptException if the thread is interrupted while blocked
public void flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true).
A request is considered completed when it is successfully acknowledged
according to the acks
configuration you have specified or else it results in an error.
Other threads can continue sending records while one thread is blocked waiting for a flush call to complete, however no guarantee is made about the completion of records sent after the flush call begins.
This method can be useful when consuming from some input system and producing into Kafka.
The flush()
call
gives a convenient way to ensure all previously sent messages have actually completed.
This example shows how to consume from one Kafka topic and produce to another Kafka topic:
for (ProducerRecord record : consumer.poll(100)) {
    // Create the RoutingData, Headers and Payload objects here as appropriate
    producer.send(new ProducerRecord(routingData, headers, payload));
}
producer.flush();
consumer.commit();
Note that the above example may drop records if the produce request fails.
If we want to ensure that this does not occur
we need to set retries=<large_number>
in our config.
Throws:
DatabusClientRuntimeException - If the flush method fails. The original cause could be the following exception:
InterruptException if the thread is interrupted while blocked
public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
Get the partition metadata for the given topic.

Parameters:
topic - The topic to get info for

Returns:
A list of PartitionInfo for the topic

Throws:
DatabusClientRuntimeException - If the partitionsFor method fails. The original cause could be the following exception:
InterruptException if the thread is interrupted while blocked
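For example, to inspect a topic's partition layout (a sketch assuming PartitionInfo exposes the usual partition() and leader() accessors):

```java
java.util.List<PartitionInfo> partitions = producer.partitionsFor("topic1");
System.out.println("topic1 has " + partitions.size() + " partitions");
for (PartitionInfo info : partitions) {
    System.out.println("partition " + info.partition() + " leader: " + info.leader());
}
```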
public java.util.Map<MetricName,? extends org.apache.kafka.common.Metric> metrics()
Get the full set of internal metrics maintained by the producer.

Throws:
DatabusClientRuntimeException - If the metrics method fails.

public void close()
Close this producer. Equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).
If close() is called from a Callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) will be called instead. We do this because the sender thread would otherwise try to join itself and block forever.

Throws:
DatabusClientRuntimeException - If the close method fails. The original cause could be the following exception:
InterruptException if the thread is interrupted while blocked
public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)
This method waits up to timeout for the producer to complete the sending of all incomplete requests. If the producer is unable to complete all requests before the timeout expires, this method fails any unsent and unacknowledged records immediately.
If invoked from within a Callback, this method will not block and will be equivalent to close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while the I/O thread of the producer is blocked.

Parameters:
timeout - The maximum time to wait for the producer to complete any pending requests. The value should be non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
timeUnit - The time unit for the timeout

Throws:
DatabusClientRuntimeException - If the close method fails. The original cause could be any of these exceptions:
InterruptException if the thread is interrupted while blocked
IllegalArgumentException if the timeout is negative
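A typical bounded shutdown, sketched (the 30-second limit is an arbitrary choice):

```java
try {
    // Wait up to 30 seconds for in-flight requests; anything still unsent
    // or unacknowledged after that is failed immediately.
    producer.close(30, TimeUnit.SECONDS);
} catch (DatabusClientRuntimeException e) {
    e.printStackTrace();
}
```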
public void setConfiguration(java.util.Map<java.lang.String,java.lang.Object> configuration)
Set the configuration of a producer instance.

Parameters:
configuration - A map which contains the configuration to be assigned

public void initTransactions()
Needs to be called before any other methods when the transactional.id is set in the configuration.

Throws:
DatabusClientRuntimeException - If the method fails. The original cause could be any of these exceptions:
IllegalStateException if no transactional.id has been configured
org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker does not support transactions (i.e. if its version is lower than 0.11.0.0)
org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured transactional.id is not authorized. See the exception for more details
KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
public void beginTransaction()
Should be called before the start of each new transaction. Note that prior to the first invocation of this method, you must invoke initTransactions() exactly one time.

Throws:
DatabusClientRuntimeException - If the method fails. The original cause could be any of these exceptions:
IllegalStateException if no transactional.id has been configured or if initTransactions() has not yet been invoked
ProducerFencedException if another producer with the same transactional.id is active
org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker does not support transactions (i.e. if its version is lower than 0.11.0.0)
org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured transactional.id is not authorized. See the exception for more details
KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
public void sendOffsetsToTransaction(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId)
Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. Thus, the specified consumerGroupId should be the same as the group.id config parameter of the consumer used. Note that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via synchronous commits or Consumer.commitAsync(OffsetCommitCallback)).

Parameters:
offsets - The offsets to send to the consumer group coordinator
consumerGroupId - The consumer group id

Throws:
DatabusClientRuntimeException - If the method fails. The original cause could be any of these exceptions:
IllegalStateException if no transactional.id has been configured or no transaction has been started
ProducerFencedException fatal error indicating another producer with the same transactional.id is active
org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker does not support transactions (i.e. if its version is lower than 0.11.0.0)
org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message format used for the offsets topic on the broker does not support transactions
org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured transactional.id is not authorized. See the exception for more details
KafkaException if the producer has encountered a previous fatal or abortable error, or for any other unexpected error
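A consume-transform-produce loop built on this method might look like the following sketch (the consumer, the transform(...) step and the per-partition offset bookkeeping are simplified assumptions):

```java
// The consumer must use the same group.id ("my-consumer-group" here) and
// must run with enable.auto.commit=false.
producer.initTransactions();
try {
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ProducerRecord record : consumer.poll(100)) {
        producer.send(transform(record)); // transform(...) is a hypothetical application method
        // Record the next offset to consume per partition in `offsets` (elided).
    }
    // Commit the consumed offsets atomically with the produced records.
    producer.sendOffsetsToTransaction(offsets, "my-consumer-group");
    producer.commitTransaction();
} catch (DatabusClientRuntimeException e) {
    producer.abortTransaction();
}
```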
public void commitTransaction()
Commits the ongoing transaction. If any of the send(ProducerRecord) calls which were part of the transaction hit irrecoverable errors, this method will throw the last received exception immediately and the transaction will not be committed. So all send(ProducerRecord) calls in a transaction must succeed in order for this method to succeed.

Throws:
DatabusClientRuntimeException - If the method fails. The original cause could be any of these exceptions:
IllegalStateException if no transactional.id has been configured or no transaction has been started
ProducerFencedException fatal error indicating another producer with the same transactional.id is active
org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker does not support transactions (i.e. if its version is lower than 0.11.0.0)
org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured transactional.id is not authorized. See the exception for more details
KafkaException if the producer has encountered a previous fatal or abortable error, or for any other unexpected error
public void abortTransaction()
Aborts the ongoing transaction. This call will throw an exception immediately if any prior send(ProducerRecord) calls failed with a ProducerFencedException or an instance of AuthorizationException.

Throws:
DatabusClientRuntimeException - If the method fails. The original cause could be any of these exceptions:
IllegalStateException if no transactional.id has been configured or no transaction has been started
ProducerFencedException fatal error indicating another producer with the same transactional.id is active
org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker does not support transactions (i.e. if its version is lower than 0.11.0.0)
org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured transactional.id is not authorized. See the exception for more details
KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
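Taken together, the transactional methods above follow a begin / commit-or-abort pattern; a minimal sketch (record construction elided, transactional.id assumed to be set in the configuration):

```java
producer.initTransactions(); // once per producer instance
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    // Both records become visible atomically, or not at all.
    producer.commitTransaction();
} catch (DatabusClientRuntimeException e) {
    // Discard the transaction's records; the producer can start a new one.
    producer.abortTransaction();
}
```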
public ProducerMetric recordSendTotalMetric()
The total number of records sent per clientId.

public ProducerMetric recordSendRateMetric()
The average number of records sent per second per clientId.

public ProducerMetric recordSizeAvgMetric()
The average record size per clientId.

public ProducerMetric recordSizeMaxMetric()
The maximum record size per clientId.

public ProducerMetric recordErrorTotalMetric()
The total number of record sends that resulted in errors per clientId.

public ProducerMetric recordErrorRateMetric()
The average per-second number of record sends that resulted in errors per clientId.

public ProducerMetric recordBatchSizeMaxMetric()
The maximum size of the batches processed by the connector per clientId.

public ProducerMetric recordBatchSizeAvgMetric()
The average size of the batches processed by the connector per clientId.

public ProducerMetric recordSendTotalPerTopicMetric(java.lang.String topic)
The total number of records sent for a topic.
Parameters:
topic - The topic name.

public ProducerMetric recordSendRatePerTopicMetric(java.lang.String topic)
The average number of records sent per second for a topic.
Parameters:
topic - The topic name.

public ProducerMetric recordErrorTotalPerTopicMetric(java.lang.String topic)
The total number of record sends that resulted in errors for a topic.
Parameters:
topic - The topic name.

public ProducerMetric recordErrorRatePerTopicMetric(java.lang.String topic)
The average per-second number of record sends that resulted in errors for a topic.
Parameters:
topic - The topic name.

public ProducerMetric recordByteTotalPerTopicMetric(java.lang.String topic)
The total number of bytes sent for a topic.
Parameters:
topic - The topic name.

public ProducerMetric recordByteRatePerTopicMetric(java.lang.String topic)
The average number of bytes sent per second for a topic.
Parameters:
topic - The topic name.
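These getters can back simple monitoring. A sketch that fetches a few ProducerMetric handles and also walks the raw Kafka metrics map (MetricName.name() and Metric.metricValue() are standard Kafka accessors; the accessors of ProducerMetric itself are not detailed in this reference, so only the lookups are shown):

```java
// Per-client and per-topic metric handles.
ProducerMetric totalSent = producer.recordSendTotalMetric();
ProducerMetric topicBytes = producer.recordByteTotalPerTopicMetric("topic1");

// The full internal metrics map can be dumped directly.
for (Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> entry
        : producer.metrics().entrySet()) {
    System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
}
```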