Acceso multiproceso a los beans de Job Scope en Spring Batch 3.0
En Spring Batch 3.0 estoy tratando de usar la nueva funcionalidad de Job Scope para beans en pasos particionados y multiproceso (configurados con una tarea: bean ejecutor), y en ambos casos obtengo la excepción
Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:153)
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:338)
pero si hago el alcance del paso de frijoles, funciona bien.
Noté el comentario en JobSynchronizationManager que dice
nótese bien es responsabilidad de cada implementación de {@link Job} asegurarse de que un {@link JobContext} esté disponible en cada hilo que pueda estar involucrado en la ejecución de un trabajo, incluidos los hilos de trabajo de un grupo.
así que me pregunto si necesito hacer algo para configurar esto o si es un error en la implementación del alcance del trabajo que no configure los hilos de trabajo correctamente.
StepSynchronizationManager tiene un comentario similar, pero en ese caso algo obviamente está configurando los hilos correctamente dentro del paso.
Código de muestra para reproducir el problema:
TestItemReader
package test;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.InitializingBean;
public class TestItemReader implements ItemReader<Integer>, InitializingBean {
private List<Integer> items;
@Override
public synchronized Integer read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (items.size() > 0) {
return items.remove(0);
}
return null;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("Initialising reader");
items = new ArrayList<Integer>();
for (int i=0;i<100;i++) items.add(i);
}
}
TestItemWriter
package test;
import java.util.List;
import org.springframework.batch.item.ItemWriter;
public class TestItemWriter implements ItemWriter<Integer> {
@Override
public void write(List<? extends Integer> items) throws Exception {
for (int i : items) {
System.out.println(Thread.currentThread().getName() + " Writing " + i);
}
}
}
test-job-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<job id="job" restartable="true" xmlns="http://www.springframework.org/schema/batch">
<step id="index">
<tasklet task-executor="executor">
<chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
</tasklet>
</step>
</job>
<bean id="itemReader" class="test.TestItemReader" scope="job"/>
<bean id="itemWriter" class="test.TestItemWriter"/>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean class="org.springframework.batch.test.JobLauncherTestUtils">
<property name="job" ref="job"/>
</bean>
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<batch:job-repository id="jobRepository"/>
<jdbc:embedded-database id="dataSource" type="HSQL">
<jdbc:script location="classpath:/org/springframework/batch/core/schema-hsqldb.sql"/>
</jdbc:embedded-database>
<task:executor id="executor" queue-capacity="0" pool-size="5"/>
</beans>
JobTest
package test;
import java.util.Collection;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
import o,rg.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.ContextConfiguration;
@ContextConfiguration(locations={"test-job-context.xml"})
public class JobTest extends AbstractJUnit4SpringContextTests {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@BeforeClass
public static void beforeClassSetup() {
BasicConfigurator.configure();
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger("org.springframework.batch.core.scope.JobScope").setLevel(Level.DEBUG);
Logger.getLogger("org.springframework.batch.core.scope.StepScope").setLevel(Level.DEBUG);
}
@Test
public void testJobLaunch() throws Exception {
JobExecution execution = jobLauncherTestUtils.launchJob();
System.out.println("After execution " + execution);
Collection<StepExecution> stepExecutions = execution.getStepExecutions();
for (StepExecution stepExecution : stepExecutions) {
System.out.println("StepExecution " + stepExecution);
}
}
}
La ejecución de la prueba JUnit anterior reproducirá el problema. Si cambia el alcance del lector a paso o elimina el alcance, la prueba se completa normalmente.