Log messages are one of the most useful sources of feedback for developers. They give developers insight into the state of the systems they develop or maintain. In some cases, it's beneficial to store the logs in a central place in order to avoid having to collect this information from multiple places later on.

That was the case when I was asked to find a way to collect log messages from 100 websites running on Drupal (version 7) and make them available for evaluation as one of the last steps of a long-running test suite. Amazon Kinesis Streams came to mind when I had to choose a service capable of receiving large amounts of data and processing it in near real time.

But let’s get down to business, and take a look at one of the ways Drupal logs can be collected into a Kinesis stream.

Problem

At first glance, the problem seems complex, but luckily, open source communities produce a lot of great solutions that make this kind of work easier. Here are a few things that might come to mind if you are faced with a problem like this:

  • Drupal stores almost everything, including log messages, in the database, but it would be better to store them in a file
  • Find a tool that collects logs from a file and sends them to Kinesis continuously
  • Find a way to make it work without writing too much code

Solution

One of the most common ways to store log messages in a file is logging to syslog. If the logs are available in a file, Fluentd can do the remaining work for you; it even has a plugin that sends events directly to Amazon Kinesis Streams.

Storing logs in a file

Syslog-ng is an open source log management solution that allows you to flexibly collect, parse, classify, and correlate logs from across your infrastructure. It allows far more customization than a regular syslog daemon.

But how can you configure a Drupal site so that its logs are collected by syslog-ng?

The answer is pretty familiar to Drupal developers: there is a module for it. It's only available for Drupal 7, but you can also use the core syslog module to store the logs in a file.
First, you have to install the module on your site and configure it to your needs. Obviously, the module requires syslog-ng to be installed in your environment. Once the module is configured, it will provide you with a configuration snippet like the following:

  destination d_drupal {
      file('/var/log/drupal.log');
  };

  filter f_drupal {
      program('drupal');
  };

  log {
      source(s_src);
      filter(f_drupal);
      destination(d_drupal);
  };

This configuration should be stored in a file placed in the /etc/syslog-ng/conf.d/ directory (e.g. /etc/syslog-ng/conf.d/10-drupal.conf). It tells syslog-ng what rule to follow: according to the example above, logs coming from your Drupal site will be saved to the file located at /var/log/drupal.log (the destination). After enabling syslog-ng logging on the module's configuration page and restarting syslog-ng, Drupal log messages should show up in the destination file.
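If you want to check the rule without clicking through Drupal, you can emit a test message with the same program name that the filter matches on. Here is a minimal sketch using Python's standard syslog module (the 'drupal' ident is an assumption based on the filter above):

  import syslog

  # Log with the ident "drupal" so the program('drupal') filter matches.
  syslog.openlog('drupal')
  syslog.syslog(syslog.LOG_INFO, 'test message for the d_drupal destination')
  syslog.closelog()

If syslog-ng is running with the rule above, the test line should appear in /var/log/drupal.log.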

Sending logs to the stream

There are multiple ways to send data to Kinesis: you can use the AWS CLI, a third-party tool, or custom scripts. In most cases, the best choice is something that has already been used and tested by others. awslabs created output plugins for Fluentd that send data to Amazon Kinesis Streams.

First, you have to install Fluentd. td-agent is the stable distribution package of Fluentd and is recommended for large-scale environments. Its configuration should be placed in a file located at /etc/td-agent/td-agent.conf:

  ## Drupal log source
  <source>
    type tail
    format syslog
    path /var/log/drupal.log
    pos_file /var/log/td-agent/tmp/drupal.log.pos
    keep_time_key true
    refresh_interval 15
    tag syslog.drupal
  </source>
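Each line that td-agent reads is parsed into a structured event. Roughly, and assuming Fluentd's standard syslog parser, a Drupal log entry becomes something like the following (shown here as a Python dict for illustration; the exact message payload depends on how the module formats its syslog lines):

  # Approximate shape of one parsed event; with keep_time_key set
  # to true, the original time field stays in the record.
  event = {
      'time': 'Mar  1 12:00:00',
      'host': 'web01',           # hypothetical hostname
      'ident': 'drupal',         # the program name matched above
      'message': '...',          # the Drupal watchdog entry
  }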

According to the example above, td-agent will use its core tail input plugin to continuously follow syslog's output file at /var/log/drupal.log, refreshing the list of watched files every 15 seconds. It also adds the syslog.drupal tag to the collected data in order to help other plugins identify what they are processing. If you want to keep the time field of the records, you have to set the keep_time_key parameter to true, because the parser removes it by default. It's also highly recommended to set a pos_file that stores the position of the last read in a separate file. Next to the input plugin, you need to configure an output plugin too, so you also have to install and configure the Fluentd plugin for Amazon Kinesis to send the parsed data to a stream:

  ## Kinesis stream output
  <match syslog.drupal>
    type kinesis_streams
    stream_name my_kinesis_stream
    region us-east-1
    flush_interval 10
    buffer_chunk_limit 1m
    try_flush_interval 0.8
    queued_chunk_flush_interval 0.1
    num_threads 15
    detach_process 5
    debug true
  </match>

In this case, the agent will use the kinesis_streams output plugin to send data tagged with syslog.drupal to a Kinesis stream named my_kinesis_stream in the us-east-1 region. A few other parameters are worth mentioning; they are all intended to improve throughput. In the example above, the buffer plugin collects chunks in a queue until the top chunk exceeds the size or time limit set by the buffer_chunk_limit and flush_interval parameters. queued_chunk_flush_interval specifies the interval between data flushes for queued chunks, while try_flush_interval controls how frequently the thread checks whether to create a new chunk and flush a pending one. The plugin can also be configured to execute multiple processes at the same time: detach_process indicates the number of processes executed in parallel, and num_threads tells Fluentd how many threads each process should have. After restarting td-agent, the logs from the destination file will be sent to Kinesis continuously.
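To confirm that the stream itself accepts writes before wiring up the whole pipeline, you can push a single test record with boto3. A minimal sketch, assuming the stream name and region from the configuration above and valid credentials (see the next section):

  import boto3

  client = boto3.client('kinesis', region_name='us-east-1')
  # Send one test record; the partition key decides which shard it lands on.
  client.put_record(
      StreamName='my_kinesis_stream',
      Data=b'test record',
      PartitionKey='test'
  )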

Authentication

Like every Amazon Web Service, Kinesis requires authentication. There are several ways to authenticate your requests, and the Kinesis Fluentd plugin supports each of them:

  • Add aws_secret_key and aws_access_key parameters to the configuration file
  • Run ‘aws configure’ with the AWS CLI, which stores the keys in a shared credentials file
  • Store the keys in environment variables (e.g. AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  • Use IAM roles (assume_role_credentials)

Or, if your Drupal sites are running on EC2 instances, you can simply use IAM instance profiles, in which case no explicit credentials are necessary.
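For example, when testing a consumer locally, the environment variable option can look like this (placeholder values; boto3 and the AWS SDK used by the Fluentd plugin both pick these variables up automatically):

  import os
  import boto3

  # Placeholder credentials, normally exported in the shell instead.
  os.environ['AWS_ACCESS_KEY_ID'] = 'YOUR_ACCESS_KEY_ID'
  os.environ['AWS_SECRET_ACCESS_KEY'] = 'YOUR_SECRET_ACCESS_KEY'

  client = boto3.client('kinesis', region_name='us-east-1')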

Reading From the Stream

Logs can be stored in a stream temporarily, for up to 7 days (24 hours by default). The data can then be processed, stored, or analyzed with other AWS services - such as S3, Redshift or Lambda. No matter what you want to do with the data, you need a Kinesis consumer to retrieve it from the stream. Amazon provides some really useful documentation on how to create these applications, but you can find a wide variety of consumers on GitHub as well. Let's see an example in Python that uses boto3, the official AWS SDK for that language.

  import boto3

  client = boto3.client('kinesis')

Before you can get data from the stream, you need to obtain a shard iterator for the shard you are interested in. The example assumes that your stream has only one shard. A shard iterator represents the position in the stream and shard from which the consumer will read.
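The call below needs a shard ID. If you don't know it, one way to look it up is describe_stream; a sketch reusing the client created above (stream_name is assumed to hold your stream's name, as in the snippet that follows):

  # List the shards of the stream; a single-shard stream has one entry.
  description = client.describe_stream(StreamName=stream_name)
  shard_id = description['StreamDescription']['Shards'][0]['ShardId']

With a shard ID in hand, you can request the iterator itself: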

  response = client.get_shard_iterator(
      StreamName=stream_name,
      ShardId=shard_id,
      ShardIteratorType=shard_iterator_type,
      # Supply StartingSequenceNumber only with the *_SEQUENCE_NUMBER
      # iterator types, and Timestamp only with AT_TIMESTAMP.
      StartingSequenceNumber=starting_sequence_num,
      Timestamp=timestamp
  )
  # The iterator itself is a field of the response.
  shard_iterator = response['ShardIterator']

The StreamName parameter holds the name of your stream, and ShardId identifies the shard to read from. ShardIteratorType represents the position from which you want to start reading. The following shard iterator types are available:

  • AT_SEQUENCE_NUMBER - Start reading from the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
  • AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
  • AT_TIMESTAMP - Start reading from the position denoted by a specific timestamp, provided in the value Timestamp.
  • TRIM_HORIZON - Start reading at the last untrimmed record in the shard in the system, which is the oldest data record in the shard.
  • LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
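For the use case described here, reading back everything the stream has retained at the end of a test run, TRIM_HORIZON is a natural choice for the shard_iterator_type parameter above:

  # Read from the oldest record the stream still retains.
  shard_iterator_type = 'TRIM_HORIZON'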

After you receive the first shard iterator, you can repeatedly send requests to retrieve records.

  while True:
      response = client.get_records(
          ShardIterator=shard_iterator,
          Limit=1000
      )
      # Each response contains the iterator for the next batch.
      shard_iterator = response['NextShardIterator']
      # Do something with the retrieved data...

The Limit parameter specifies the maximum number of records to return per call. Keep in mind that each shard supports up to five GetRecords calls per second, so a long-running consumer should pause briefly between requests.
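Each element of response['Records'] carries its payload in the Data field as raw bytes (boto3 handles the base64 decoding). A minimal sketch of unpacking a batch, assuming the Fluentd plugin wrote UTF-8 text:

  for record in response['Records']:
      # Data arrives as bytes; decode it to get the log line back.
      payload = record['Data'].decode('utf-8')
      print(record['SequenceNumber'], payload)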

And that’s it. Let’s take a look back and see what we have:

  • Drupal sites send log messages to syslog
  • Syslog-ng stores the messages in a file
  • Fluentd collects the logs from the file and sends them to Kinesis
  • A consumer application reads the data out of the stream

These are the main steps to collect logs from multiple Drupal sites into Kinesis. It will probably take some time to tune everything to your needs, but the skeleton is ready. Have fun!