How to quickly write an application to load thousands of records into DB using Spring Batch?


Introduction

Let’s assume we want to measure air pollution in our city. We’ve got around 100 detectors located in different parts of the town, and each one reports a measurement every 15 minutes, which gives us 9600 records a day (100 detectors × 96 readings per day). We want to aggregate the data hourly or daily and run some analytics, but first we need to load the measurement results from the detectors into a relational database. Each detector saves its data in a flat file, let’s say in CSV format. How can we load that amount of data into the DB quickly? Spring Batch is one of the available solutions. In this article I will show a simple application that uses this framework to solve the problem.

Concepts of Spring Batch

The basic concept of Spring Batch is a Job. JobParameters are the context in which a Job is executed. A single execution of a Job with a given set of JobParameters creates a JobExecution. Information about each JobExecution is stored in a JobRepository. In addition to that, we can distinguish a JobLauncher, a PlatformTransactionManager and, of course, a DataSource.

A Job consists of Steps. Within a Step, data is processed in Chunks (pieces of data). Typically, during batch processing we first want to read the data, then process it, and finally write it to some storage such as a database. A Reader, a Processor and a Writer are responsible for these three phases, respectively.
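
To make the read, process, write cycle more concrete, here is a minimal plain-Java sketch of chunk-oriented processing. It is not Spring Batch code: the class ChunkLoopSketch and its run method are hypothetical names used only for illustration, and the loop is a simplification of what the framework does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of chunk-oriented processing; not Spring Batch code.
class ChunkLoopSketch {

    // Reads items one by one, processes each, and hands them to the "writer" in chunks.
    // Returns the chunks in the order in which they would be written.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next())); // read + process one item
            if (chunk.size() == chunkSize) {
                written.add(chunk);                    // write a full chunk
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            written.add(chunk);                        // write the final, partial chunk
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e");
        List<List<String>> chunks = run(lines.iterator(), s -> s.toUpperCase(), 2);
        System.out.println(chunks); // prints [[A, B], [C, D], [E]]
    }
}
```

In the real framework the Reader, Processor and Writer are separate components and each Chunk is written inside a transaction; the sketch only shows how items are grouped.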

Input data format

The CSV files we want to read contain a header in the first line, followed by the measurement results:

time,location,pm10,pm2_5,gas
2016-12-01 12:00:01,aleje,7.4,2.5,11.3
2016-12-01 12:15:01,aleje,8.8,2.9,11.5
...
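
Because every file starts with a header, a loader has to treat the first line differently from the rest. The following plain-Java sketch shows that split on the sample rows above; CsvLayoutSketch and its methods are hypothetical names, and the file content is passed in as a string only to keep the example self-contained.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration of the file layout: one header line, then measurement rows.
class CsvLayoutSketch {

    // Column names come from the first (header) line of the file.
    static List<String> columnNames(String fileContent) {
        String header = new BufferedReader(new StringReader(fileContent))
                .lines()
                .findFirst()
                .orElse("");
        return Arrays.asList(header.split(","));
    }

    // Measurement rows are everything after the skipped lines.
    static List<String> dataLines(String fileContent, int linesToSkip) {
        return new BufferedReader(new StringReader(fileContent))
                .lines()
                .skip(linesToSkip)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String file = "time,location,pm10,pm2_5,gas\n"
                + "2016-12-01 12:00:01,aleje,7.4,2.5,11.3\n"
                + "2016-12-01 12:15:01,aleje,8.8,2.9,11.5\n";
        System.out.println(columnNames(file));         // prints [time, location, pm10, pm2_5, gas]
        System.out.println(dataLines(file, 1).size()); // prints 2
    }
}
```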

Configuration of sample app

To configure Spring Batch we can use XML or Java. For the purpose of this app I’ve chosen the second option. The code below presents how DataSource is configured:

@Configuration
@EnableBatchProcessing
public class InMemoryInfrastructureConfiguration implements InfrastructureConfiguration {

    @Bean
    @Override
    public DataSource dataSource() {
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder.addScript("classpath:sql/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
                .addScript("classpath:sql/schema-hsqldb.sql")
                .setType(EmbeddedDatabaseType.HSQL)
                .build();
    }
}

What is interesting here is the @EnableBatchProcessing annotation. It creates all the necessary beans used by Spring Batch, so we can focus on defining Jobs and Steps. It is worth mentioning that Spring Batch requires dedicated database tables for storing JobExecution information and similar metadata. Passing org/springframework/batch/core/schema-hsqldb.sql to the DataSource builder prepares that schema for the database (HSQLDB in our case). Spring Batch includes corresponding scripts for the most popular RDBMSes.

Reading measurements

First we define a Job and a Step:

@Bean
public Job saveMeasurementsJob() {
    return jobBuilders.get("saveMeasurementsJob")
            .start(step())
            .build();
}
    
@Bean
public Step step() {
    return stepBuilders.get("step").<Measurement, Measurement>chunk(500)
            .reader(multiResourceReader())
            .writer(jdbcBatchItemWriter())
            .build();
}

The Chunk size is set to 500, which means that items are read one by one and then written in groups of 500, with each Chunk committed in its own transaction. One item corresponds to one line of a CSV file.
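
As a rough illustration of what this Chunk size means for the daily volume from the introduction, the arithmetic below counts the Chunks needed for 9600 items. It assumes that a whole day of records is loaded in a single run; ChunkCount is a hypothetical helper, not part of Spring Batch.

```java
// Rough arithmetic only: how many chunks (and therefore commits) a run produces.
class ChunkCount {

    // Number of chunks needed for the given number of items (ceiling division).
    static int chunks(int items, int chunkSize) {
        return (items + chunkSize - 1) / chunkSize;
    }

    // Size of the last chunk, which may be smaller than chunkSize.
    static int lastChunkSize(int items, int chunkSize) {
        int remainder = items % chunkSize;
        return remainder == 0 ? Math.min(items, chunkSize) : remainder;
    }

    public static void main(String[] args) {
        System.out.println(chunks(9600, 500));        // prints 20
        System.out.println(lastChunkSize(9600, 500)); // prints 100
    }
}
```

So a full day of measurements would be written in 19 Chunks of 500 items plus one final Chunk of 100 items.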

For reading items we need to have a Reader:

@Bean
public ItemReader<Measurement> multiResourceReader() {
    MultiResourceItemReader<Measurement> itemReader = new MultiResourceItemReader<>();
    itemReader.setResources(resources);
    itemReader.setDelegate(cvsFileItemReader());
    return itemReader;
}

@Bean
public FlatFileItemReader<Measurement> cvsFileItemReader() {
    FlatFileItemReader<Measurement> itemReader = new FlatFileItemReader<>();
    itemReader.setLinesToSkip(linesToSkip);	
    itemReader.setLineMapper(lineMapper());
    return itemReader;
}

In our case we use MultiResourceItemReader to read items from multiple files (measurements from each location are kept in a separate file, because each detector produces its own file). To read a single CSV file, FlatFileItemReader is used. In its definition we can set how many lines at the beginning of the file should be skipped. Apart from the skipped lines, every line of the input file must be mapped to an instance of the class representing an item. We can achieve that with the following beans:

@Bean
public LineMapper<Measurement> lineMapper() {
    DefaultLineMapper<Measurement> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(delimitedLineTokenizer());
    lineMapper.setFieldSetMapper(beanWrapperFieldSetMapper());
    return lineMapper;
}

@Bean
public LineTokenizer delimitedLineTokenizer() {
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setNames(fieldNames);
    return lineTokenizer;
}
    
@Bean
public FieldSetMapper<Measurement> beanWrapperFieldSetMapper() {
    BeanWrapperFieldSetMapper<Measurement> fieldSetMapper = 
        new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Measurement.class);
    fieldSetMapper.setCustomEditors(Collections.singletonMap(Date.class, 
        customDateEditor()));
    return fieldSetMapper;
}
    
@Bean
public PropertyEditor customDateEditor() {
    return new CustomDateEditor(new SimpleDateFormat(dateFormat), false);
}

First of all, we need to split the line into tokens with DelimitedLineTokenizer (the default separator is a comma) and then map the tokens to the properties of our Measurement bean. Furthermore, Spring Batch needs to know how to parse the date and time from the first field of each line, so we register a CustomDateEditor with the desired date format.
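
The two steps described above, tokenizing and date parsing, can be sketched in plain Java as follows. This is not how Spring Batch implements them; LineParsingSketch is a hypothetical class, and the pattern yyyy-MM-dd HH:mm:ss is an assumption inferred from the sample rows, since the article injects the real value as dateFormat without showing it.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Plain-Java illustration of tokenizing a line and parsing its timestamp.
class LineParsingSketch {

    // Assumed pattern, inferred from the sample data.
    static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Splits on commas; unlike DelimitedLineTokenizer, this ignores quoted fields.
    static String[] tokenize(String line) {
        return line.split(",", -1);
    }

    // Parses the first field strictly, rejecting values that do not match the pattern.
    static Date parseTime(String token) {
        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
        format.setLenient(false);
        try {
            return format.parse(token);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable time: " + token, e);
        }
    }

    public static void main(String[] args) {
        String[] tokens = tokenize("2016-12-01 12:00:01,aleje,7.4,2.5,11.3");
        Date time = parseTime(tokens[0]);       // interpreted in the default time zone
        double pm10 = Double.parseDouble(tokens[2]);
        System.out.println(tokens.length);      // prints 5
        System.out.println(tokens[1]);          // prints aleje
        System.out.println(time != null && pm10 > 0);
    }
}
```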

Storing into database

We’ve created the reading flow of our app. Now it’s time to store the items (finally!). It’s far easier than the reading part:

@Value("insert into measurement (time, location, pm10, pm2_5, gas) values (:time, :location, :pm10Concentration, :pm2_5Concentration, :gasConcentration)")
private String insertMeasurementSql;

@Bean
public ItemWriter<Measurement> jdbcBatchItemWriter() {
    JdbcBatchItemWriter<Measurement> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setDataSource(infrastructureConfiguration.dataSource());
    itemWriter.setSql(insertMeasurementSql);
    itemWriter.setItemSqlParameterSourceProvider(
        beanPropertyItemSqlParameterSourceProvider());
    return itemWriter;
}
    
@Bean
public ItemSqlParameterSourceProvider<Measurement> beanPropertyItemSqlParameterSourceProvider() {
    return new BeanPropertyItemSqlParameterSourceProvider<Measurement>();
}

Thanks to BeanPropertyItemSqlParameterSourceProvider, the named parameters in the SQL statement are resolved directly from the properties of the Measurement class.
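
The idea of matching named parameters to bean properties can be illustrated with a small plain-Java sketch. It is a simplification and not the Spring implementation: NamedParameterSketch is a hypothetical class, the Measurement bean shown here is a cut-down assumption whose property names are taken from the named parameters in the insert statement, and the SQL in the example is a shortened variant of that statement.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java illustration of named-parameter binding; not the Spring implementation.
class NamedParameterSketch {

    // Hypothetical, cut-down bean with two of the properties used in the SQL.
    public static class Measurement {
        private final String location;
        private final double pm10Concentration;

        public Measurement(String location, double pm10Concentration) {
            this.location = location;
            this.pm10Concentration = pm10Concentration;
        }

        public String getLocation() { return location; }
        public double getPm10Concentration() { return pm10Concentration; }
    }

    // Finds every ":name" placeholder in the SQL, in order of appearance.
    static List<String> parameterNames(String sql) {
        List<String> names = new ArrayList<>();
        Matcher matcher = Pattern.compile(":(\\w+)").matcher(sql);
        while (matcher.find()) {
            names.add(matcher.group(1));
        }
        return names;
    }

    // Resolves a parameter value by calling the getter with the matching name.
    static Object valueOf(Object bean, String parameterName) {
        String getter = "get" + Character.toUpperCase(parameterName.charAt(0))
                + parameterName.substring(1);
        try {
            Method method = bean.getClass().getMethod(getter);
            return method.invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No readable property: " + parameterName, e);
        }
    }

    public static void main(String[] args) {
        String sql = "insert into measurement (location, pm10) values (:location, :pm10Concentration)";
        Measurement measurement = new Measurement("aleje", 7.4);
        for (String name : parameterNames(sql)) {
            System.out.println(name + " = " + valueOf(measurement, name));
        }
        // prints:
        // location = aleje
        // pm10Concentration = 7.4
    }
}
```

The practical consequence is that the property names of the Measurement class and the parameter names in the SQL statement have to match exactly.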

Testing the flow

We can try running our application now. For that purpose I’ve written a test. You can also go to the command line and try out CommandLineJobRunner, which has a main method. As its first parameter you can pass either the name of a Configuration class or an XML application context file. You can find a more detailed description in the Spring Batch documentation.

The test looks as follows:

@ContextConfiguration(classes={InMemoryInfrastructureConfiguration.class, CsvMeasurementsToDbJobConfiguration.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class CsvMeasurementsToDbJobIntegrationTest {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    @Autowired
    private JobRepository jobRepository;

    @Test
    public void testLaunchJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
        jobLauncher.run(job, jobParameters);
        assertThat(jobRepository.getLastJobExecution("saveMeasurementsJob", jobParameters).getExitStatus(),
                is(ExitStatus.COMPLETED));
    }    
}

Summary

I hope this short article has encouraged you to take a closer look at the Spring Batch framework. The complete sources are available on my GitHub. Enjoy exploring further!

 

Paweł Weselak

Senior Java Developer. In the IT industry since 2010. Paweł finds sharing his knowledge and experience the most rewarding part of his everyday work. He doesn't underestimate the importance of developing soft skills in a software engineer's job. He is keen on Machine Learning and data analysis. Paweł spends his free time traveling, dancing, snowboarding and striking up new acquaintances.

pawel.weselak@j-labs.pl
