Timeseries Databases Part 2: Writing Data to InfluxDB from .NET

What is InfluxDB?

InfluxDB is a database written specifically for handling time series data.

The influxdata website writes on InfluxDB:

InfluxDB is a high-performance data store written specifically for time series data. It allows for high throughput ingest, compression and real-time querying of that same data. InfluxDB is written entirely in Go and it compiles into a single binary with no external dependencies.

In Part 1 of the series I have shown how to parse the DWD Open Data dataset, which will be used in this article.

All code to reproduce the article can be found in my GitHub repository at:

The Dataset

The DWD Open Data portal of the Deutscher Wetterdienst (DWD) gives access to the historical weather data in Germany. I decided to use all available historical air temperature data for Germany given in a 10 minute resolution (FTP Link).

If you want to recreate the example, you can find the list of CSV files in the GitHub repository at: GermanWeatherDataExample/Resources/files.md.

The DWD dataset is given as CSV files and has a size of approximately 25.5 GB.

Implementation

The InfluxDB Line Protocol

The data points are written using the InfluxDB Line Protocol, which has the following syntax:

<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]

The protocol has already been implemented for C# by the very good influxdb-csharp library:

Writing to InfluxDB: The LocalWeatherDataBatchProcessor

The LocalWeatherDataBatchProcessor is just a wrapper around the influxdb-csharp LineProtocolClient, which opens up a new connection to the database for each batch to write:

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using InfluxDB.LineProtocol.Client;
using InfluxDB.LineProtocol.Payload;

namespace InfluxExperiment.Influx.Client
{
    public class LocalWeatherDataBatchProcessor
    {
        private readonly string database;
        private readonly string connectionString;

        public LocalWeatherDataBatchProcessor(string connectionString, string database)
        {
            this.database = database;
            this.connectionString = connectionString;
        }

        public Task<LineProtocolWriteResult> WriteAsync(LineProtocolPayload source, CancellationToken cancellationToken = default(CancellationToken))
        {
            if(source == null)
            {
                return Task.FromResult(new LineProtocolWriteResult(true, string.Empty));
            }

            var client = new LineProtocolClient(new Uri(connectionString), database);

            return client.WriteAsync(source, cancellationToken);
        }
    }
}

Converting to LineProtocolPayload: The LocalWeatherDataConverter

What's left is converting from a list of LocalWeatherData items to the LineProtocolPayload. This is done by first converting each LocalWeatherData item to a LineProtocolPoint and then adding them to the LineProtocolPayload:

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Globalization;
using System.Threading.Tasks;
using InfluxDB.LineProtocol.Payload;
using Experiments.Common.Csv.Model;
using CsvLocalWeatherDataType = Experiments.Common.Csv.Model.LocalWeatherData;

namespace InfluxExperiment.Converters
{
    public static class LocalWeatherDataConverter
    {
        public static LineProtocolPayload Convert(IEnumerable<CsvLocalWeatherDataType> source)
        {
            if (source == null)
            {
                return null;
            }

            LineProtocolPayload payload = new LineProtocolPayload();

            foreach (var item in source)
            {
                var point = Convert(item);

                if (point != null)
                {
                    payload.Add(point);
                }
            }

            return payload;
        }

        public static LineProtocolPoint Convert(LocalWeatherData source)
        {
            if (source == null)
            {
                return null;
            }

            var fields = new Dictionary<string, object>();

            fields.AddFieldValue("air_temperature_at_2m", source.AirTemperatureAt2m);
            fields.AddFieldValue("air_temperature_at_5cm", source.AirTemperatureAt5cm);
            fields.AddFieldValue("dew_point_temperature_at_2m", source.DewPointTemperatureAt2m);
            fields.AddFieldValue("relative_humidity", source.RelativeHumidity);

            // No Measurements to be inserted:
            if (fields.Count == 0)
            {
                return null;
            }

            var tags = new Dictionary<string, string>
                {
                    {"station_identifier", source.StationIdentifier},
                    {"quality_code", source.QualityCode.ToString(CultureInfo.InvariantCulture)}
                };

            return new LineProtocolPoint("weather_measurement", new ReadOnlyDictionary<string, object>(fields), tags, source.TimeStamp);
        }

        private static void AddFieldValue(this IDictionary<string, object> dictionary, string key, object value)
        {
            if (value == null)
            {
                return;
            }

            dictionary.Add(key, value);
        }
    }

}

Connecting the Parts: The LINQ Import Pipeline

That's it already. The Import Pipeline now simply reads the CSV data, batches the valid records, converts each batch into a LineProtocolPayload and then writes the payload to InfluxDB using the LocalWeatherDataBatchProcessor:

private static void ProcessLocalWeatherData(string csvFilePath)
{
    if (log.IsInfoEnabled)
    {
        log.Info($"Processing File: {csvFilePath}");
    }

    // Construct the Batch Processor:
    var processor = new LocalWeatherDataBatchProcessor(ConnectionString, Database);

    // Access to the List of Parsers:
    var batches = Parsers
        // Use the LocalWeatherData Parser:
        .LocalWeatherDataParser
        // Read the File:
        .ReadFromFile(csvFilePath, Encoding.UTF8, 1)
        // Get the Valid Results:
        .Where(x => x.IsValid)
        // And get the populated Entities:
        .Select(x => x.Result)
        // Let's stay safe! Stop parallelism here:
        .AsEnumerable()
        // Evaluate:
        .Batch(10000)
        // Convert each Batch into a LineProtocolPayload:
        .Select(measurements => LocalWeatherDataConverter.Convert(measurements));

    foreach (var batch in batches)
    {
        try
        {
            var result = processor.WriteAsync(batch).GetAwaiter().GetResult();

            // Log all unsuccessful writes, but do not quit execution:
            if (!result.Success)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(result.ErrorMessage);
                }
            }
        }
        catch (Exception e)
        {
            // Some Pokemon Exception Handling here. I am seeing TaskCanceledExceptions with the 
            // InfluxDB .NET Client. At the same time I do not want to quit execution, because 
            // some batches fail:
            if (log.IsErrorEnabled)
            {
                log.Error(e, "Error occured writing InfluxDB Payload");
            }
        }
    }
}

I caught all exceptions here and simply log them, instead of rethrowing. I saw some non-reproducible TaskCanceledExceptions for the influxdb-csharp client, which may be due to my implementation. And I didn't want to quit the whole import because there is a problem writing one batch out of 400 million recods. In a real setup this should be handeled with greater care instead of just dropping the data!

I chose a batch size of 10,000, because the InfluxDB Documentation states:

We recommend writing points in batches of 5,000 to 10,000 points. Smaller batches, and more HTTP requests, will result in sub-optimal performance.

Turning the Knobs: InfluxDB Configuration

InfluxDB 1.7.1 with the default configuration was unable to import the entire dataset. It consumes too much memory under load and could not write the batches anymore. After reading through documentation I am quite confident, that the default shard duration and retention policy has to be adjusted, so that the shards do not stay in memory forever:

The default configuration of InfluxDB is optimized for realtime data with a short retention duration and a short shard duration. This makes InfluxDB chewing up the entire RAM, just because for the historic data too many shards are created and the cached data is never written to disk.

So I am now creating the database using DURATION set to infinite (inf) to keep measurements forever. The SHARD DURATION is set to 4 weeks for limiting the number of shards being created during the import. This can be done by running the following CREATE DATABASE statement:

CREATE DATABASE "weather_data" WITH DURATION inf REPLICATION 1 SHARD DURATION 4w NAME "weather_data_policy"

In the influxdb.conf I am setting the cache-snapshot-write-cold-duration to 5 seconds for flushing the caches more agressively:

cache-snapshot-write-cold-duration = "5s"

Results

InfluxDB was able to import the entire dataset. The final database has 398,704,931 measurements and has a file size of 7.91 GB. I was very impressed, that InfluxDB was able to compress the data from 25.5 GB raw CSV data to 7.91 GB!

In a later post we will look at the read behavior of InfluxDB.