Adding Elasticsearch to legacy applications using Logstash

By Shriram Untawale

Editor's note: An earlier version of this post was published in Hackernoon.

Elasticsearch has created a boom in the market with its ability to store, scale, search and analyze humongous amounts of data in near real time. The open source engine is usually used in applications that have complex search features, but it can also be used to retrofit legacy apps that lack heavy-duty search.

Suppose you have a legacy app running on a MySQL database, and you want to add search to it. You don’t have to replace the database. Instead, you can preserve it as the primary data store, still handling transactions, and bring up Elasticsearch as a secondary data store to handle the search load.

The relational database remains the single source of truth, available for the usual application-db transactions, while Elasticsearch, with its tables flattened out, rips through searches, giving a legacy database value that it never had before.

So now the logical questions are:

  • How will I migrate all my data from a structured data source (MySQL) to an unstructured data source (Elasticsearch)?
  • Can this migration be done without any downtime?
  • How will I keep both data sources in sync?

One solution is to use the Logstash JDBC input plugin.

Replication of data to Elasticsearch using Logstash

Logstash is a plugin-based data collection and processing engine. It comes with a wide range of plugins that make it possible to easily configure it to collect, process and forward data in many different architectures.

Processing is organized into one or more pipelines. In each pipeline, one or more input plugins receive or collect data that is then placed on an internal queue. The queue is by default small and held in memory, but it can be configured to be larger and persisted on disk in order to improve reliability and resiliency.

Processing threads read data from the queue in micro-batches and run it through any configured filter plugins in sequence. Logstash comes with many filter plugins out of the box, each targeting a specific type of processing. This is how data is parsed, processed and enriched.

Once the data has been processed, the processing threads send the data to the appropriate output plugins, which are responsible for formatting and sending data onwards (e.g., to Elasticsearch).
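Putting the three stages together, every pipeline definition follows the same skeleton. The plugins below are only placeholders to show the shape; the actual configuration used in this article appears in the next section.

input {
  # one or more input plugins, e.g. jdbc, file, beats
  stdin { }
}
filter {
  # optional parsing/enrichment plugins, e.g. mutate, ruby
  mutate { add_field => { "pipeline" => "demo" } }
}
output {
  # one or more output plugins, e.g. elasticsearch, stdout
  stdout { codec => rubydebug }
}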

Sample Logstash Pipeline

Consider a simple e-commerce website that maintains customer and order data. With Elasticsearch we can index customer-related data to perform extensive search operations.

1. MySQL database creation

Create a sample database named ecomdb and make sure it is selected as the current database.

CREATE DATABASE ecomdb;
USE ecomdb;

 

2. Now create a customer table with the query below and insert sample data

CREATE TABLE customer (
id INT(6) AUTO_INCREMENT PRIMARY KEY,
firstname VARCHAR(30) NOT NULL,
lastname VARCHAR(30) NOT NULL,
email VARCHAR(50),
regdate TIMESTAMP
);

INSERT INTO `ecomdb`.`customer` (`id`, `firstname`, `lastname`, `email`, `regdate`) VALUES (1, 'Roger', 'Federer', 'roger.federer@yomail.com', '2019-01-21 20:21:49');

INSERT INTO `ecomdb`.`customer` (`id`, `firstname`, `lastname`, `email`, `regdate`) VALUES (2, 'Rafael', 'Nadal', 'rafael.nadal@yomail.com', '2019-01-22 20:21:49');

INSERT INTO `ecomdb`.`customer` (`id`, `firstname`, `lastname`, `email`, `regdate`) VALUES (3, 'John', 'Mcenroe', 'john.mcenroe@yomail.com', '2019-01-23 20:21:49');

INSERT INTO `ecomdb`.`customer` (`id`, `firstname`, `lastname`, `email`, `regdate`) VALUES (4, 'Ivan', 'Lendl', 'ivan.lendl@yomail.com', '2019-01-23 23:21:49');

INSERT INTO `ecomdb`.`customer` (`id`, `firstname`, `lastname`, `email`, `regdate`) VALUES (5, 'Jimmy', 'Connors', 'jimmy.connors@yomail.com', '2019-01-23 22:21:49');

 

3. Create our Logstash configuration

input {
  jdbc {
    jdbc_driver_library => "<pathToYourDataBaseDriver>\mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
    jdbc_user => "<db username>"
    jdbc_password => "<db password>"
    tracking_column¹ => "regdate"
    use_column_value² => true
    statement => "SELECT * FROM ecomdb.customer WHERE regdate > :sql_last_value;"
    schedule³ => "* * * * * *"
  }
}
output {
  elasticsearch {
    document_id⁴ => "%{id}"
    document_type => "doc"
    index => "test"
    hosts => ["http://localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}

¹ tracking_column: The column whose value is tracked for changes. Here we track the regdate column, since it is set whenever a new entry is added to the database.

² use_column_value: When set to true, uses the defined tracking_column value as the :sql_last_value. When set to false, :sql_last_value reflects the last time the query was executed.

The jdbc plugin persists the sql_last_value parameter in a metadata file; its default location is c:/users/<yourUser>. Upon query execution, this file is updated with the current value of sql_last_value. The next time the pipeline starts up, the value is read back from the file. On the very first run, before any metadata exists, sql_last_value defaults to Jan 1, 1970, or to 0 if use_column_value is true.
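If you need to control where that metadata file lives, or force a clean re-import, the JDBC input plugin exposes options for this. A hedged sketch (the path below is only an example):

jdbc {
  # ...connection settings as above...
  last_run_metadata_path => "/etc/logstash/.customer_jdbc_last_run"  # where sql_last_value is stored
  clean_run => false  # set to true to ignore the stored value and start from scratch
}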

³ schedule: Runs the statement periodically, on a schedule defined in cron format, for example "* * * * *" (execute the query every minute, on the minute). Here we use a six-field expression to execute the statement every second, so any inserts or updates on our data are picked up on the next query execution and the two data stores stay in sync.

⁴ document_id: Setting the document ID lets Elasticsearch overwrite an existing document with the same ID instead of creating a new one. This avoids duplicates if the same row is fetched again, for example after the Logstash instance fails and restarts.
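You can observe this idempotent behavior after the pipeline has run more than once: fetching a document by ID shows the same _id with an incremented _version rather than a duplicate document. (The URL below assumes Elasticsearch 6.x, where the type name doc is part of the path.)

curl -X GET "localhost:9200/test/doc/1"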

The file configured above can be divided into two major sections.

  • Input plugin (JDBC plugin): Here we define the database URI to connect to, the user credentials, and the query that returns the required data.
  • Output plugin (Elasticsearch plugin): Here we define the Elasticsearch host URL and the index into which the data is to be indexed.

Save the above configuration in a file named logstash-sample.conf and place it in the bin folder of your Logstash installation.

 

4. Run Logstash with the command below from the bin folder of the Logstash installation

logstash -f logstash-sample.conf

Logstash will fetch the data from your database and post it to Elasticsearch.

 

5. Verify the data in Elasticsearch by executing the command below

curl -X GET "localhost:9200/test/_search"

The output of the above command will be:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1,
    "hits": [
      {
        "_index": "test",
        "_type": "doc",
        "_id": "4",
        "_score": 1,
        "_source": {
          "firstname": "Ivan",
          "id": 4,
          "email": "ivan.lendl@yomail.com",
          "lastname": "Lendl",
          "@version": "1",
          "regdate": "2019-01-23T17:51:49.000Z",
          "@timestamp": "2019-02-02T06:20:12.413Z"
        }
      },
      {
        "_index": "test",
        "_type": "doc",
        "_id": "2",
        "_score": 1,
        "_source": {
          "firstname": "Rafael",
          "id": 2,
          "email": "rafael.nadal@yomail.com",
          "lastname": "Nadal",
          "@version": "1",
          "regdate": "2019-01-22T14:51:49.000Z",
          "@timestamp": "2019-02-02T06:20:12.411Z"
        }
      },
      {
        "_index": "test",
        "_type": "doc",
        "_id": "1",
        "_score": 1,
        "_source": {
          "firstname": "Roger",
          "id": 1,
          "email": "roger.federer@yomail.com",
          "lastname": "Federer",
          "@version": "1",
          "regdate": "2019-01-21T14:51:49.000Z",
          "@timestamp": "2019-02-02T06:20:12.389Z"
        }
      }
    ]
  }
}
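To see the incremental sync in action, insert another row while the pipeline is running; on the next scheduled run (a second later) the new document appears in the test index without any manual intervention. The row below is only a made-up example following the same pattern as the earlier inserts:

INSERT INTO `ecomdb`.`customer` (`id`, `firstname`, `lastname`, `email`, `regdate`) VALUES (6, 'Andre', 'Agassi', 'andre.agassi@yomail.com', '2019-02-02 10:00:00');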

Let’s add a twist to the above use case. Suppose we wish to index each customer’s order details alongside the customer details in the same document.

1. Create an orders table in the MySQL database.

CREATE TABLE orders (
orderid INT(6)  AUTO_INCREMENT PRIMARY KEY,
product VARCHAR(300) NOT NULL,
description VARCHAR(300) NOT NULL,
price int(6),
customerid int(6),
ordertime TIMESTAMP,
FOREIGN KEY fk_userid(customerid)
REFERENCES customer(id)
);

INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
 VALUES (1, 'Tennis Ball', 'Wilson Australian Open', '330', '5','2019-01-22 20:21:49');

INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
 VALUES (2, 'Head Xtra Damp Vibration Dampner', 'Dampens string vibration to reduce the risk of pain', '500', '4','2019-01-23 02:21:49');

INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
 VALUES (3, 'HEAD Wristband Tennis 2.5" (White)', '80 % Cotton, 15% Nylon, 5 % Rubber (Elasthan)', '530', '3','2019-01-21 21:21:49');

INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
 VALUES (4, 'YONEX VCORE Duel G 97 Alfa (290 g)', 'Head Size 97', '4780', '2','2019-01-22 14:21:49');

INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
 VALUES (5, 'Wilson Kaos Stroke - White & Black', 'Wilson Australian Open', '9000', '1','2019-01-25 03:53:49');

With the database ready, we want to index the order details in the same document as a nested JSON object, along with the customer details. To do that we will make use of a filter plugin provided by Logstash. Logstash supports a variety of filter plugins, and we can choose one according to our needs. We are going to use the Ruby filter, which lets us execute arbitrary Ruby code against each event.

The question that might have popped into your mind is:

  • How will we get data from two different tables using the JDBC input plugin? We will use a join query:
select c.id as customerid,c.firstname ,c.lastname ,c.email, c.regdate ,
od.orderid ,od.product ,od.description , od.price ,od.ordertime
from customer as c left join  orders as od on c.id = od.customerid;

 

2. Write the Ruby code for our requirement.

The Ruby code for manipulating our document is as follows.

# The filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array.
def filter(event)
  # collect the order columns from the flat row
  orderid = event.get("orderid")
  product = event.get("product")
  description = event.get("description")
  price = event.get("price")
  ordertime = event.get("ordertime")

  # group them into a single hash
  orderDetails = {
    "orderid" => orderid,
    "product" => product,
    "description" => description,
    "price" => price,
    "ordertime" => ordertime
  }

  # attach the hash as a nested field and drop the now-redundant flat fields
  event.set('orderDetails', orderDetails)
  event.remove('orderid')
  event.remove('product')
  event.remove('description')
  event.remove('price')
  event.remove('ordertime')

  return [event]
end

Name the Ruby file sampleRuby.rb. The Ruby filter has a mandatory filter method that accepts a Logstash event and must return an array of events. In the above code we manipulate the event by creating a hash of order details and setting that hash as a new field on the event. We also remove the flat fields that are no longer required once the order details hash has been added.
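If you want to sanity-check the transformation outside of Logstash, you can exercise the same filter method against a small stand-in for the event object. The StubEvent class below is purely hypothetical, just enough to mimic get/set/remove over a hash; inside Logstash the real LogStash::Event is supplied for you.

# sanity_check.rb -- run with: ruby sanity_check.rb
class StubEvent
  def initialize(data)
    @data = data
  end
  def get(key)
    @data[key]
  end
  def set(key, value)
    @data[key] = value
  end
  def remove(key)
    @data.delete(key)
  end
  def to_hash
    @data
  end
end

require_relative 'sampleRuby'  # defines filter(event) as shown above

row = StubEvent.new({
  "customerid" => 1, "firstname" => "Roger", "lastname" => "Federer",
  "orderid" => 5, "product" => "Wilson Kaos Stroke - White & Black",
  "description" => "Wilson Australian Open", "price" => 9000,
  "ordertime" => "2019-01-25 03:53:49"
})

# prints the row with the order columns folded into a nested orderDetails hash
puts filter(row).first.to_hash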

 

3. Add the Ruby filter to the Logstash configuration file.

The new version of the logstash-sample.conf file will look as follows:

input {
  jdbc {
    jdbc_driver_library => "<pathToYourDataBaseDriver>\mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
    jdbc_user => "<db username>"
    jdbc_password => "<db password>"
    tracking_column => "regdate"
    use_column_value => true
    statement => "select c.id as customerid, c.firstname, c.lastname, c.email, c.regdate, od.orderid, od.product, od.description, od.price, od.ordertime from customer as c left join orders as od on c.id = od.customerid where c.regdate > :sql_last_value;"
    schedule => "* * * * * *"
  }
}
filter {
  ruby {
    path¹ => 'sampleRuby.rb'
  }
}
output {
  elasticsearch {
    document_id => "%{customerid}"
    document_type => "doc"
    index => "test"
    hosts => ["http://localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}

¹ path: The path of the Ruby script file that implements the filter method. The Ruby file should be in the same location as logstash-sample.conf.

 

4. Run the above config file using the command below

logstash -f logstash-sample.conf

After the pipeline runs, querying the test index as before returns the enriched documents (one of the five hits is shown here):

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 5,
    "max_score": 1,
    "hits": [
      {
        "_index": "test",
        "_type": "doc",
        "_id": "5",
        "_score": 1,
        "_source": {
          "orderDetails": {
            "orderid": 1,
            "description": "Wilson Australian Open",
            "product": "Tennis Ball",
            "ordertime": "2019-01-22T14:51:49.000Z",
            "price": 330
          },
          "@version": "1",
          "email": "jimmy.connors@yomail.com",
          "@timestamp": "2019-02-02T14:13:46.754Z",
          "regdate": "2019-01-23T16:51:49.000Z",
          "firstname": "Jimmy",
          "customerid": 5,
          "lastname": "Connors"
        }
      }
    ]
  }
}

As we can see above, the order details have been added as a nested JSON object to the existing customer document.

The Logstash JDBC input plugin can serve the purpose of bringing data from legacy systems into Elasticsearch. In this way, we have moved the search part of our application to a dedicated search engine instead of relying on the search features of the primary datastore. We keep our source of truth in the SQL database, but you could also imagine migrating from the legacy datastore to a NoSQL store.

Note: The JDBC input plugin cannot track delete events (hard deletes) in your database. You may consider adding an isdeleted flag to your tables (a soft delete) so that a deletion becomes an update the pipeline can pick up and handle.
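A minimal sketch of that soft-delete approach for the customer table (the column name and the convention 0 = active, 1 = deleted are just one possibility):

-- add a soft-delete flag instead of physically removing rows
ALTER TABLE customer ADD COLUMN isdeleted TINYINT(1) NOT NULL DEFAULT 0;

-- a "delete" then becomes an update the pipeline can see
UPDATE customer SET isdeleted = 1 WHERE id = 5;

Selecting isdeleted in the Logstash statement puts the flag into the indexed document, so deleted customers can be filtered out at query time or cleaned up by a separate job. Note that regdate alone will not change on such an update, so in practice you would also track a last-modified timestamp column so that soft deletes are re-fetched.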



About me

I am a certified Java developer with more than five years of experience working on backend applications. I am particularly interested in Elasticsearch, Python, and AWS. At HP in Pune, India, I am currently a senior software developer for the Device-as-a-Service (DaaS) program, where I contribute my extensive knowledge and experience to developing modules and applications. (I have mostly worked on Java and Spring Boot microservices on AWS.) I also contributed to the development of a smart city project for the Government of India, a Cisco project for the city of Jaipur.

I came to HP from Cognizant and Great Software Labs, where I worked in the IoT domain and on major banking projects.

Outside of work, I enjoy trekking and collecting vintage coins and stamps.
 
