Edit ingest pipelines
In most instances, data needs to be manipulated before it's ingested into the Elastic Stack. For example, logs should be parsed into structured data before ingestion. To do so, integrations use ingest pipelines.
Ingest pipelines let you perform common transformations on your data before indexing. For example, you can use pipelines to remove fields, extract values from text, and enrich your data.
A pipeline consists of a series of configurable tasks called processors. Each processor runs sequentially, making specific changes to incoming documents. After the processors have run, Elasticsearch adds the transformed documents to your data stream or index.
Learn more in the ingest pipeline reference.
Ingest pipelines are defined in the `elasticsearch/ingest_pipeline` directory. They only apply to the parent data stream within which they live. For our example, this would be the `apache.access` dataset.
For example, the Apache integration:
```text
apache
└───data_stream
│   └───access
│   │   └───elasticsearch/ingest_pipeline
│   │           default.yml
│   └───error
│   └───status
```

The `default.yml` file is the ingest pipeline definition for the access logs data stream of the Apache integration.
An ingest pipeline definition requires a description and an array of processors. Here’s a snippet of the access logs ingest pipeline:
description: "Pipeline for parsing Apache HTTP Server access logs."
processors:
- set:
field: event.ingested
value: '{{_ingest.timestamp}}'
- rename:
field: message
target_field: event.original
- remove:
field: apache.access.time
ignore_failure: true
Open each `elasticsearch/ingest_pipeline/default.yml` file created for each data stream. Edit each ingest pipeline to match your needs.
The processor reference provides a list of all available processors and their configurations.
Integrations can use multiple pipelines to organize complex processing logic:
- `default.yml` - The main pipeline that runs for all documents in the data stream
- Custom pipelines - Additional pipelines for specific processing needs (e.g., `parser.yml`, `enrichment.yml`)
- `@custom` pipeline - A user-defined pipeline that runs after the integration's pipelines
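For instance, assuming Fleet's `<type>-<dataset>@custom` naming convention, a user of the Apache integration could attach their own post-processing to the access logs data stream. This is a minimal sketch; the added field is purely illustrative:

```console
PUT _ingest/pipeline/logs-apache.access@custom
{
  "processors": [
    {
      "set": {
        "field": "organization.name",
        "value": "example-org"
      }
    }
  ]
}
```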
Pipelines can call other pipelines using the `pipeline` processor:
```yaml
processors:
  - pipeline:
      name: '{{ IngestPipeline "parser" }}'
      if: ctx.event?.original != null
```
Best practices for organizing pipelines:
- Keep the default pipeline focused on core transformations
- Split complex parsing logic into separate pipelines
- Use conditional pipelines for format-specific processing (see the routing sketch after this list)
- Document the purpose of each pipeline clearly
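To illustrate the last two points, the default pipeline can act as a thin router that hands documents off to format-specific pipelines. This is a minimal sketch, not taken from a real integration; the `json` and `plaintext` pipeline names are hypothetical:

```yaml
description: "Routes documents to format-specific pipelines."
processors:
  # Hand JSON-formatted messages to a dedicated parser pipeline
  - pipeline:
      name: '{{ IngestPipeline "json" }}'
      if: ctx.message != null && ctx.message.startsWith('{')
  # Everything else goes through the plain-text parser
  - pipeline:
      name: '{{ IngestPipeline "plaintext" }}'
      if: ctx.message != null && !ctx.message.startsWith('{')
```

Keeping the router free of parsing logic makes each format-specific pipeline easy to test in isolation.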
Here are frequently used processor patterns for data transformation:
Use the `date` processor to parse timestamps with multiple possible formats:
```yaml
- date:
    field: apache.access.time
    target_field: "@timestamp"
    formats:
      - dd/MMM/yyyy:HH:mm:ss Z
      - ISO8601
      - UNIX_MS
    timezone: "{{ event.timezone }}"
```
Parse JSON strings into structured fields:
```yaml
- json:
    field: message
    target_field: parsed  # or omit target_field and set add_to_root: true (the two are mutually exclusive)
    on_failure:
      - append:
          field: error.message
          value: "Failed to parse JSON: {{{ _ingest.on_failure_message }}}"
```
Use `grok` for complex patterns or `dissect` for fixed delimiters:
```yaml
# Grok for flexible patterns
- grok:
    field: message
    patterns:
      - '%{IPORHOST:source.ip} %{USER:user.name} \[%{HTTPDATE:timestamp}\] "%{WORD:http.request.method} %{DATA:url.path}"'
    pattern_definitions:
      CUSTOM_PATTERN: "your-regex-here"

# Dissect for better performance with fixed formats
- dissect:
    field: message
    pattern: "%{source.ip} - %{user.name} [%{timestamp}] \"%{http.request.method} %{url.path}\""
```
Apply processors only when conditions are met:
```yaml
- lowercase:
    field: http.request.method
    if: ctx.http?.request?.method != null
- remove:
    field: temp_field
    if: ctx.tags != null && ctx.tags.contains('processed')
```
Proper error handling ensures pipeline resilience:
For non-critical processors that may fail:
```yaml
- convert:
    field: http.response.status_code
    type: long
    ignore_failure: true  # Continue if conversion fails
```
For handling specific failure scenarios:
```yaml
- json:
    field: message
    on_failure:
      - set:
          field: error.type
          value: "json_parse_error"
      - set:
          field: error.message
          value: "{{{ _ingest.on_failure_message }}}"
```
Define fallback behavior for the entire pipeline:
```yaml
description: Pipeline with error handling
processors:
  - json:
      field: message
on_failure:
  - set:
      field: error.pipeline
      value: "default"
  - set:
      field: event.kind
      value: "pipeline_error"
```
Common error handling patterns:
- Use `ignore_failure` for optional fields
- Add error details to dedicated error fields
- Log errors for debugging but don't block ingestion
Testing ensures your pipelines work correctly before deployment. See the pipeline testing documentation for comprehensive testing strategies.
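Before running those test suites, you can also exercise a pipeline by hand with the `_simulate` API. A minimal sketch, using a single hard-coded Apache access log line as the sample document:

```console
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Pipeline for parsing Apache HTTP Server access logs.",
    "processors": [
      { "rename": { "field": "message", "target_field": "event.original" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET / HTTP/1.0\" 200 2326"
      }
    }
  ]
}
```

The response shows each transformed document, so you can verify field handling before packaging the integration.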
Pipeline performance impacts ingestion throughput:
- Place `drop` processors early to filter unwanted documents
- Run field existence checks before complex operations
- Group related processors together
Use these processors sparingly:
- `script` - Painless scripts have overhead
- `enrich` - Requires lookups against Elasticsearch
- `geoip`/`user_agent` - Database lookups
- `grok` with complex patterns - Consider `dissect` for fixed formats
```yaml
# Drop unwanted documents early
- drop:
    if: ctx.event?.dataset != "apache.access"

# Check field existence before processing
- lowercase:
    field: user.name
    if: ctx.user?.name != null

# Use dissect instead of grok when possible
- dissect:
    field: message
    pattern: "%{} - %{} [%{}] \"%{method} %{path} %{}\" %{status} %{}"
```
- Check pipeline stats: `GET _nodes/stats/ingest`
- Monitor ingestion rates in Stack Monitoring
- Set up alerts for pipeline failures
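The stats response reports per-pipeline counters. An abridged, illustrative excerpt (the node ID, pipeline name, and numbers are placeholders):

```json
{
  "nodes": {
    "<node_id>": {
      "ingest": {
        "pipelines": {
          "logs-apache.access-1.0.0": {
            "count": 120000,
            "time_in_millis": 4500,
            "current": 0,
            "failed": 3
          }
        }
      }
    }
  }
}
```

`failed` is the counter to alert on, and `time_in_millis` divided by `count` gives a rough per-document processing cost.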