Ingest Node (узел поглощения данных)

Узел поглощения данных (Ingest Node) - легковесное решение для предварительной обработки и дополнения документов в Elasticsearch до индексирования.

Если настройки узла elasticsearch не менялись, то по умолчания каждая узел (node) является ingest node. Настроить является ли узел ingest node можно в файле конфигурации elasticsearch.yml. Свойство node.ingest может принимать значения false или true, по умолчанию true.

Ingest Node используется для предварительной обработки документов до момента их фактического индексирования. Благодаря этому мы можем выполнять обработку сырых логов и дополнять их внутри Elasticsearch, минуя Logstash.

Обработка данных происходит путем перехвата составных и индексирующих запросов, применения трансформаций данных (используя pipelines) и отправки документов обратно к API, который уже кладёт измененные данные в index.

Для предварительной обработки данных перед индексированием необходимо указать контейнер (pipeline).

POST my-index-01/my_type?pipeline=my_pipeline_name {  }

Pipeline (конвейер)

Как мы уже поняли ранее именно Pipeline отвечает за саму модификацию данных. Для этого внутри Pipeline используются процессоры (processors), как множественные, так и одинарные. Список всех processors вы может найти по ссылке.

Перед использованием Pipeline вы сперва должны его зарегистрировать. Он регистрируется внутри кластера и соответственно любой узел кластера может его использовать.

PUT _ingest/pipeline/my_pipeline_name
{
  "description": "my pipline's description",
  "processors": [ 
    #one ore more processors
  ],
  "on_failure": [
    #processors which will be exucuted onfailure
  ]
}

Для примера рассмотрим процессор set, который добавляет новое поле в документ или меняет его значение, если значение поля было задано ранее.

PUT _ingest/pipeline/add_new_test_filed
{
  "description": "add test field",
  "processors": [
    { 
      "set": {
        "field": "test_field",
        "value": 10
      }  
    }
  ]  
}

Для того, чтобы проверить как наш Pipeline отрабатывает можно воспользоваться _simulate.

POST _ingest/pipeline/add_new_test_filed/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "The great time",
        "author": "Bobik Bob"
      }
    }
  ]       
}

В выводе получим:

		"_source" : {
          "test_field" : 10,
          "title" : "The great time",
          "author" : "Bobik Bob"
        }

Вы также можете использовать Pipeline и при вызове API _update_by_query, синтаксис выглядит так POST my-index-01/_update_by_query?pipeline=my_pipeline_name.

Вы также можете установить Pipeline по умолчанию для индекса. Это означает, что любой документ, который будет проиндексирован в Elasticsearch, будет автоматически применен к этому Pipeline.

PUT my-index-01
{
  "settings": {
    "default_pipeline": "my_pipeline_name"
  }
}

Также естественно вы можете использовать PIpeline и при переиндексации. Каждый документ, который будет переиндексирован, пройдет через указанный PIpeline.

POST _reindex
{
   "source":{
      "index":"my-index-01"
   },
   "dest":{
      "index":"my-index-02",
      "pipeline": "my_pipeline_name"
   }
}

Ну и наконец вы можете использовать конкретный PIpeline при создании какого-либо документа.

PUT my-index-01/_doc/1?pipeline=my_pipeline_name
{
  "title": "The great time",
  "author": "Bobik Bob"
}

Использование PIpeline в Processors

Вы также можете воспользоваться возможностью указать вместо processor другой существующий PIpeline.

PUT _ingest/pipeline/add_new_test_filed_2
{
  "processors": [
    { 
      "pipeline": {
        "name": "add_new_test_filed"
      }  
    }
  ]  
}

В итоге

  • Ingest node предоставляет возможность предварительной обработки документа перед его индексированием
  • PIpeline - это набор процессоров, которые выполняются Ingest node при индексировании документа.
  • Для проверки работы PIpeline можно воспользоваться _simulate
  • Если указаны несколько Processor их выполнение будет производится по очереди