Edit ingest pipelines

In most instances, data needs to be manipulated before it is ingested into the Elastic Stack. For example, logs should be parsed into structured data before indexing. To do this, integrations use ingest pipelines.

Ingest pipelines let you perform common transformations on your data before indexing. For example, you can use pipelines to remove fields, extract values from text, and enrich your data.

A pipeline consists of a series of configurable tasks called processors. Each processor runs sequentially, making specific changes to incoming documents. After the processors have run, Elasticsearch adds the transformed documents to your data stream or index.

Learn more in the ingest pipeline reference.

Ingest pipelines are defined in the elasticsearch/ingest_pipeline directory of a data stream and apply only to that data stream. In the Apache integration, for example, the following pipeline applies only to the apache.access data stream:

apache
└───data_stream
│   └───access
│   │   └───elasticsearch/ingest_pipeline
│   │          default.yml
│   └───error
│   └───status
  1. The ingest pipeline definition for the access logs data stream of the Apache integration

An ingest pipeline definition requires a description and an array of processors. Here’s a snippet of the access logs ingest pipeline:

description: "Pipeline for parsing Apache HTTP Server access logs."
processors:
- set:
    field: event.ingested
    value: '{{_ingest.timestamp}}'
- rename:
    field: message
    target_field: event.original
- remove:
    field: apache.access.time
    ignore_failure: true

Open the elasticsearch/ingest_pipeline/default.yml file created for each data stream and edit the pipeline to match your needs.

The processor reference provides a list of all available processors and their configurations.

Integrations can use multiple pipelines to organize complex processing logic:

  • default.yml - The main pipeline that runs for all documents in the data stream
  • Custom pipelines - Additional pipelines for specific processing needs (e.g., parser.yml, enrichment.yml)
  • @custom pipeline - A user-defined pipeline that runs after the integration's pipelines
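
As a sketch of this hook, assuming the apache.access data stream (for which Fleet calls a pipeline named logs-apache.access@custom after the integration's pipelines, if it exists), a user could add their own processing with a request like the following; the set processor and its field are purely illustrative:

// Illustrative request: replace the processor and field with your own
PUT _ingest/pipeline/logs-apache.access@custom
{
  "processors": [
    {
      "set": {
        "field": "organization.name",
        "value": "example-org"
      }
    }
  ]
}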

Pipelines can call other pipelines using the pipeline processor:

processors:
- pipeline:
    name: '{{ IngestPipeline "parser" }}'
    if: ctx.event?.original != null

Best practices for organizing pipelines:

  • Keep the default pipeline focused on core transformations
  • Split complex parsing logic into separate pipelines
  • Use conditional pipelines for format-specific processing
  • Document the purpose of each pipeline clearly
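
For instance, a default pipeline might keep only core transformations and hand format-specific parsing to separate pipelines; a minimal sketch, in which the parser-json and parser-plain pipeline names are hypothetical:

description: "Default pipeline that delegates format-specific parsing."
processors:
# Core transformation kept in the default pipeline
- set:
    field: event.ingested
    value: '{{_ingest.timestamp}}'
# Route JSON-formatted messages to a dedicated parser pipeline (hypothetical name)
- pipeline:
    name: '{{ IngestPipeline "parser-json" }}'
    if: "ctx.message != null && ctx.message.startsWith('{')"
# Everything else goes through the plain-text parser (hypothetical name)
- pipeline:
    name: '{{ IngestPipeline "parser-plain" }}'
    if: "ctx.message != null && !ctx.message.startsWith('{')"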

Here are frequently used processor patterns for data transformation:

Use the date processor to parse timestamps with multiple possible formats:

- date:
    field: apache.access.time
    target_field: "@timestamp"
    formats:
      - dd/MMM/yyyy:HH:mm:ss Z
      - ISO8601
      - UNIX_MS
    timezone: "{{ event.timezone }}"

Parse JSON strings into structured fields:

- json:
    field: message
    target_field: parsed
    on_failure:
      - append:
          field: error.message
          value: "Failed to parse JSON: {{{ _ingest.on_failure_message }}}"

Use grok for complex patterns or dissect for fixed delimiters:

# Grok for flexible patterns
- grok:
    field: message
    patterns:
      - '%{IPORHOST:source.ip} %{USER:user.name} \[%{HTTPDATE:timestamp}\] "%{WORD:http.request.method} %{DATA:url.path}"'
    pattern_definitions:
      CUSTOM_PATTERN: "your-regex-here"

# Dissect for better performance with fixed formats
- dissect:
    field: message
    pattern: "%{source.ip} - %{user.name} [%{timestamp}] \"%{http.request.method} %{url.path}\""

Apply processors only when conditions are met:

- lowercase:
    field: http.request.method
    if: ctx.http?.request?.method != null
- remove:
    field: temp_field
    if: ctx.tags != null && ctx.tags.contains('processed')

Proper error handling ensures pipeline resilience:

For non-critical processors that may fail:

- convert:
    field: http.response.status_code
    type: long
    ignore_failure: true
  1. Continue if conversion fails

For handling specific failure scenarios:

- json:
    field: message
    on_failure:
      - set:
          field: error.type
          value: "json_parse_error"
      - set:
          field: error.message
          value: "{{{ _ingest.on_failure_message }}}"

Define fallback behavior for the entire pipeline:

description: Pipeline with error handling
processors:
  - json:
      field: message
on_failure:
  - set:
      field: error.pipeline
      value: "default"
  - set:
      field: event.kind
      value: "pipeline_error"

Common error handling patterns:

  • Use ignore_failure for optional fields
  • Add error details to dedicated error fields
  • Log errors for debugging but don't block ingestion

Testing ensures your pipelines work correctly before deployment. See the pipeline testing documentation for comprehensive testing strategies.
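
For a quick manual check, you can also run a pipeline against sample documents with the Elasticsearch simulate API; a minimal sketch, in which the pipeline body and the sample log line are illustrative:

// Illustrative request: substitute your own pipeline definition and sample documents
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Pipeline for parsing Apache HTTP Server access logs.",
    "processors": [
      { "rename": { "field": "message", "target_field": "event.original" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /index.html HTTP/1.0\" 200 2326"
      }
    }
  ]
}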

Pipeline performance impacts ingestion throughput:

  • Place drop processors early to filter unwanted documents
  • Run field existence checks before complex operations
  • Group related processors together

Use these processors sparingly:

  • script - Painless scripts have overhead
  • enrich - Requires lookups against Elasticsearch
  • geoip/user_agent - Database lookups
  • grok with complex patterns - Consider dissect for fixed formats

These recommendations look like the following in practice:

# Drop unwanted documents early
- drop:
    if: ctx.event?.dataset != "apache.access"

# Check field existence before processing
- lowercase:
    field: user.name
    if: ctx.user?.name != null

# Use dissect instead of grok when possible
- dissect:
    field: message
    pattern: "%{} - %{} [%{}] \"%{method} %{path} %{}\" %{status} %{}"

To monitor pipeline performance:

  • Check pipeline stats: GET _nodes/stats/ingest
  • Monitor ingestion rates in Stack Monitoring
  • Set up alerts for pipeline failures
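
For example, the node ingest stats report per-pipeline counters (count, time_in_millis, current, and failed) that you can track over time; the filter_path parameter below simply trims the response to those counters:

// Watch count, time_in_millis, current, and failed for each pipeline
GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines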