In my professional life I have often had to import large CSV files as efficiently as possible. That's why I developed JTinyCsvParser and PgBulkInsert for Java 1.8. Both provide a Streaming API and are very easy to combine.
In this example I will show you how to combine both libraries to import a large dataset.
Dataset
In this post we are parsing and importing a real-life dataset: the local weather data for March 2015, gathered by all weather stations in the USA. The data is distributed as QCLCD201503.zip. The file size is 557 MB and the file has 4,496,262 lines.
The CSV file has around 40 columns. We only want to use three of them: WBAN, Date and SkyCondition, which correspond to the column indices 0 (WBAN), 1 (Date) and 4 (SkyCondition). It's not really necessary to know what the columns mean; I just needed a large dataset.
Preparing the Database
Here is the SQL script to create the table where the data will be stored.
CREATE TABLE sample.unit_test
(
    wban text,
    sky_condition text,
    date timestamp
);
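Note that the table lives in a schema called sample. If that schema does not exist in your database yet, create it first (PostgreSQL 9.3 and later support IF NOT EXISTS):
CREATE SCHEMA IF NOT EXISTS sample;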
Domain Model
The corresponding domain model in the application might look like this.
private class LocalWeatherData
{
    private String wban;
    private LocalDate date;
    private String skyCondition;

    public String getWban() {
        return wban;
    }

    public void setWban(String wban) {
        this.wban = wban;
    }

    public LocalDate getDate() {
        return date;
    }

    public void setDate(LocalDate date) {
        this.date = date;
    }

    public String getSkyCondition() {
        return skyCondition;
    }

    public void setSkyCondition(String skyCondition) {
        this.skyCondition = skyCondition;
    }
}
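The class is a plain Java bean on purpose: both libraries consume method references to these getters and setters. Conceptually, a setter reference is nothing more than a java.util.function.BiConsumer. The following lines are an illustration of that idea, not library code:
// Illustration only: a setter reference is a BiConsumer<Entity, Value>.
BiConsumer<LocalWeatherData, String> setWban = LocalWeatherData::setWban;

LocalWeatherData entity = new LocalWeatherData();
setWban.accept(entity, "00102"); // Same effect as entity.setWban("00102").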
JTinyCsvParser
When using JTinyCsvParser you have to define a CsvMapping, which defines how the domain model and the CSV file map to each other. You can see that the column indices in the CSV file map to the setters of the domain model. The date in the CSV file has the format yyyyMMdd, which is not the default date format of Java. That's why a LocalDateConverter with a custom format is passed into the property mapping.
public class LocalWeatherDataMapper extends CsvMapping<LocalWeatherData>
{
    public LocalWeatherDataMapper(IObjectCreator creator)
    {
        super(creator);

        MapProperty(0, String.class, LocalWeatherData::setWban);
        MapProperty(1, LocalDate.class, LocalWeatherData::setDate, new LocalDateConverter(DateTimeFormatter.ofPattern("yyyyMMdd")));
        MapProperty(4, String.class, LocalWeatherData::setSkyCondition);
    }
}
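To see why the custom format is needed: plain java.time refuses to parse 20150301 with its ISO defaults, but accepts it with the yyyyMMdd pattern. A quick standalone check:
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
LocalDate date = LocalDate.parse("20150301", formatter); // -> 2015-03-01

// LocalDate.parse("20150301") alone throws a DateTimeParseException,
// because the ISO_LOCAL_DATE default expects "2015-03-01".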
PgBulkInsert
Add the following dependency to your pom.xml to include PgBulkInsert in your project:
<dependency>
    <groupId>de.bytefish</groupId>
    <artifactId>pgbulkinsert</artifactId>
    <version>1.4</version>
</dependency>
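JTinyCsvParser is available from the Central Maven Repository as well. The coordinates below are an assumption based on the package names used in this post, so please check the project page for the actual groupId, artifactId and latest version:
<dependency>
    <groupId>de.bytefish</groupId>
    <artifactId>jtinycsvparser</artifactId>
    <!-- Check Maven Central for the latest version: -->
    <version>...</version>
</dependency>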
For the bulk inserts to PostgreSQL, the mapping between the database table and the domain model needs to be defined. This is done by implementing the abstract base class PgBulkInsert<TEntity>. We can now implement it for the LocalWeatherData class, and again you can see how the getters of the domain model map to the column names of the database table.
public class LocalWeatherDataBulkInserter extends PgBulkInsert<LocalWeatherData>
{
    public LocalWeatherDataBulkInserter() {
        super("sample", "unit_test");

        MapString("wban", LocalWeatherData::getWban);
        MapString("sky_condition", LocalWeatherData::getSkyCondition);
        MapDate("date", LocalWeatherData::getDate);
    }
}
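As a quick sanity check, you could bulk insert a single hand-built entity before running the full import. This is a minimal sketch: pgConnection is assumed to be an open PGConnection (the pipeline below shows how to obtain one) and the values are made up:
LocalWeatherData data = new LocalWeatherData();
data.setWban("00102");                  // Made-up sample values.
data.setSkyCondition("CLR");
data.setDate(LocalDate.of(2015, 3, 1));

new LocalWeatherDataBulkInserter().saveAll(pgConnection, Stream.of(data));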
Import Pipeline
Finally we can write the import pipeline, which uses JTinyCsvParser and PgBulkInsert to import the CSV data into PostgreSQL.
@Test
public void bulkInsertWeatherDataTest() throws SQLException {
    // Do not process the CSV file in parallel (Java 1.8 bug!):
    CsvParserOptions options = new CsvParserOptions(true, ",", false);
    // The mapping to employ:
    LocalWeatherDataMapper mapping = new LocalWeatherDataMapper(() -> new LocalWeatherData());
    // Construct the parser:
    CsvParser<LocalWeatherData> parser = new CsvParser<>(options, mapping);
    // Create the BulkInserter used for bulk inserts into the database:
    LocalWeatherDataBulkInserter bulkInserter = new LocalWeatherDataBulkInserter();
    // Read the file. Wrap it in a try-with-resources block, so the file handle gets disposed properly:
    try(Stream<CsvMappingResult<LocalWeatherData>> stream = parser.readFromFile(FileSystems.getDefault().getPath("C:\\Users\\philipp\\Downloads\\csv", "201503hourly.txt"), StandardCharsets.UTF_8)) {
        // Filter the stream of mapping results, so only valid entries are processed:
        Stream<LocalWeatherData> localWeatherDataStream = stream.filter(e -> e.isValid()).map(e -> e.getResult());
        // Now bulk insert the valid entries into the PostgreSQL database:
        bulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), localWeatherDataStream);
    }
    // Check if we have the correct amount of rows in the DB:
    Assert.assertEquals(4496262, getRowCount());
}
Explanation
A lot of things are going on here. First of all the CsvParser is constructed, using the CsvParserOptions and the CsvMapping for the entity to parse. Then a PgBulkInsert object is instantiated, which is the LocalWeatherDataBulkInserter defined above.
Then the CSV file is parsed. You have to be careful to wrap the CsvParser<TEntity>.readFromFile method in a try-with-resources statement, so the file handle gets disposed properly. The result of the parsing is a Stream<CsvMappingResult<LocalWeatherData>>, which is then consumed by the LocalWeatherDataBulkInserter. The Stream<CsvMappingResult<LocalWeatherData>> result type might look scary, but it basically only contains the parsed object and a flag indicating whether it is valid or not. A CSV file might contain invalid entries (wrong formats), and you don't want to stop parsing because a single line is invalid.
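If you want to see which lines failed instead of silently dropping them, you can peek at the invalid results before filtering them out. This is a minimal sketch, not part of the pipeline above; csvFile is a placeholder path, and I am assuming here that CsvMappingResult exposes its error details via a getError() method, so check the JTinyCsvParser API for the exact accessor:
try(Stream<CsvMappingResult<LocalWeatherData>> stream = parser.readFromFile(csvFile, StandardCharsets.UTF_8)) {
    Stream<LocalWeatherData> validEntities = stream
            // getError() is an assumption, see the note above:
            .peek(r -> { if (!r.isValid()) System.err.println("Skipping invalid line: " + r.getError()); })
            .filter(r -> r.isValid())
            .map(r -> r.getResult());
    // Hand validEntities to the bulk inserter, just like in the pipeline above.
}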
We only want to import valid entities into the database, so the stream is further filtered for valid entries and then mapped to the result entities. This stream is then consumed by the LocalWeatherDataBulkInserter, which takes a PGConnection. If you work with a java.sql.Connection, you can get the underlying PGConnection with the utility method PostgreSqlUtils.getPGConnection.
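To make this step concrete, here is a minimal sketch of the unwrapping (the connection URL and credentials are the same placeholders used in the test base class below):
// Obtain a plain JDBC connection first:
Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:5432/sampledb", "philipp", "test_pwd");

// Unwrap the underlying org.postgresql.PGConnection, which PgBulkInsert needs:
PGConnection pgConnection = PostgreSqlUtils.getPGConnection(connection);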
Conclusion
The example shows how to build a very efficient import pipeline with JTinyCsvParser and PgBulkInsert. Both projects are available from the Central Maven Repository, so they can be easily integrated into existing projects.
Full Source Code
TransactionalTestBase
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
import org.junit.After;
import org.junit.Before;

import java.sql.Connection;
import java.sql.DriverManager;

public abstract class TransactionalTestBase {

    protected Connection connection;

    @Before
    public void setUp() throws Exception {
        connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:5432/sampledb", "philipp", "test_pwd");
        onSetUpBeforeTransaction();
        connection.setAutoCommit(false); // Start the transaction:
        onSetUpInTransaction();
    }

    @After
    public void tearDown() throws Exception {
        onTearDownInTransaction();
        connection.rollback();
        onTearDownAfterTransaction();
        connection.close();
    }

    protected void onSetUpInTransaction() throws Exception {}

    protected void onSetUpBeforeTransaction() throws Exception {}

    protected void onTearDownInTransaction() throws Exception {}

    protected void onTearDownAfterTransaction() throws Exception {}
}
IntegrationTest
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
import de.bytefish.jtinycsvparser.CsvParser;
import de.bytefish.jtinycsvparser.CsvParserOptions;
import de.bytefish.jtinycsvparser.builder.IObjectCreator;
import de.bytefish.jtinycsvparser.mapping.CsvMapping;
import de.bytefish.jtinycsvparser.mapping.CsvMappingResult;
import de.bytefish.jtinycsvparser.typeconverter.LocalDateConverter;
import de.bytefish.pgbulkinsert.PgBulkInsert;
import de.bytefish.pgbulkinsert.util.PostgreSqlUtils;
import org.junit.Assert;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.stream.Stream;
public class IntegrationTest extends TransactionalTestBase {

    @Override
    protected void onSetUpInTransaction() throws Exception {
        createTable();
    }

    private class LocalWeatherData
    {
        private String wban;
        private LocalDate date;
        private String skyCondition;

        public String getWban() {
            return wban;
        }

        public void setWban(String wban) {
            this.wban = wban;
        }

        public LocalDate getDate() {
            return date;
        }

        public void setDate(LocalDate date) {
            this.date = date;
        }

        public String getSkyCondition() {
            return skyCondition;
        }

        public void setSkyCondition(String skyCondition) {
            this.skyCondition = skyCondition;
        }
    }

    public class LocalWeatherDataBulkInserter extends PgBulkInsert<LocalWeatherData>
    {
        public LocalWeatherDataBulkInserter() {
            super("sample", "unit_test");

            MapString("wban", LocalWeatherData::getWban);
            MapString("sky_condition", LocalWeatherData::getSkyCondition);
            MapDate("date", LocalWeatherData::getDate);
        }
    }

    public class LocalWeatherDataMapper extends CsvMapping<LocalWeatherData>
    {
        public LocalWeatherDataMapper(IObjectCreator creator)
        {
            super(creator);

            MapProperty(0, String.class, LocalWeatherData::setWban);
            MapProperty(1, LocalDate.class, LocalWeatherData::setDate, new LocalDateConverter(DateTimeFormatter.ofPattern("yyyyMMdd")));
            MapProperty(4, String.class, LocalWeatherData::setSkyCondition);
        }
    }

    @Test
    public void bulkInsertWeatherDataTest() throws SQLException {
        // Do not process the CSV file in parallel (Java 1.8 bug!):
        CsvParserOptions options = new CsvParserOptions(true, ",", false);
        // The mapping to employ:
        LocalWeatherDataMapper mapping = new LocalWeatherDataMapper(() -> new LocalWeatherData());
        // Construct the parser:
        CsvParser<LocalWeatherData> parser = new CsvParser<>(options, mapping);
        // Create the BulkInserter used for bulk inserts into the database:
        LocalWeatherDataBulkInserter bulkInserter = new LocalWeatherDataBulkInserter();
        // Read the file. Wrap it in a try-with-resources block, so the file handle gets disposed properly:
        try(Stream<CsvMappingResult<LocalWeatherData>> stream = parser.readFromFile(FileSystems.getDefault().getPath("C:\\Users\\philipp\\Downloads\\csv", "201503hourly.txt"), StandardCharsets.UTF_8)) {
            // Filter the stream of mapping results, so only valid entries are processed:
            Stream<LocalWeatherData> localWeatherDataStream = stream.filter(e -> e.isValid()).map(e -> e.getResult());
            // Now bulk insert the valid entries into the PostgreSQL database:
            bulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), localWeatherDataStream);
        }
        // Check if we have the correct amount of rows in the DB:
        Assert.assertEquals(4496262, getRowCount());
    }

    private boolean createTable() throws SQLException {
        String sqlStatement = "CREATE TABLE sample.unit_test\n" +
                "(\n" +
                "    wban text,\n" +
                "    sky_condition text,\n" +
                "    date timestamp\n" +
                ");";

        Statement statement = connection.createStatement();

        return statement.execute(sqlStatement);
    }

    private int getRowCount() throws SQLException {
        Statement s = connection.createStatement();
        ResultSet r = s.executeQuery("SELECT COUNT(*) AS rowcount FROM sample.unit_test");
        r.next();
        int count = r.getInt("rowcount");
        r.close();
        return count;
    }
}