Nunc Fluens

"Nunc fluens facit tempus, nunc stans facit aeternitatem." - Boethius, _The Consolation of Philosophy_
"The value of philosophy is, in fact, to be sought largely in its very uncertainty . . . it keeps alive our sense of wonder by showing familiar things in an unfamiliar aspect." - Bertrand Russell, _The Problems of Philosophy_

Retry Handling with fluent-plugin-elasticsearch

Thanks to @jcantrill et. al. the retry logic in fluent-plugin-elasticsearch has been greatly improved. These changes are in version 1.16.1 and later: https://github.com/uken/fluent-plugin-elasticsearch/releases/tag/v1.16.1

Unique ID per record

All records must be assigned a unique ID. OpenShift logging does it with the new elasticsearch_genid plugin type provided by fluent-plugin-elasticsearch. https://github.com/openshift/origin-aggregated-logging/blob/master/fluentd/configs.d/openshift/filter-post-genid.conf#L3

<filter **>
  @type elasticsearch_genid
  hash_id_key viaq_msg_id
</filter>

You will have to add a section like this to your fluentd config in order to assign a unique ID to each record. The @type elasticsearch plugin config should be changed like this:

@type elasticsearch
...
id_key viaq_msg_id
write_operation 'create'

This tells the plugin to use the generated ID for the _id https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_document_metadata.html#_id metadata field associated with the record. This also tells the plugin to use the create https://www.elastic.co/guide/en/elasticsearch/guide/2.x/create-doc.html operation to add the new record instead of the default index, which will update the record if it exists. In this case, we want Elasticsearch to return 409 so that we will not attempt to resubmit this record.

Retry handling

When fluent-plugin-elasticsearch resubmits a failed record that is a candidate for a retry (e.g. the request returned a 429 for the record), the record is resubmitted back into the fluentd record queue for processing. By default, it is submitted back to the very beginning of processing, and will go back through all of your processing pipeline again. You will usually want to avoid this. There are two ways to route records so that you can control the reprocessing - tagging and/or labeling. You can use either separately or both in conjunction.

Retry tagging

There is a new parameter in fluent-plugin-elasticsearch - retry_tag - https://github.com/uken/fluent-plugin-elasticsearch#retry_tag

@type elasticsearch
...
retry_tag retry_es

It is best to use a tag that isn’t used by anything else and will not match other filters, to avoid having records reprocessed. When records are resubmitted, they will use this tag instead of whatever tag they were using before. Then you can have something like this in your fluentd config:

# handler for initial attempt and retries
<match retry_es **my_initial_tag**>
  @type elasticsearch
  ...
  retry_tag retry_es
  ... other config ...
</match>

This is good for a simple case where you don’t have much processing, and you only have one or two matches that don’t use @type copy. You can’t do this for @type copy because your retried records would be sent again to all of your copy destinations, causing duplicate records at those destinations. In that case, you will need to define a separate @type elasticsearch config to handle only the Elasticsearch retries:

# handler for retries
<match retry_es>
  @type elasticsearch
  ...
  retry_tag retry_es
  ... other config ...
</match>
# handler for initial attempt and copy destinations
<match **my_initial_tag**>
  @type copy
  <store>
    @type elasticsearch
    ...
    retry_tag retry_es
    ... other config ...
  </store>
  <store>
    @type secure_forward
    ...
  </store>
</match>

This is the method used by OpenShift logging as there may be multiple copy destinations specified by the user. For example: https://github.com/openshift/origin-aggregated-logging/blob/master/fluentd/configs.d/openshift/output-operations.conf The configuration of the retry fluent-plugin-elasticsearch should be identical to the initial fluent-plugin-elasticsearch. This is config duplication, so you’ll need to use some other method to keep it DRY e.g. move all of the common config to a separate file and @include it.

Retry labeling

You can also specify a label https://docs.fluentd.org/v0.12/articles/life-of-a-fluentd-event#labels and https://docs.fluentd.org/v0.12/articles/routing-examples#input--%3E-filter--%3E-output-with-label to route records to. To do this, add a @label @LABEL_NAME to your fluent-plugin-elasticsearch config:

@type elasticsearch
...
@label @RETRY_ES

Then add a <label @RETRY_ES> section to your config:

<label @RETRY_ES>
  <match **>
    @type elasticsearch
    ...
    @label @RETRY_ES
  </match>
</label>

With labeling, you can ensure that the retried record is sent directly to the label section, avoiding the rest of your processing pipeline.

You can use labeling and tagging both:

@type elasticsearch
...
@label @RETRY_ES
retry_tag retry_es

Then add a <label @RETRY_ES> section to your config:

<label @RETRY_ES>
  <match retry_es>
    @type elasticsearch
    ...
    @label @RETRY_ES
    retry_tag retry_es
  </match>
</label>

The configuration of the retry fluent-plugin-elasticsearch should be identical to the initial fluent-plugin-elasticsearch. This is config duplication, so you’ll need to use some other method to keep it DRY e.g. move all of the common config to a separate file and @include it.

The parameters max_retry_wait, disable_retry_limit, retry_limit, and retry_wait won’t be as important when using the configuration specified above, because the retries will happen internally to the fluent-plugin-elasticsearch. In a sense, that code path will be avoided. They will still be used in the case where the error is due to connection or network issues e.g. cannot connect to Elasticsearch.

Error record handling

Records that have “hard” errors (schema violations, corrupted data, etc.) that cannot be retried will be sent to the @ERROR handler. Think of this as a “dead letter queue”, for messages that cannot be delivered. If you add a <label @ERROR> section to your fluentd config, you can handle these records how you want. For example:

<label @ERROR>
  <match **>
    @type file
    path /var/log/fluent/dlq
    time_slice_format %Y%m%d
    time_slice_wait 10m
    time_format %Y%m%dT%H%M%S%z
    compress gzip
  </match>
</label>

This will write error records to the dlq file. See https://docs.fluentd.org/v0.12/articles/out_file for more information about the file output.