ELK Stack

ELK Stack

El stack ELK es una solución para la recolección, la centralización y la explotación de logs.
Estos últimos años, se ha vuelto la solución recomendada para los sistemas complejos y distribuidos que generan muchos logs.

Lleva al menos esos tres componentes:
E para ElasticSearch
L para Logstash
K para Kibana

ElasticSearch

Es una base de datos orientada a documentos (JSON) con schema opcional, tiene tres casos de uso principales :

  • Motor de búsqueda de texto
  • Análisis de logs
  • Business Intelligence

ElasticSearch es muy bueno para almacenar grandes volúmenes de documentos, indexarlos, y buscar textos en ellos, lo que le hace una herramienta muy adaptada para analizar las cantidades de archivos logs que generan las aplicaciones hoy en día.

Logstash

Es un procesador de datos, que funciona como un pipeline en tres partes :

  • Input
  • Filter
  • Output

Logstash viene con varios plugins de entrada y salida, lo que le permite colectar datos de varios fuentes y mandarlos a varias partes, los plugins de filtro, permiten tratar estos datos dándole sentido, como por ejemplo, convirtiendo una IP en una zona geográfica.

Kibana

Es un visualizador de datos, especialmente desarrollado para ElasticSearch.
Permite explorar los datos recolectados en ElasticSearch, construir varios tipos de gráficos, incluyendo mapas, y componer dashboards a partir de esos gráficos.

Elastic

Esos tres componentes son obras de la compañía Elastic (https://www.elastic.co/) y son Open Source :
– ElasticSearch (Java) https://github.com/elastic/elasticsearch
– Kibana (Javascript) https://github.com/elastic/kibana
– Logstash (Ruby) https://github.com/elastic/logstash

¿Como funcionan juntos?

La configuración de ElasticSearch y Kibana es bastante fácil, solo hay que indicarle a Kibana la URL de ElasticSearch, por ejemplo en una configuración docker-compose :

elastic:
image: docker.elastic.co/elasticsearch/elasticsearch:5.6.3
volumes:
- logs:/usr/share/elasticsearch/data
ports:
- 9200:9200
kibana:
image: docker.elastic.co/kibana/kibana:5.6.3
environment:
- ELASTICSEARCH_URL=http://elastic:9200
ports:
- 5601:5601
depends_on:
- elastic

Eso aplica también para el output de Logstash, solo hay que ocupar el plugin de salida para ElasticSearch y indicarle la URL:

output {
    elasticsearch {
        hosts => "elastic:9200"
    }
}

Lo más complicado es la recolección de los logs, ya que no hay dos aplicaciones que ocupen el mismo formato.

Modelo 1
Logstash puede colectar los logs directamente desde el filesystem, pero en este caso, se necesitará una instancia de logstash en cada nodo. Es el modelo 1, que se representa a continuación :

ELK1

Modelo 2
Un modelo más evolucionado consiste en ocupar Filebeat, es un agente liviano especialmente escrito para ese caso de uso, es capaz de comunicarse con Logstash de forma inteligente. También es de Elastic y es Open Source (escrito en Go), hace parte de una familia más grande llamada “Beats” que conlleva otros productos relacionados.
En este modelo, se puede tener una instancia única de Logstash con un agente Filebeat en cada nodo :

ELK2

Modelo 3
Finalmente, se pueden configurar los componentes en un tercer modelo, agregándole un message queue broker, lo que permite desacoplar la recolección y el tratamiento de los logs :

ELK3

De esa forma, se puede detener el ElasticSearch, por mantención por ejemplo, sin pérdida de logs, ya que se acumulan en la cola. También permite que no se colapse el sistema cuando llegan muchos logs al mismo tiempo; la cola hará un papel de amortiguador de carga, en ese caso. Como Message Broker, entre otros, se puede ocupar Redis.
Existen plugins de entrada y de salida para Logstash.

Salida:

output {
    redis {
        host => "redis"
        data_type => "list"
        key => "logstash"
    }
}

Entrada:

input {
    redis {
        host => "redis"
        data_type => "list"
        key => "logstash"
    }
}

Ejemplos de configuración de Filebeat

Este ejemplo es una configuración (filebeat.yml) que mueve los logs de un servidor HTTP Nginx a una instancia Logstash:

filebeat.prospectors:
- input_type: log
paths:
- /var/log/nginx/access.log
name: "nginx"
output.logstash:
hosts: ["logstash-collector:5044"]

Este segundo ejemplo colecta los logs de una aplicación corriendo en JBoss:

filebeat.prospectors:
- input_type: log
paths:
- /opt/jboss/jboss-eap-6.4/standalone/log/server.log
multiline:
pattern: '^[0-9]{2}/[0-9]{2}/[0-9]{4}'
negate: true
match: after
- input_type: log
paths:
- /opt/jboss/jboss-eap-6.4/standalone/log/app/*.log
multiline:
pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
negate: true
match: after
name: "hub"
output.logstash:
hosts: ["logstash-collector:5044"]

Acá se puede ver que Filebeat es capaz de leer desde diferentes archivos, y también, gracias al multiline pattern, de unir varias líneas del archivo en una sola entrada de log (lo que típicamente ocurre con los logs que producen las aplicaciones Java).

Ejemplos de configuración de Logstash

Este es un ejemplo de archivo de configuración en el modelo 3.
Corresponde a la primera instancia de Logstash que colecta los logs de los diferentes agentes Filebeat y los manda en Redis, sin tratamiento inmediato.

Archivo logstash.conf :

input {
    beats {
        port => 5044
    }
}

output {
    redis {
        host => "redis"
        data_type => "list"
        key => "logstash"
    }
}

Este segundo ejemplo, corresponde a la segunda instancia de Logstash, la que recupera los logs desde la cola Redis, los transforma en algo con sentido y los indexa en ElasticSearch :

input {
    redis {
        host => "redis"
        data_type => "list"
        key => "logstash"
    }
}

filter {
    if [beat][name] == "meta" {
        if [source] = ~"meta-search\.log$" {
            grok {
                match => {
                    "message" => "(\e?\[\d+m)+\[%{DATESTAMP:datetime}\] \[%{DATA:user}\] \[%{LOGLEVEL:level}] \[th:%{DATA:thread}\] \[%{GREEDYDATA:category}\]:%{INT:line} - %{GREEDYDATA:message}"
                }
                overwrite => ["message"]
            }
        } else {
            grok {
                match => {
                    "message" => "\[%{DATESTAMP:datetime}\] \[%{DATA:user}\] \[%{LOGLEVEL:level}] \[%{GREEDYDATA:category}\]:%{INT:line} - %{GREEDYDATA:message}"
                }
                overwrite => ["message"]
            }
        }
        date {
            match => ["datetime", "dd/MM/yyyy HH:mm:ss,SSS"]
            timezone => "America/Santiago"
            remove_field => ["datetime"]
        }
    } else if [beat][name] == "hub" {
        if [source] = ~"server\.log$" {
            grok {
                match => {
                    "message" => "%{DATESTAMP:datetime} %{LOGLEVEL:level} +\[%{GREEDYDATA:category}\] \(%{DATA:thread}\) %{GREEDYDATA:message}"
                }
                overwrite => ["message"]
            }
            date {
                match => ["datetime", "dd/MM/yyyy HH:mm:ss,SSS"]
                timezone => "America/Santiago"
                remove_field => ["datetime"]
            }
        } else {
            grok {
                match => {
                    "message" => "%{TIMESTAMP_ISO8601:datetime} %{LOGLEVEL:level} %{GREEDYDATA:category}:%{INT:line} - %{GREEDYDATA:message}"
                }
                overwrite => ["message"]
            }
            date {
                match => ["datetime", "yyyy-MM-dd HH:mm:ss"]
                timezone => "America/Santiago"
                remove_field => ["datetime"]
            }
        }
    } else if [beat][name] == "hazelcast" {
        grok {
            match => {
                "message" => "%{MONTH:month} %{MONTHDAY:day}, %{YEAR:year} %{TIME:time} (?<ampm>[AP]M) %{GREEDYDATA:category}\n%{LOGLEVEL:level}: \[%{GREEDYDATA:ip}\]:?(?<port>[0-9]{0,5}) \[%{GREEDYDATA:groupname}\] \[%{GREEDYDATA:version}\] ?%{GREEDYDATA:message}"
            }
            overwrite => ["message"]
        }
        mutate {
            add_field => {
                "datetime" => "%{year} %{month} %{day} %{time} %{ampm}"
            }
        }
        date {
            match => ["datetime", "yyyy MMM dd h:mm:ss a"]
            timezone => "America/Santiago"
            remove_field => ["year", "month", "day", "time", "ampm", "datetime"]
        }
    } else if [beat][name] == "elasticsearch" {
        grok {
            match => {
                "message" => "\[%{TIMESTAMP_ISO8601:datetime}\]\[%{LOGLEVEL:level} ?\]\[%{NOTSPACE:category} *\] \[%{WORD:nodename}\] %{GREEDYDATA:message}"
            }
            overwrite => ["message"]
        }
        date {
            match => ["datetime", "ISO8601"]
            timezone => "America/Santiago"
            remove_field => ["datetime"]
        }
    } else if [beat][name] == "nginx" {
        grok {
            match => {
                "message" => "\[%{HTTPDATE:time_local}\]\[%{IPORHOST:remote_addr}\]\[%{GREEDYDATA:request}\]\[%{INT:status}\]\[%{INT:body_bytes_sent}\]\[%{GREEDYDATA:http_user_agent}\]"
            }
        }
        date {
            match => ["time_local", "dd/MMM/YYYY:HH:mm:ss Z"]
            remove_field => ["time_local"]
        }
        geoip {
            source => "remote_addr"
        }
        useragent {
            source => "http_user_agent"
        }
    }
}

output {
    elasticsearch {
        hosts => "elastic:9200"
    }
}

Por supuesto, es acá donde ocurre toda la magia, por ejemplo el filtro geoip permite sacar informaciones geográficas de las IP que aparecen en los logs de Nginx, el filtro useragent sirve para formatear el http header user-agent en el tipo de navegador que ocupan los clientes.

El filtro grok, sin duda el más importante, permite separar los diferentes campos que lleva cada mensaje del log, gracias a expresiones regulares, que ocupan un formato bien particular.

Resultó muy útil ocupar este tester online: http://grokconstructor.appspot.com/do/match, que permite probar la sintaxis del pattern grok con extractos de su archivo log. Kibana en sus últimas versiones, también incluye un debugger de grok patterns.

El filtro date también es muy importante ya que permite asignar a cada entrada del log la fecha real en la cual sucedió el evento, sin eso, por defecto se le asignaría la fecha de entrada en Logstash que no es necesariamente pertinente.

En un próximo post, introduciremos el stack EFK, que es más adecuada para recolectar y centralizar los logs en un entorno basado en contenedores Docker.