Some years ago I wrote JSqlServerBulkInsert, which provides a small abstraction layer over the SqlServerBulkCopy class of the Microsoft SQL Server JDBC Driver. The idea of the library is to write a small mapping for a POJO and fire data into SQL Server... without having to maintain Stored Procedures.
But in real life it's often not only about inserting data. You probably need to create additional relations between entities, or you'll need to update existing data. Simply put, JSqlServerBulkInsert and the underlying SqlServerBulkCopy can't do this.
So in recent years I have switched to using Stored Procedures and passing Table-valued Parameters for most of the performance-critical SQL Server work I am doing. How to do this with the Microsoft SQL Server JDBC driver isn't really obvious, so I put together an example:
The Scenario: Handling Device Measurements
Imagine we are monitoring an array of machines that send large amounts of measurements. At some point our single INSERT statements don't scale anymore, the connection pool is exhausted and we run into timeouts. Ouch.
And how do we update measurements? Do we perform a SELECT lookup for every single measurement?
Instead of single inserts, we need to batch our data and let the database handle the rest. So how do we get the data into the SQL Server database efficiently?
Table-valued Parameters!
The Data Model
A measurement of a device probably has something like a Device Identifier, a Parameter Identifier, a Timestamp the value was taken at, and the measured Value. This leads to a Java class like this:
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package de.bytefish.sqlservertvpexample.model;

import java.sql.Timestamp;

public class DeviceMeasurement {

    private String deviceId;
    private String parameterId;
    private Timestamp timestamp;
    private double value;

    public DeviceMeasurement() {
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getParameterId() {
        return parameterId;
    }

    public void setParameterId(String parameterId) {
        this.parameterId = parameterId;
    }

    public Timestamp getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Timestamp timestamp) {
        this.timestamp = timestamp;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }
}
Simulating Data
There are no real devices here, so we need a way to fake some data for a given time range. We can define an iterator that yields timestamps at a given interval between a start date and an end date:
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package de.bytefish.sqlservertvpexample.data;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Iterator;

public class DateTimeIterator implements Iterator<LocalDateTime> {

    private LocalDateTime endDate;
    private LocalDateTime currentDate;
    private Duration interval;

    public DateTimeIterator(LocalDateTime startDate, LocalDateTime endDate, Duration interval) {
        this.endDate = endDate;
        this.interval = interval;
        this.currentDate = startDate;
    }

    @Override
    public boolean hasNext() {
        return currentDate.isBefore(endDate);
    }

    @Override
    public LocalDateTime next() {
        final LocalDateTime result = currentDate;
        currentDate = currentDate.plus(interval);
        return result;
    }
}
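A quick way to convince yourself of the iterator's semantics (start date inclusive, end date exclusive) is to reproduce the same timeline with Stream.iterate, which the JDK offers out of the box. The class name below is made up for this sketch; it isn't part of the example project:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.stream.Stream;

public class TimelineSketch {

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2020, 1, 1, 0, 0);
        LocalDateTime end = LocalDateTime.of(2020, 1, 2, 0, 0);
        Duration interval = Duration.ofHours(1);

        // Step from start (inclusive) to end (exclusive) in fixed intervals,
        // just like iterating the DateTimeIterator:
        long steps = Stream.iterate(start, current -> current.plus(interval))
                .takeWhile(current -> current.isBefore(end))
                .count();

        System.out.println(steps); // 24: one step per hour of the day
    }
}
```

Note that Stream.iterate with takeWhile requires Java 9 or later.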
Next we write a small class DataGenerator, which generates random values in a given range for a given time range:
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package de.bytefish.sqlservertvpexample.data;

import de.bytefish.sqlservertvpexample.model.DeviceMeasurement;

import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class DataGenerator {

    private static Random random = new Random();

    private LocalDateTime startDate;
    private LocalDateTime endDate;
    private Duration interval;

    public DataGenerator(LocalDateTime startDate, LocalDateTime endDate, Duration interval) {
        this.startDate = startDate;
        this.endDate = endDate;
        this.interval = interval;
    }

    public Stream<DeviceMeasurement> generate(final String deviceId, final String parameterId, final double low, final double high) {
        // For Creating the Measurement TimeSteps:
        final DateTimeIterator iterator = new DateTimeIterator(startDate, endDate, interval);

        // Create the Stream:
        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
                .map(measurementTimeStamp -> createDeviceMeasurement(deviceId, parameterId, measurementTimeStamp, low, high));
    }

    private DeviceMeasurement createDeviceMeasurement(final String deviceId, final String parameterId, final LocalDateTime timestamp, final double low, final double high) {
        // Generate a Random Value for the Sensor:
        final double randomValue = low + (high - low) * random.nextDouble();

        // Create the Measurement:
        final DeviceMeasurement data = new DeviceMeasurement();

        data.setDeviceId(deviceId);
        data.setParameterId(parameterId);
        data.setTimestamp(Timestamp.valueOf(timestamp));
        data.setValue(randomValue);

        return data;
    }
}
We can now initialize a DataGenerator and use the DataGenerator#generate method to generate random values. And that's it for the data!
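One detail worth a second look is the scaling formula low + (high - low) * random.nextDouble() in createDeviceMeasurement: since nextDouble() returns values in [0, 1), the generated value always falls in [low, high). Here is a small standalone check (the class name is made up for this sketch):

```java
import java.util.Random;

public class RandomRangeSketch {

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed, so the check is repeatable

        double low = 10.0;
        double high = 19.0;

        boolean allInRange = true;

        for (int i = 0; i < 100_000; i++) {
            // Same scaling as in DataGenerator#createDeviceMeasurement:
            double value = low + (high - low) * random.nextDouble();
            allInRange &= (value >= low && value < high);
        }

        System.out.println(allInRange); // true
    }
}
```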
Creating the SQL Server Database
On the database side we start by creating the database, a sample schema and the table [sample].[DeviceMeasurement], which holds the measurements and obviously maps to the Java data model:
--
-- DATABASE
--
IF DB_ID('$(dbname)') IS NULL
BEGIN
CREATE DATABASE $(dbname)
END
GO
use $(dbname)
GO
--
-- SCHEMAS
--
IF NOT EXISTS (SELECT name from sys.schemas WHERE name = 'sample')
BEGIN
EXEC('CREATE SCHEMA sample')
END
GO
--
-- TABLES
--
IF NOT EXISTS
    (SELECT * FROM sys.objects
     WHERE object_id = OBJECT_ID(N'[sample].[DeviceMeasurement]') AND type IN (N'U'))
BEGIN
    CREATE TABLE [sample].[DeviceMeasurement](
        [DeviceID] [NVARCHAR](50) NOT NULL,
        [ParameterID] [NVARCHAR](50) NOT NULL,
        [Timestamp] [DATETIME2],
        [Value] [DECIMAL](18, 2)
    );
END
GO
We want to make sure that there is only one measurement for a given device, parameter and timestamp. It also enables faster queries, so let's add a UNIQUE INDEX on the columns DeviceID, ParameterID and Timestamp:
--
-- INDEXES
--
IF EXISTS (SELECT name FROM sys.indexes WHERE name = N'UX_DeviceMeasurement')
BEGIN
DROP INDEX [UX_DeviceMeasurement] on [sample].[DeviceMeasurement];
END
GO
CREATE UNIQUE INDEX UX_DeviceMeasurement ON [sample].[DeviceMeasurement](DeviceID, ParameterID, Timestamp);
GO
The plan is to create a TYPE and pass the data as a TABLE into a Stored Procedure. The Stored Procedure can then perform a MERGE statement on the data batch. The Stored Procedure probably already references the TYPE, so in the DDL script we first drop the Stored Procedure:
--
-- STORED PROCEDURES
--
IF OBJECT_ID(N'[sample].[InsertOrUpdateDeviceMeasurements]', N'P') IS NOT NULL
BEGIN
DROP PROCEDURE [sample].[InsertOrUpdateDeviceMeasurements];
END
GO
Now we can define the TYPE [sample].[DeviceMeasurementType] as a TABLE. It is typically just a 1:1 copy of the destination table:
IF EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'DeviceMeasurementType')
BEGIN
DROP TYPE [sample].[DeviceMeasurementType];
END
GO
CREATE TYPE [sample].[DeviceMeasurementType] AS TABLE (
    [DeviceID] [NVARCHAR](50) NOT NULL,
    [ParameterID] [NVARCHAR](50) NOT NULL,
    [Timestamp] [DATETIME2],
    [Value] [DECIMAL](18, 2)
);
GO
And finally we write the Stored Procedure [sample].[InsertOrUpdateDeviceMeasurements], which inserts or updates measurements in the database. You can see that it takes the [sample].[DeviceMeasurementType] type as its only parameter:
CREATE PROCEDURE [sample].[InsertOrUpdateDeviceMeasurements]
    @TVP [sample].[DeviceMeasurementType] READONLY
AS
BEGIN
    SET NOCOUNT ON;

    MERGE [sample].[DeviceMeasurement] AS TARGET
    USING @TVP AS SOURCE
    ON (TARGET.DeviceID = SOURCE.DeviceID)
        AND (TARGET.ParameterID = SOURCE.ParameterID)
        AND (TARGET.Timestamp = SOURCE.Timestamp)
    WHEN MATCHED THEN
        UPDATE SET TARGET.Value = SOURCE.Value
    WHEN NOT MATCHED BY TARGET THEN
        INSERT (DeviceID, ParameterID, Timestamp, Value)
        VALUES (SOURCE.DeviceID, SOURCE.ParameterID, SOURCE.Timestamp, SOURCE.Value);
END
GO
Calling the Stored Procedure with Batched Data
What I usually add is a class I call a "...BulkProcessor", which converts the DeviceMeasurement objects into a SQLServerDataTable and passes the data to the Stored Procedure. It's quite simple to do:
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package de.bytefish.sqlservertvpexample.processor;

import com.microsoft.sqlserver.jdbc.SQLServerCallableStatement;
import com.microsoft.sqlserver.jdbc.SQLServerDataTable;
import com.microsoft.sqlserver.jdbc.SQLServerException;
import de.bytefish.sqlservertvpexample.model.DeviceMeasurement;
import de.bytefish.sqlservertvpexample.utils.Tuple3;

import java.sql.Connection;
import java.sql.Types;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.groupingBy;

public class DeviceMeasurementBulkProcessor {

    private static final String SQL_COMMAND = "{call [sample].[InsertOrUpdateDeviceMeasurements] (?)}";

    public void saveAll(Connection connection, Collection<DeviceMeasurement> source) throws Exception {
        // In a Batch the Values have to be unique to satisfy the Unique Constraint in the Database,
        // so we group them by multiple keys and then take the first value from each of the batches:
        List<DeviceMeasurement> distinctDeviceMeasurements = source.stream()
                .collect(groupingBy(x -> new Tuple3<>(x.getDeviceId(), x.getParameterId(), x.getTimestamp())))
                .values().stream()
                .map(x -> x.get(0))
                .collect(Collectors.toList());

        // Build the SQLServerDataTable:
        SQLServerDataTable sqlServerDataTable = buildSqlServerDataTable(distinctDeviceMeasurements);

        // And insert it:
        try (SQLServerCallableStatement callableStmt = (SQLServerCallableStatement) connection.prepareCall(SQL_COMMAND)) {
            callableStmt.setStructured(1, "[sample].[DeviceMeasurementType]", sqlServerDataTable);
            callableStmt.execute();
        }
    }

    private SQLServerDataTable buildSqlServerDataTable(Collection<DeviceMeasurement> deviceMeasurements) throws SQLServerException {
        SQLServerDataTable tvp = new SQLServerDataTable();

        tvp.addColumnMetadata("DeviceID", Types.NVARCHAR);
        tvp.addColumnMetadata("ParameterID", Types.NVARCHAR);
        tvp.addColumnMetadata("Timestamp", Types.TIMESTAMP);
        tvp.addColumnMetadata("Value", Types.DECIMAL);

        for (DeviceMeasurement deviceMeasurement : deviceMeasurements) {
            tvp.addRow(
                    deviceMeasurement.getDeviceId(),
                    deviceMeasurement.getParameterId(),
                    deviceMeasurement.getTimestamp(),
                    deviceMeasurement.getValue()
            );
        }

        return tvp;
    }
}
The values in a batch need to be unique, so to group by device, parameter and timestamp with Java's groupingBy, I defined a class Tuple3<T1, T2, T3> and overrode its Object#equals and Object#hashCode methods:
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package de.bytefish.sqlservertvpexample.utils;

import java.util.Objects;

public class Tuple3<T1, T2, T3> {

    private final T1 t1;
    private final T2 t2;
    private final T3 t3;

    public Tuple3(T1 t1, T2 t2, T3 t3) {
        this.t1 = t1;
        this.t2 = t2;
        this.t3 = t3;
    }

    public T1 getT1() {
        return t1;
    }

    public T2 getT2() {
        return t2;
    }

    public T3 getT3() {
        return t3;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Tuple3<?, ?, ?> tuple3 = (Tuple3<?, ?, ?>) o;
        return Objects.equals(t1, tuple3.t1) && Objects.equals(t2, tuple3.t2) && Objects.equals(t3, tuple3.t3);
    }

    @Override
    public int hashCode() {
        return Objects.hash(t1, t2, t3);
    }
}
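The groupingBy trick only works as a deduplication step because the composite key implements equals and hashCode consistently. A standalone way to see the effect is to use java.util.List as the key, since lists already compare element-wise; the class name and sample rows below are made up for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.groupingBy;

public class DedupSketch {

    public static void main(String[] args) {
        // Three rows; the first two share the same (device, parameter, timestamp) key:
        List<String[]> rows = List.of(
                new String[]{"device1", "parameter1", "2020-01-01T00:00:00", "10.0"},
                new String[]{"device1", "parameter1", "2020-01-01T00:00:00", "11.5"},
                new String[]{"device1", "parameter2", "2020-01-01T00:00:00", "12.0"});

        // Group by the composite key and keep the first row of each group,
        // mirroring the Tuple3-based grouping in the BulkProcessor:
        List<String[]> distinct = rows.stream()
                .collect(groupingBy(r -> List.of(r[0], r[1], r[2])))
                .values().stream()
                .map(group -> group.get(0))
                .collect(Collectors.toList());

        System.out.println(distinct.size()); // 2
    }
}
```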
Wiring All The Things
And finally it's time to wire things up. There are probably a million ways to create batches from a stream of data, but in this example I opted to use RxJava, which already has a buffer operator to create data batches:
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package de.bytefish.sqlservertvpexample;

import de.bytefish.sqlservertvpexample.data.DataGenerator;
import de.bytefish.sqlservertvpexample.model.DeviceMeasurement;
import de.bytefish.sqlservertvpexample.processor.DeviceMeasurementBulkProcessor;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;

import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Stream;

public class Program {

    private static final String connectionUrl = "jdbc:sqlserver://localhost;instanceName=SQLEXPRESS;databaseName=SampleDatabase;user=philipp;password=test_pwd";

    public static void main(String[] args) {
        LocalDateTime startDate = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
        LocalDateTime endDate = LocalDateTime.of(2020, 12, 31, 0, 0, 0);

        // Create the Batch Processor:
        DeviceMeasurementBulkProcessor processor = new DeviceMeasurementBulkProcessor();

        // Generate a Stream of data using a fake device sending a measurement every 15 seconds
        // for a year. This should generate something around 2 million measurements:
        Stream<DeviceMeasurement> measurementStream = new DataGenerator(startDate, endDate, Duration.ofSeconds(15))
                .generate("device1", "parameter1", 10, 19);

        // Write Data in 80,000 Value Batches:
        Disposable disposable = Observable
                .fromStream(measurementStream)
                .buffer(80_000)
                .forEach(values -> writeBatch(processor, values));

        // Being a good citizen by cleaning up RxJava stuff:
        disposable.dispose();
    }

    private static void writeBatch(DeviceMeasurementBulkProcessor processor, List<DeviceMeasurement> values) {
        try {
            internalWriteBatch(processor, values);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static void internalWriteBatch(DeviceMeasurementBulkProcessor processor, List<DeviceMeasurement> values) throws Exception {
        try (Connection connection = DriverManager.getConnection(connectionUrl)) {
            processor.saveAll(connection, values);
        }
    }
}
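If you'd rather not pull in RxJava just for the batching step, the buffer operator can be approximated with a few lines of plain Java. The helper below is a hypothetical sketch, not part of the example project: it collects a stream into fixed-size lists and hands each full (or final partial) batch to a consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class BatchSketch {

    // Collects items from a stream into fixed-size batches and hands each
    // full batch (and the final partial batch, if any) to the given consumer.
    static <T> void forEachBatch(Stream<T> stream, int batchSize, Consumer<List<T>> consumer) {
        List<T> batch = new ArrayList<>(batchSize);

        stream.forEach(item -> {
            batch.add(item);
            if (batch.size() == batchSize) {
                // Hand over a copy, so the consumer may keep the list around:
                consumer.accept(new ArrayList<>(batch));
                batch.clear();
            }
        });

        if (!batch.isEmpty()) {
            consumer.accept(batch);
        }
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();

        // 10 items in batches of 4 yield batches of size 4, 4 and 2:
        forEachBatch(Stream.iterate(0, i -> i + 1).limit(10), 4, b -> batchSizes.add(b.size()));

        System.out.println(batchSizes); // [4, 4, 2]
    }
}
```

In the Program above, the same idea would mean replacing the Observable chain with a forEachBatch(measurementStream, 80_000, values -> writeBatch(processor, values)) call.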
And that's it for now.