In this article I am going to show how to write a custom Apache Flink SinkFunction that bulk writes the results of a DataStream into a PostgreSQL database.
What we are going to build
Processed data often needs to be written into a relational database, simply because SQL makes it easy to work with the data. Often enough you will also need to generate reports for customers or feed an existing application that uses the relational database.
A custom data sink for Apache Flink needs to implement the SinkFunction interface. If a resource needs to be opened and closed, then a RichSinkFunction needs to be implemented.
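To make the lifecycle concrete, here is a minimal sketch of a RichSinkFunction. The class name LoggingSink is only an illustration and not part of the example project: open() is called once when the sink is initialized, invoke() for every element of the DataStream, and close() when the sink shuts down.
// A minimal sketch of the RichSinkFunction lifecycle, for illustration only:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class LoggingSink<T> extends RichSinkFunction<T> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Acquire resources here (connections, bulk processors, ...):
    }

    @Override
    public void invoke(T value) throws Exception {
        // Called for every element of the DataStream:
        System.out.println(value);
    }

    @Override
    public void close() throws Exception {
        // Release resources here:
    }
}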
Source Code
You can find the full source code for the example in my git repository at:
PostgreSQL SinkFunction
BasePostgresSink
We start by implementing the abstract base class BasePostgresSink<TEntity>. It extends the RichSinkFunction, so it can create a new BulkProcessor when the sink is opened and close the BulkProcessor when the sink is closed.
You may wonder why I don't pass the BulkProcessor as a dependency into the base class. The reason is that Apache Flink serializes the RichSinkFunction and distributes it to each of its workers, so all members of a RichSinkFunction need to be Serializable. That's why the BulkProcessor is created inside of the RichSinkFunction.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package stream.sinks.pgsql;

import de.bytefish.pgbulkinsert.IPgBulkInsert;
import de.bytefish.pgbulkinsert.pgsql.processor.BulkProcessor;
import de.bytefish.pgbulkinsert.pgsql.processor.handler.BulkWriteHandler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import pgsql.connection.PooledConnectionFactory;

import java.net.URI;

public abstract class BasePostgresSink<TEntity> extends RichSinkFunction<TEntity> {

    private final URI databaseUri;
    private final int bulkSize;

    private BulkProcessor<TEntity> bulkProcessor;

    public BasePostgresSink(URI databaseUri, int bulkSize) {
        this.databaseUri = databaseUri;
        this.bulkSize = bulkSize;
    }

    @Override
    public void invoke(TEntity entity) throws Exception {
        bulkProcessor.add(entity);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.bulkProcessor = new BulkProcessor<>(new BulkWriteHandler<>(getBulkInsert(), new PooledConnectionFactory(databaseUri)), bulkSize);
    }

    @Override
    public void close() throws Exception {
        bulkProcessor.close();
    }

    protected abstract IPgBulkInsert<TEntity> getBulkInsert();
}
PooledConnectionFactory
The BulkProcessor of PgBulkInsert needs a way to obtain a Connection for the database access. I don't like reinventing the wheel, so in my projects I simply use the great DBCP2 project for handling database connections.
You can add the following dependency to your pom.xml to include DBCP2 in your project:
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-dbcp2</artifactId>
    <version>2.0.1</version>
</dependency>
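Besides DBCP2, the PostgreSQL JDBC driver (org.postgresql.Driver) has to be on the classpath, because both the connection pool and PgBulkInsert use it. If it is not already part of your project, a dependency along the following lines will do; the version shown is only an example and may need to be adjusted:
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>9.4-1206-jdbc42</version>
</dependency>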
The Connection Factory for the BulkProcessor can then be implemented like this.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package pgsql.connection;

import de.bytefish.pgbulkinsert.functional.Func1;
import org.apache.commons.dbcp2.BasicDataSource;

import java.net.URI;
import java.sql.Connection;

public class PooledConnectionFactory implements Func1<Connection> {

    private final BasicDataSource connectionPool;

    public PooledConnectionFactory(URI databaseUri) {
        this.connectionPool = new BasicDataSource();

        initializeConnectionPool(connectionPool, databaseUri);
    }

    private void initializeConnectionPool(BasicDataSource connectionPool, URI databaseUri) {
        // Use the port given in the URI, or fall back to the PostgreSQL default port:
        final int port = databaseUri.getPort() >= 0 ? databaseUri.getPort() : 5432;
        final String dbUrl = "jdbc:postgresql://" + databaseUri.getHost() + ":" + port + databaseUri.getPath();

        if (databaseUri.getUserInfo() != null) {
            connectionPool.setUsername(databaseUri.getUserInfo().split(":")[0]);
            connectionPool.setPassword(databaseUri.getUserInfo().split(":")[1]);
        }

        connectionPool.setDriverClassName("org.postgresql.Driver");
        connectionPool.setUrl(dbUrl);
        connectionPool.setInitialSize(1);
    }

    @Override
    public Connection invoke() throws Exception {
        return connectionPool.getConnection();
    }
}
Example
Database Setup
First of all we are going to write the DDL scripts for creating the database schema and tables.
Schema
I am using schemas to keep my database clean, and so should you. A database schema logically groups objects such as tables, views and stored procedures, and makes it possible to assign user permissions to the schema as a whole. In this example the sample schema is going to contain the tables for the station and measurement data.
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'sample') THEN
CREATE SCHEMA sample;
END IF;
END;
$$;
Tables
DO $$
BEGIN

IF NOT EXISTS (
    SELECT 1
    FROM information_schema.tables
    WHERE table_schema = 'sample'
      AND table_name = 'station'
) THEN

    CREATE TABLE sample.station
    (
        station_id SERIAL PRIMARY KEY,
        wban VARCHAR(255) NOT NULL,
        name VARCHAR(255) NOT NULL,
        state VARCHAR(255),
        location VARCHAR(255),
        latitude REAL NOT NULL,
        longitude REAL NOT NULL,
        ground_height SMALLINT,
        station_height SMALLINT,
        TimeZone SMALLINT
    );

END IF;

IF NOT EXISTS (
    SELECT 1
    FROM information_schema.tables
    WHERE table_schema = 'sample'
      AND table_name = 'weather_data'
) THEN

    CREATE TABLE sample.weather_data
    (
        wban VARCHAR(255),
        dateTime TIMESTAMP,
        temperature REAL,
        windSpeed REAL,
        stationPressure REAL,
        skyCondition VARCHAR(255)
    );

END IF;

END;
$$;
Constraints
DO $$
BEGIN

IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uk_station_wban') THEN
    ALTER TABLE sample.station
        ADD CONSTRAINT uk_station_wban
        UNIQUE (wban);
END IF;

END;
$$;
Security
DO $$
BEGIN
REVOKE ALL ON sample.station FROM public;
REVOKE ALL ON sample.weather_data FROM public;
END;
$$;
Station Data
The station data is not written by the stream processing pipeline. That's why the stations are initially populated with a simple insert script, which was generated from the original data. It contains all the information that is also set in the domain model.
DO $$
BEGIN

IF EXISTS (
    SELECT 1
    FROM information_schema.tables
    WHERE table_schema = 'sample'
      AND table_name = 'station'
) THEN

    INSERT INTO sample.station(wban, name, state, location, latitude, longitude, ground_height, station_height, timeZone)
    SELECT '00100', 'ARKADELPHIA', 'AR', 'DEXTER B FLORENCE MEM FLD AP', 34.09972, -93.06583, 182, NULL, -6
    WHERE NOT EXISTS (SELECT 1 FROM sample.station WHERE wban='00100');

    -- ... station data

END IF;

END
$$;
Creating a Deployment Script
You could copy and paste the above scripts for this tutorial. This is totally OK for small applications, but it won't scale for any real project. Believe me, you need to automate the task of creating and migrating a database as early as possible in your project.
I am working in a Windows environment right now, so I have used a Batch file to automate the database setup. There is no magic going on: I am just setting the path to psql and using the PGPASSWORD environment variable to pass the password to the command line.
@echo off
:: Copyright (c) Philipp Wagner. All rights reserved.
:: Licensed under the MIT license. See LICENSE file in the project root for full license information.
set PGSQL_EXECUTABLE="C:\Program Files\PostgreSQL\9.4\bin\psql.exe"
set STDOUT=stdout.log
set STDERR=stderr.log
set LOGFILE=query_output.log
set HostName=localhost
set PortNumber=5432
set DatabaseName=sampledb
set UserName=philipp
set Password=
call :AskQuestionWithYdefault "Use Host (%HostName%) Port (%PortNumber%) [Y,n]?" reply_
if /i [%reply_%] NEQ [y] (
set /p HostName="Enter HostName: "
set /p PortNumber="Enter Port: "
)
call :AskQuestionWithYdefault "Use Database (%DatabaseName%) [Y,n]?" reply_
if /i [%reply_%] NEQ [y] (
set /p DatabaseName="Enter Database: "
)
call :AskQuestionWithYdefault "Use User (%UserName%) [Y,n]?" reply_
if /i [%reply_%] NEQ [y] (
set /p UserName="Enter User: "
)
set /p PGPASSWORD="Password: "
1>%STDOUT% 2>%STDERR% (
:: Schemas
%PGSQL_EXECUTABLE% -h %HostName% -p %PortNumber% -d %DatabaseName% -U %UserName% < 01_Schemas/schema_sample.sql -L %LOGFILE%
:: Tables
%PGSQL_EXECUTABLE% -h %HostName% -p %PortNumber% -d %DatabaseName% -U %UserName% < 02_Tables/tables_sample.sql -L %LOGFILE%
:: Keys
%PGSQL_EXECUTABLE% -h %HostName% -p %PortNumber% -d %DatabaseName% -U %UserName% < 03_Keys/keys_sample.sql -L %LOGFILE%
:: Security
%PGSQL_EXECUTABLE% -h %HostName% -p %PortNumber% -d %DatabaseName% -U %UserName% < 05_Security/security_sample.sql -L %LOGFILE%
:: Data
%PGSQL_EXECUTABLE% -h %HostName% -p %PortNumber% -d %DatabaseName% -U %UserName% < 06_Data/data_sample_stations.sql -L %LOGFILE%
)
goto :end
:: The question as a subroutine
:AskQuestionWithYdefault
setlocal enableextensions
:_asktheyquestionagain
set return_=
set ask_=
set /p ask_="%~1"
if "%ask_%"=="" set return_=y
if /i "%ask_%"=="Y" set return_=y
if /i "%ask_%"=="n" set return_=n
if not defined return_ goto _asktheyquestionagain
endlocal & set "%2=%return_%" & goto :EOF
:end
pause
PostgreSQL Model
You have already seen that we are building a separate model for each use case. This keeps the analysis model clean, so we do not leak any database-related modelling (foreign keys, column names, ...) into the analysis model. Note that the measurement time is stored as a PostgreSQL timestamp, so it doesn't carry any timezone information. The data in the station table already holds the relevant timezone offset.
I have decided against using the primary key of a station as a foreign key constraint, because such measurement data should be stored as fast as possible, without potentially being slowed down by foreign key checks.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package pgsql.model;

import java.time.LocalDateTime;

public class LocalWeatherData {

    private String wban;
    private LocalDateTime dateTime;
    private Float temperature;
    private Float windSpeed;
    private Float stationPressure;
    private String skyCondition;

    public LocalWeatherData(String wban, LocalDateTime dateTime, Float temperature, Float windSpeed, Float stationPressure, String skyCondition) {
        this.wban = wban;
        this.dateTime = dateTime;
        this.temperature = temperature;
        this.windSpeed = windSpeed;
        this.stationPressure = stationPressure;
        this.skyCondition = skyCondition;
    }

    public String getWban() {
        return wban;
    }

    public LocalDateTime getDateTime() {
        return dateTime;
    }

    public Float getTemperature() {
        return temperature;
    }

    public Float getWindSpeed() {
        return windSpeed;
    }

    public Float getStationPressure() {
        return stationPressure;
    }

    public String getSkyCondition() {
        return skyCondition;
    }
}
PgBulkInsert Database Mapping
A subclass of the BasePostgresSink has to implement the method that returns an IPgBulkInsert<TEntity>. A PgBulkInsert<TEntity> in PgBulkInsert simply defines the mapping between a database table and the domain model.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package pgsql.mapping;

import de.bytefish.pgbulkinsert.PgBulkInsert;

public class LocalWeatherDataBulkInsert extends PgBulkInsert<pgsql.model.LocalWeatherData> {

    public LocalWeatherDataBulkInsert(String schemaName, String tableName) {
        super(schemaName, tableName);

        mapString("wban", pgsql.model.LocalWeatherData::getWban);
        mapTimeStamp("dateTime", pgsql.model.LocalWeatherData::getDateTime);
        mapReal("temperature", pgsql.model.LocalWeatherData::getTemperature);
        mapReal("windSpeed", pgsql.model.LocalWeatherData::getWindSpeed);
        mapReal("stationPressure", pgsql.model.LocalWeatherData::getStationPressure);
        mapString("skyCondition", pgsql.model.LocalWeatherData::getSkyCondition);
    }
}
LocalWeatherDataPostgresSink
With the PostgreSQL domain model, the BasePostgresSink and the PgBulkInsert mapping in place, the LocalWeatherDataPostgresSink for the example can easily be implemented.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package stream.sinks.pgsql;

import de.bytefish.pgbulkinsert.IPgBulkInsert;

import java.net.URI;

public class LocalWeatherDataPostgresSink extends BasePostgresSink<pgsql.model.LocalWeatherData> {

    public LocalWeatherDataPostgresSink(URI databaseUri, int bulkSize) {
        super(databaseUri, bulkSize);
    }

    @Override
    protected IPgBulkInsert<pgsql.model.LocalWeatherData> getBulkInsert() {
        return new pgsql.mapping.LocalWeatherDataBulkInsert("sample", "weather_data");
    }
}
Plugging it into the DataStream
Once the SinkFunction is written, it can be plugged into the existing DataStream pipeline. In the example the general DataStream<model.LocalWeatherData> is first transformed into a DataStream<pgsql.model.LocalWeatherData>, and then the custom sink is added to it.
// Converts the general stream into the Postgres-specific representation:
DataStream<pgsql.model.LocalWeatherData> pgsqlDailyMaxTemperature = maxTemperaturePerDay
        .map(new MapFunction<model.LocalWeatherData, pgsql.model.LocalWeatherData>() {
            @Override
            public pgsql.model.LocalWeatherData map(model.LocalWeatherData localWeatherData) throws Exception {
                return pgsql.converter.LocalWeatherDataConverter.convert(localWeatherData);
            }
        });

// Add a new Postgres Sink with a Bulk Size of 1000 entities:
pgsqlDailyMaxTemperature.addSink(new LocalWeatherDataPostgresSink(URI.create("postgres://philipp:test_pwd@127.0.0.1:5432/sampledb"), 1000));
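Nothing is written until the Flink job is actually started. Assuming the pipeline above was built on a StreamExecutionEnvironment named env (not shown in the snippet), the job is executed like this; the job name is arbitrary:
// Trigger the execution of the Flink job (env is the assumed StreamExecutionEnvironment):
env.execute("Write Local Weather Data to PostgreSQL");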
Converter
The LocalWeatherDataConverter simply takes a model.LocalWeatherData and converts it into a pgsql.model.LocalWeatherData.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package pgsql.converter;

import java.time.LocalDateTime;

public class LocalWeatherDataConverter {

    public static pgsql.model.LocalWeatherData convert(model.LocalWeatherData modelLocalWeatherData) {

        String wban = modelLocalWeatherData.getStation().getWban();
        LocalDateTime dateTime = modelLocalWeatherData.getDate().atTime(modelLocalWeatherData.getTime());
        Float temperature = modelLocalWeatherData.getTemperature();
        Float windSpeed = modelLocalWeatherData.getWindSpeed();
        Float stationPressure = modelLocalWeatherData.getStationPressure();
        String skyCondition = modelLocalWeatherData.getSkyCondition();

        return new pgsql.model.LocalWeatherData(wban, dateTime, temperature, windSpeed, stationPressure, skyCondition);
    }
}
Conclusion
In this article you have seen how to write a custom SinkFunction for Apache Flink. In the next article you will see how to write a custom SinkFunction for writing into an Elasticsearch database and how to visualize the results with Kibana, a frontend for Elasticsearch.