
Logstash, the magic pipeline for everything

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases.

While Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, with many native codecs further simplifying the ingestion process. Logstash accelerates your insights by harnessing a greater volume and variety of data.
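
For example, enriching events often takes no more than a single filter plugin. A minimal sketch, assuming events carry a hypothetical client_ip field, adds geographic information with the geoip filter:

filter {
  # Look up geographic information for the IP address in client_ip
  # and add it to the event (under the geoip field by default)
  geoip {
    source => "client_ip"
  }
}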

You can read more about Logstash use cases here

The Logstash event processing pipeline has three stages: inputs → filters → outputs. Inputs generate events, filters modify them, and outputs ship them elsewhere. Inputs and outputs support codecs that enable you to encode or decode the data as it enters or exits the pipeline without having to use a separate filter.
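
Here is a minimal, self-contained sketch of these three stages (separate from the sensor pipeline used later in this post): events are read as JSON lines from stdin, tagged in a filter, and pretty-printed by the stdout output:

input {
  # The json codec decodes each incoming line into a structured event
  stdin { codec => "json" }
}

filter {
  # A trivial transformation: tag every event
  mutate { add_tag => ["example"] }
}

output {
  # Pretty-print events to the console for inspection
  stdout { codec => "rubydebug" }
}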

You can read more about Logstash pipelines here

In this post, we will cover the following use cases:

  1. How to pull data from an HTTP REST endpoint using the HTTP poller plugin
  2. How to pull data from a JMS queue using the JMS plugin
  3. How to distribute events to a JMS queue using the STOMP plugin

HTTP Poller

This Logstash input plugin allows you to call an HTTP API, decode its output into event(s), and send them on their merry way. In this example we will pull data from the dataset “Vehicle GPS Data: Department of Public Services” provided by the City of Cincinnati through its REST API.

input {
  http_poller {
    # Named URL entries; each one is polled on the schedule below
    urls => {
      b56d => {
        method => "GET"
        url => "https://data.cincinnati-oh.gov/resource/b56d-ydmm.json?$limit=50000&$offset=0&$order=time"
        headers => {
          Accept => "application/json"
        }
      }
    }
    request_timeout => 60
    # Poll every minute (UTC)
    schedule => { cron => "* * * * * UTC" }
    # Decode the JSON response body into events
    codec => "json"
    # Keep the HTTP response metadata out of the indexed document
    metadata_target => "[@metadata][response_metadata]"
  }
}

filter {

  # Keep the original ingestion timestamp before @timestamp is overwritten below
  mutate { rename => [ "@timestamp", "processing_time" ] }

  # Use the sensor reading time as the main @timestamp of the event
  date {
    match => ["time", "yyyy-MM-dd'T'HH:mm:ss'.'SSS"]
    target => "@timestamp"
  }

  # Parse the load timestamp in place
  date {
    match => ["loadts", "yyyy-MM-dd'T'HH:mm:ss'.'SSS"]
    target => "loadts"
  }

  # Convert numeric fields that arrive as strings
  mutate {
    convert => {
      "latitude"  => "float"
      "longitude" => "float"
      "speed" => "float"
      "distance_traveled" => "float"
      "odometer" => "float"
    }
  }

  # Build a field that can be mapped as a geo_point:
  # "device_position": { "type": "geo_point" }
  mutate {
    rename => {
      "latitude" => "[device_position][lat]"
      "longitude" => "[device_position][lon]"
    }
  }

  # Deterministic unique id of the metric, so re-polling the same records
  # updates existing documents instead of creating duplicates
  fingerprint {
    method => "MD5"
    key => "secret hashing key"
    concatenate_sources => true
    concatenate_all_fields => false
    source => ["asset", "@timestamp"]
    target => "[@metadata][document_id]"
  }

  # Remove unnecessary fields added by Logstash
  mutate { remove_field => ["host", "@version", "time"] }
}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9000"]
    index => "sensors-data-%{+YYYY-MM-dd}"
    document_id => "%{[@metadata][document_id]}"
    action => "index"
  }
}

The data stored in Elasticsearch can then be visualized with Kibana. Note that the device_position field should be mapped as a geo_point in the index (for example through an index template) if you want to display it on a map.

JMS Plugin

Logstash can also read events from a JMS broker such as ActiveMQ. The JMS input plugin supports both JMS queues and topics (a topic variant is sketched after the queue example below).

When scaling Logstash, it is common to add a message broker that temporarily buffers incoming messages before they are processed by one or more Logstash nodes. Data is pushed to the broker either by a shipper such as Filebeat, which reads log files and sends each event to the broker, or directly by the application itself using something like a Log4j appender.

A common choice is Redis, a broker that stores the data in memory, but other options such as Apache Kafka are also possible. Some organizations, however, are not keen to introduce new technology and prefer to reuse existing infrastructure. ActiveMQ is a widely used messaging and integration platform that supports several protocols and is well suited to the message broker role. Here is a complete configuration showing how Logstash can read events from ActiveMQ:

input {
  jms {
    broker_url => 'failover:(tcp://127.0.0.1:61616)?initialReconnectDelay=100'
    destination => 'sensors-data'
    factory => 'org.apache.activemq.ActiveMQConnectionFactory'
    pub_sub => false
    use_jms_timestamp => false
    # JMS provider credentials if needed
    username => 'system'
    password => 'manager'
    include_header => false
    include_properties => false
    include_body => true
    # Jar files to include
    require_jars => ['/apache-activemq/activemq-all-5.16.0.jar']
  }
}
  
output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9000"]
    index => "sensors-data-%{+YYYY-MM-dd}"
    # Assumes a filter (such as the fingerprint filter shown earlier)
    # has populated [@metadata][document_id]
    document_id => "%{[@metadata][document_id]}"
    action => "index"
  }
}
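
The example above consumes from a queue (pub_sub => false). To subscribe to a topic instead, only the destination settings change; a minimal sketch, assuming a hypothetical topic named sensors-data-topic:

input {
  jms {
    broker_url => 'failover:(tcp://127.0.0.1:61616)?initialReconnectDelay=100'
    # Subscribe to a topic instead of a queue
    destination => 'sensors-data-topic'
    pub_sub => true
    factory => 'org.apache.activemq.ActiveMQConnectionFactory'
    require_jars => ['/apache-activemq/activemq-all-5.16.0.jar']
  }
}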

Stomp Plugin

This plugin writes events using the STOMP protocol to a broker that supports STOMP, such as ActiveMQ. It is not bundled with Logstash by default, but it is easy to install by running bin/logstash-plugin install logstash-output-stomp. Here is an example output configuration that sends all events to the sensors-data queue:

output {
  stomp {
    host => "127.0.0.1"
    port => 61613
    user => "system"
    password => "manager"
    destination => "sensors-data"
    headers => {
      "persistent" => true
    }
  }
}
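
If only a subset of events should be forwarded to the broker, the stomp output can be wrapped in a Logstash conditional. A minimal sketch, assuming events carry a hypothetical type field:

output {
  # Only ship sensor events to the broker; everything else is ignored here
  if [type] == "sensor" {
    stomp {
      host => "127.0.0.1"
      port => 61613
      user => "system"
      password => "manager"
      destination => "sensors-data"
    }
  }
}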

All configurations and scripts used in this blog post can be found in our GitHub repository.

Posted on October 22, 2020 by Yassine LASRI