Spring Batch uses a 'Chunk-oriented' processing style within its most common implementation. Chunk-oriented processing refers to reading the data one item at a time and creating 'chunks' that are written out within a transaction boundary. One item is read in from an ItemReader, handed to an ItemProcessor, and aggregated. Once the number of items read equals the commit interval, the entire chunk is written out by the ItemWriter, and then the transaction is committed. The following image shows the process:

Figure 2. Chunk-oriented Processing

The following pseudocode shows the same concepts:

List items = new ArrayList();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read();
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

Configuring a Step

Despite the relatively short list of required dependencies for a Step, it is an extremely complex class that can potentially contain many collaborators.

When using Java configuration, the Spring Batch builders can be used, as shown in the following example:
Java Configuration

/**
 * Note that the JobRepository is typically autowired in and does not need to be
 * explicitly configured
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
    			.repository(jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * Note that the TransactionManager is typically autowired in and does not need to be
 * explicitly configured
 */
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
	return this.stepBuilderFactory.get("sampleStep")
				.transactionManager(transactionManager)
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

The configuration above includes the only required dependencies to create an item-oriented step:

  • reader: The ItemReader that provides items for processing.

  • writer: The ItemWriter that processes the items provided by the ItemReader.

  • transactionManager: Spring’s PlatformTransactionManager that begins and commits transactions during processing.

  • repository: The JobRepository that periodically stores the StepExecution and ExecutionContext during processing (just before committing).

  • chunk: Indicates that this is an item based step and the number of items to be processed before the transaction is committed.

Note that repository defaults to jobRepository and transactionManager defaults to transactionManager (both provided through the infrastructure from @EnableBatchProcessing). Also, the ItemProcessor is optional, since the item could be directly passed from the reader to the writer.

The Commit Interval

As mentioned previously, a step reads in and writes out items, periodically committing using the supplied PlatformTransactionManager. With a commit-interval of 1, it commits after writing each individual item. This is less than ideal in many situations, since beginning and committing a transaction is expensive. Ideally, each transaction processes as many items as possible, though the right number depends entirely on the type of data being processed and the resources with which the step is interacting. For this reason, the number of items that are processed within a commit can be configured. The following example shows a step whose tasklet has a commit-interval value of 10.
Java Configuration

@Bean
public Job sampleJob() {
    return this.jobBuilderFactory.get("sampleJob")
                     .start(step1())
                     .build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

In the preceding example, 10 items are processed within each transaction. At the beginning of processing, a transaction is begun. Also, each time read is called on the ItemReader, a counter is incremented. When it reaches 10, the list of aggregated items is passed to the ItemWriter, and the transaction is committed.
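
The counting described above can be sketched in plain Java without any Spring Batch classes. This is only a simplified model: the class ChunkLoopSketch and the method commitSizes are hypothetical names introduced for illustration, and the "transaction" is represented by nothing more than a comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, framework-free model of the chunk loop; not Spring Batch code.
class ChunkLoopSketch {

    // Returns the size of each chunk that would be written and committed.
    static List<Integer> commitSizes(List<String> input, int commitInterval) {
        List<Integer> sizes = new ArrayList<>();
        Iterator<String> reader = input.iterator();
        while (reader.hasNext()) {
            // A new transaction would begin here.
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < commitInterval && reader.hasNext()) {
                chunk.add(reader.next()); // one read per item
            }
            // The chunk would be handed to the writer, then committed.
            sizes.add(chunk.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            items.add("item-" + i);
        }
        System.out.println(commitSizes(items, 10)); // prints [10, 10, 5]
    }
}
```

With 25 items and an interval of 10, the model commits three times, and the last chunk is smaller because the input ran out before the interval was reached.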

Configuring a Step for Restart

In the "Configuring and Running a Job" section, restarting a Job was discussed. Restart has numerous impacts on steps, and, consequently, may require some specific configuration.

Setting a Start Limit

There are many scenarios where you may want to control the number of times a Step may be started. For example, a particular Step might need to be configured so that it only runs once because it invalidates some resource that must be fixed manually before it can be run again. This is configurable on the step level, since different steps may have different requirements. A Step that may only be executed once can exist as part of the same Job as a Step that can be run infinitely. The following code fragment shows an example of a start limit configuration:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.startLimit(1)
				.build();
}

The step above can be run only once. Attempting to run it again causes a StartLimitExceededException to be thrown. Note that the default value for the start-limit is Integer.MAX_VALUE.

Restarting a Completed Step

In the case of a restartable job, there may be one or more steps that should always be run, regardless of whether or not they were successful the first time. An example might be a validation step or a Step that cleans up resources before processing. During normal processing of a restarted job, any step with a status of 'COMPLETED', meaning it has already been completed successfully, is skipped. Setting allow-start-if-complete to "true" overrides this so that the step always runs, as shown in the following example:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}

Step Restart Configuration Example

The following example shows how to configure a job to have steps that can be restarted:
Java Configuration

@Bean
public Job footballJob() {
	return this.jobBuilderFactory.get("footballJob")
				.start(playerLoad())
				.next(gameLoad())
				.next(playerSummarization())
				.build();
}

@Bean
public Step playerLoad() {
	return this.stepBuilderFactory.get("playerLoad")
			.<String, String>chunk(10)
			.reader(playerFileItemReader())
			.writer(playerWriter())
			.build();
}

@Bean
public Step gameLoad() {
	return this.stepBuilderFactory.get("gameLoad")
			.allowStartIfComplete(true)
			.<String, String>chunk(10)
			.reader(gameFileItemReader())
			.writer(gameWriter())
			.build();
}

@Bean
public Step playerSummarization() {
	return this.stepBuilderFactory.get("playerSummarization")
			.startLimit(2)
			.<String, String>chunk(10)
			.reader(playerSummarizationSource())
			.writer(summaryWriter())
			.build();
}

The preceding example configuration is for a job that loads in information about football games and summarizes them. It contains three steps: playerLoad, gameLoad, and playerSummarization. The playerLoad step loads player information from a flat file, while the gameLoad step does the same for games. The final step, playerSummarization, then summarizes the statistics for each player, based upon the provided games.

It is assumed that the file loaded by playerLoad must be loaded only once, but that gameLoad can load any games found within a particular directory, deleting them after they have been successfully loaded into the database. As a result, the playerLoad step contains no additional configuration. It can be started any number of times, and, if complete, is skipped. The gameLoad step, however, needs to be run every time in case extra files have been added since it last ran. It has 'allow-start-if-complete' set to 'true' in order to always be started. (It is assumed that the database table into which games are loaded has a process indicator on it, to ensure new games can be properly found by the summarization step.)

The summarization step, which is the most important in the job, is configured to have a start limit of 2. This is useful because if the step continually fails, a new exit code is returned to the operators that control job execution, and it cannot start again until manual intervention has taken place.


This job provides an example for this document and is not the same as the footballJob found in the samples project.

The remainder of this section describes what happens for each of the three runs of the footballJob example.

Run 1:

  1. playerLoad runs and completes successfully, adding 400 players to the 'PLAYERS' table.

  2. gameLoad runs and processes 11 files worth of game data, loading their contents into the 'GAMES' table.

  3. playerSummarization begins processing and fails after 5 minutes.

Run 2:

  1. playerLoad does not run, since it has already completed successfully, and allow-start-if-complete is 'false' (the default).

  2. gameLoad runs again and processes another 2 files, loading their contents into the 'GAMES' table as well (with a process indicator indicating they have yet to be processed).

  3. playerSummarization begins processing of all remaining game data (filtering using the process indicator) and fails again after 30 minutes.

Run 3:

  1. playerLoad does not run, since it has already completed successfully, and allow-start-if-complete is 'false' (the default).

  2. gameLoad runs again and processes another 2 files, loading their contents into the 'GAMES' table as well (with a process indicator indicating they have yet to be processed).

  3. playerSummarization is not started and the job is immediately killed, since this is the third execution of playerSummarization, and its limit is only 2. Either the limit must be raised or the Job must be executed as a new JobInstance.
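
The decisions made in the three runs above can be modeled as a small function. The following is a minimal sketch, assuming the rules exactly as stated in this section; RestartDecisionSketch, Decision, and decide are hypothetical names, and the real framework signals an exceeded limit with a StartLimitExceededException rather than a return value.

```java
// Hypothetical model of the restart decision described above; not Spring Batch code.
class RestartDecisionSketch {

    enum Decision { RUN, SKIP, START_LIMIT_EXCEEDED }

    // previouslyCompleted: the step finished successfully in an earlier run
    // startCount: how many times the step has already been started
    static Decision decide(boolean previouslyCompleted, boolean allowStartIfComplete,
                           int startCount, int startLimit) {
        if (previouslyCompleted && !allowStartIfComplete) {
            return Decision.SKIP;
        }
        if (startCount >= startLimit) {
            return Decision.START_LIMIT_EXCEEDED;
        }
        return Decision.RUN;
    }

    public static void main(String[] args) {
        int unlimited = Integer.MAX_VALUE; // the default start limit
        // State of each step at the beginning of Run 3.
        System.out.println("playerLoad: " + decide(true, false, 1, unlimited));
        System.out.println("gameLoad: " + decide(true, true, 2, unlimited));
        System.out.println("playerSummarization: " + decide(false, false, 2, 2));
    }
}
```

For Run 3 the model yields SKIP, RUN, and START_LIMIT_EXCEEDED, matching the walk-through.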

Configuring Skip Logic

There are many scenarios where errors encountered while processing should not result in Step failure, but should be skipped instead. This is usually a decision that must be made by someone who understands the data itself and what meaning it has. Financial data, for example, may not be skippable because it results in money being transferred, which needs to be completely accurate. Loading a list of vendors, on the other hand, might allow for skips. If a vendor is not loaded because it was formatted incorrectly or was missing necessary information, then there probably are not issues. Usually, these bad records are logged as well, which is covered later when discussing listeners.

The following example shows how to use a skip limit:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(FlatFileParseException.class)
				.build();
}

In the preceding example, a FlatFileItemReader is used. If, at any point, a FlatFileParseException is thrown, the item is skipped and counted against the total skip limit of 10. Separate counts are made of skips on read, process, and write inside the step execution, but the limit applies across all skips. Once the skip limit is reached, the next exception found causes the step to fail. In other words, the eleventh skip triggers the exception, not the tenth.
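
The off-by-one in that last sentence is easy to get wrong, so here is a minimal framework-free sketch of the counting. SkipLimitSketch and countSkips are hypothetical names, NumberFormatException stands in for FlatFileParseException, and IllegalStateException stands in for whatever failure the real step raises.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of skip-limit counting; not Spring Batch code.
class SkipLimitSketch {

    // Returns the number of skipped records, or throws once the limit is exceeded.
    static int countSkips(List<String> records, int skipLimit) {
        int skipCount = 0;
        for (String record : records) {
            try {
                Integer.parseInt(record); // stands in for parsing one record
            } catch (NumberFormatException e) {
                if (skipCount >= skipLimit) {
                    // The limit was already used up, so this failure is fatal.
                    throw new IllegalStateException("Skip limit of " + skipLimit + " exceeded", e);
                }
                skipCount++;
            }
        }
        return skipCount;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("1", "x", "3", "y");
        System.out.println(countSkips(records, 10)); // prints 2
    }
}
```

With a limit of 10, ten bad records are tolerated and the eleventh causes the failure.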

One problem with the preceding example is that any other exception besides a FlatFileParseException causes the Job to fail. In certain scenarios, this may be the correct behavior. However, in other scenarios, it may be easier to identify which exceptions should cause failure and skip everything else, as shown in the following example:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(Exception.class)
				.noSkip(FileNotFoundException.class)
				.build();
}

By identifying java.lang.Exception as a skippable exception class, the configuration indicates that all Exceptions are skippable. However, by 'excluding' java.io.FileNotFoundException, the configuration refines the list of skippable exception classes to be all Exceptions except FileNotFoundException. Any excluded exception class is fatal if encountered (that is, it is not skipped).

For any exception encountered, the skippability is determined by the nearest superclass in the class hierarchy. Any unclassified exception is treated as 'fatal'.

The order of the skip and noSkip calls does not matter.
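
The "nearest superclass" rule can be illustrated with a small classifier that walks up the class hierarchy until it finds a registered type. This is a sketch of the rule as described above, not the framework's own classifier; SkipClassifierSketch and its methods are hypothetical names.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of skip classification by nearest superclass; not Spring Batch code.
class SkipClassifierSketch {

    private final Map<Class<? extends Throwable>, Boolean> rules = new HashMap<>();

    SkipClassifierSketch skip(Class<? extends Throwable> type) {
        rules.put(type, true);
        return this;
    }

    SkipClassifierSketch noSkip(Class<? extends Throwable> type) {
        rules.put(type, false);
        return this;
    }

    boolean isSkippable(Throwable t) {
        // Walk from the concrete class toward Object; the first match wins.
        for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
            Boolean rule = rules.get(c);
            if (rule != null) {
                return rule;
            }
        }
        return false; // unclassified exceptions are treated as fatal
    }

    public static void main(String[] args) {
        SkipClassifierSketch classifier = new SkipClassifierSketch()
                .skip(Exception.class)
                .noSkip(FileNotFoundException.class);
        System.out.println(classifier.isSkippable(new IOException("io")));                 // prints true
        System.out.println(classifier.isSkippable(new FileNotFoundException("missing"))); // prints false
        System.out.println(classifier.isSkippable(new Error("unclassified")));            // prints false
    }
}
```

Because the rules are stored by class and looked up by hierarchy, registering noSkip before skip gives the same result.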

Configuring Retry Logic

In most cases, you want an exception to cause either a skip or a Step failure. However, not all exceptions are deterministic. If a FlatFileParseException is encountered while reading, it is always thrown for that record. Resetting the ItemReader does not help. However, for other exceptions, such as a DeadlockLoserDataAccessException, which indicates that the current process has attempted to update a record that another process holds a lock on, waiting and trying again might result in success. In this case, retry should be configured as follows:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.retryLimit(3)
				.retry(DeadlockLoserDataAccessException.class)
				.build();
}

The Step allows a limit for the number of times an individual item can be retried and a list of exceptions that are 'retryable'. More details on how retry works can be found in the retry chapter.
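
As a rough illustration of what a retry limit means, the following framework-free sketch repeats an operation until it succeeds or the attempts run out. It assumes the limit counts total attempts, including the first one. RetrySketch and withRetry are hypothetical names, and IllegalStateException stands in for DeadlockLoserDataAccessException.

```java
import java.util.function.Supplier;

// Hypothetical model of a bounded retry; not Spring Batch or Spring Retry code.
class RetrySketch {

    // Calls the action until it succeeds or maxAttempts attempts have failed.
    static <T> T withRetry(Supplier<T> action, int maxAttempts,
                           Class<? extends RuntimeException> retryable) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (!retryable.isInstance(e)) {
                    throw e; // not retryable, fail immediately
                }
                last = e;
            }
        }
        throw last; // attempts exhausted
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated deadlock");
            }
            return "written";
        }, 3, IllegalStateException.class);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints written after 3 attempts
    }
}
```

The sketch retries immediately. A real deployment would usually wait between attempts, which is left out here to keep the example short.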

Controlling Rollback

By default, regardless of retry or skip, any exceptions thrown from the ItemWriter cause the transaction controlled by the Step to roll back. If skip is configured as described earlier, exceptions thrown from the ItemReader do not cause a rollback. However, there are many scenarios in which exceptions thrown from the ItemWriter should not cause a rollback, because no action has taken place to invalidate the transaction. For this reason, the Step can be configured with a list of exceptions that should not cause rollback, as shown in the following example:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.noRollback(ValidationException.class)
				.build();
}

Transactional Readers

The basic contract of the ItemReader is that it is forward only. The step buffers reader input, so that in the case of a rollback, the items do not need to be re-read from the reader. However, there are certain scenarios in which the reader is built on top of a transactional resource, such as a JMS queue. In this case, since the queue is tied to the transaction that is rolled back, the messages that have been pulled from the queue are put back on. For this reason, the step can be configured to not buffer the items, as shown in the following example:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.readerIsTransactionalQueue()
				.build();
}

Transaction Attributes

Transaction attributes can be used to control the isolation, propagation, and timeout settings. More information on setting transaction attributes can be found in the Spring core documentation. The following example sets the isolation, propagation, and timeout transaction attributes:
Java Configuration

@Bean
public Step step1() {
	DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
	attribute.setPropagationBehavior(Propagation.REQUIRED.value());
	attribute.setIsolationLevel(Isolation.DEFAULT.value());
	attribute.setTimeout(30);

	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.transactionAttribute(attribute)
				.build();
}

Registering ItemStream with a Step

The step has to take care of ItemStream callbacks at the necessary points in its lifecycle (For more information on the ItemStream interface, see ItemStream). This is vital if a step fails and might need to be restarted, because the ItemStream interface is where the step gets the information it needs about persistent state between executions.

If the ItemReader, ItemProcessor, or ItemWriter itself implements the ItemStream interface, then these are registered automatically. Any other streams need to be registered separately. This is often the case where indirect dependencies, such as delegates, are injected into the reader and writer. A stream can be registered on the Step through the stream method of the builder, as illustrated in the following example:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(compositeItemWriter())
				.stream(fileItemWriter1())
				.stream(fileItemWriter2())
				.build();
}

/**
 * In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
 * necessary, but used for an example.
 */
@Bean
public CompositeItemWriter compositeItemWriter() {
	List<ItemWriter> writers = new ArrayList<>(2);
	writers.add(fileItemWriter1());
	writers.add(fileItemWriter2());

	CompositeItemWriter itemWriter = new CompositeItemWriter();

	itemWriter.setDelegates(writers);

	return itemWriter;
}

In the example above, the CompositeItemWriter is not an ItemStream, but both of its delegates are. Therefore, both delegate writers must be explicitly registered as streams in order for the framework to handle them correctly. The ItemReader does not need to be explicitly registered as a stream because it is a direct property of the Step. The step is now restartable, and the state of the reader and writer is correctly persisted in the event of a failure.

Intercepting Step Execution

Just as with the Job, there are many events during the execution of a Step where a user may need to perform some functionality. For example, in order to write out to a flat file that requires a footer, the ItemWriter needs to be notified when the Step has been completed, so that the footer can be written. This can be accomplished with one of many Step scoped listeners.

Any class that implements one of the extensions of StepListener (but not that interface itself since it is empty) can be applied to a step through the listeners element. The listeners element is valid inside a step, tasklet, or chunk declaration. It is recommended that you declare each listener at the level at which its function applies, or, if it is multi-featured (such as StepExecutionListener and ItemReadListener), at the most granular level where it applies. The following example shows a listener applied at the chunk level:
Java Configuration

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

An ItemReader, ItemWriter, or ItemProcessor that itself implements one of the StepListener interfaces is registered automatically with the Step if using the namespace <step> element or one of the *StepFactoryBean factories. This only applies to components directly injected into the Step. If the listener is nested inside another component, it needs to be explicitly registered (as described previously under Registering ItemStream with a Step).

In addition to the StepListener interfaces, annotations are provided to address the same concerns. Plain old Java objects can have methods with these annotations that are then converted into the corresponding StepListener type. It is also common to annotate custom implementations of chunk components such as ItemReader or ItemWriter or Tasklet. The annotations are analyzed by the XML parser for the <listener/> elements as well as registered with the listener methods in the builders, so all you need to do is use the XML namespace or builders to register the listeners with a step.

StepExecutionListener

StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it ends, whether it ended normally or failed, as shown in the following example:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus is the return type of afterStep in order to allow listeners the chance to modify the exit code that is returned upon completion of a Step.

The annotations corresponding to this interface are:

  • @BeforeStep

  • @AfterStep

ChunkListener

A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a 'chunk'. A ChunkListener can be used to perform logic before a chunk begins processing or after a chunk has completed successfully, as shown in the following interface definition:

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

The beforeChunk method is called after the transaction is started but before read is called on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (and not at all if there is a rollback).

The annotations corresponding to this interface are:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

A ChunkListener can be applied when there is no chunk declaration. The TaskletStep is responsible for calling the ChunkListener, so it applies to a non-item-oriented tasklet as well (it is called before and after the tasklet).

ItemReadListener

When discussing skip logic previously, it was mentioned that it may be beneficial to log the skipped records, so that they can be dealt with later. In the case of read errors, this can be done with an ItemReadListener, as shown in the following interface definition:

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

The beforeRead method is called before each call to read on the ItemReader. The afterRead method is called after each successful call to read and is passed the item that was read. If there was an error while reading, the onReadError method is called. The exception encountered is provided so that it can be logged.
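
The call order described above can be sketched as a wrapper around a single read. This is a framework-free illustration: ReadListenerSketch, ReadListener, and readWithListener are hypothetical names, the nested interface is only a simplified stand-in for ItemReadListener, and skipping afterRead when the reader returns null (end of input) is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical model of how read callbacks surround a read; not Spring Batch code.
class ReadListenerSketch {

    // Simplified stand-in for the ItemReadListener shape shown above.
    interface ReadListener<T> {
        void beforeRead();
        void afterRead(T item);
        void onReadError(Exception ex);
    }

    // Wraps one read call with the listener callbacks.
    static <T> T readWithListener(Supplier<T> reader, ReadListener<T> listener) {
        listener.beforeRead();
        try {
            T item = reader.get();
            if (item != null) {
                listener.afterRead(item); // only for a successful, non-null read
            }
            return item;
        } catch (RuntimeException e) {
            listener.onReadError(e); // give the listener a chance to log
            throw e;
        }
    }

    public static void main(String[] args) {
        Iterator<String> source = Arrays.asList("a", "b").iterator();
        ReadListener<String> logging = new ReadListener<String>() {
            public void beforeRead() { System.out.println("beforeRead"); }
            public void afterRead(String item) { System.out.println("afterRead " + item); }
            public void onReadError(Exception ex) { System.out.println("onReadError " + ex); }
        };
        // Returning null signals the end of input, mirroring the ItemReader contract.
        Supplier<String> reader = () -> source.hasNext() ? source.next() : null;
        while (readWithListener(reader, logging) != null) {
            // keep reading until the input is exhausted
        }
    }
}
```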

The annotations corresponding to this interface are:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

Just as with the ItemReadListener, the processing of an item can be 'listened' to, as shown in the following interface definition:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

The beforeProcess method is called before process on the ItemProcessor and is handed the item that is to be processed. The afterProcess method is called after the item has been successfully processed. If there was an error while processing, the onProcessError method is called. The exception encountered and the item that was attempted to be processed are provided, so that they can be logged.

The annotations corresponding to this interface are:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

The writing of an item can be 'listened' to with the ItemWriteListener, as shown in the following interface definition:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

The beforeWrite method is called before write on the ItemWriter and is handed the list of items to be written. The afterWrite method is called after the items have been successfully written. If there was an error while writing, the onWriteError method is called. The exception encountered and the items that were attempted to be written are provided, so that they can be logged.

The annotations corresponding to this interface are:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListener, ItemProcessListener, and ItemWriteListener all provide mechanisms for being notified of errors, but none informs you that a record has actually been skipped. onWriteError, for example, is called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items, as shown in the following interface definition:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead is called whenever an item is skipped while reading. It should be noted that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite is called when an item is skipped while writing. Because the item has been read successfully (and not skipped), it is also provided the item itself as an argument.

The annotations corresponding to this interface are:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners and Transactions

One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue leading to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:

  1. The appropriate skip method (depending on when the error happened) is called only once per item.

  2. The SkipListener is always called just before the transaction is committed. This is to ensure that any transactional resources called by the listener are not rolled back by a failure within the ItemWriter.
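
One way to picture these two guarantees is a buffer that collects skips while a chunk is processed and reports them in a single pass right before the commit. The sketch below is only an illustration of that idea and makes no claim about how Spring Batch implements it internally; SkipReportingSketch, recordSkip, and flushBeforeCommit are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of deferred, de-duplicated skip reporting; not Spring Batch code.
class SkipReportingSketch {

    private final Map<String, Throwable> pendingSkips = new LinkedHashMap<>();

    // Stands in for the log or table a SkipListener would write to.
    final List<String> reported = new ArrayList<>();

    // May be called more than once for the same item if a chunk is rolled back and reprocessed.
    void recordSkip(String item, Throwable cause) {
        pendingSkips.putIfAbsent(item, cause);
    }

    // Called once per chunk, just before the transaction would be committed.
    void flushBeforeCommit() {
        for (Map.Entry<String, Throwable> entry : pendingSkips.entrySet()) {
            reported.add(entry.getKey() + ": " + entry.getValue().getMessage());
        }
        pendingSkips.clear();
    }

    public static void main(String[] args) {
        SkipReportingSketch sketch = new SkipReportingSketch();
        sketch.recordSkip("item-7", new IllegalStateException("bad format"));
        sketch.recordSkip("item-7", new IllegalStateException("bad format")); // seen again after a rollback
        sketch.flushBeforeCommit();
        System.out.println(sketch.reported); // prints [item-7: bad format]
    }
}
```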