JSR 352 con Liberty Profile: cómo implementar puntos de control cuando ItemReader realiza una consulta de base de datos

Tengo 10 registros en mi tabla de origen y tengo un recuento de elementos como 3.

Tengo 2 particiones para procesar estos 10 registros (es decir, los primeros 5 registros se procesarán en la primera partición y los registros restantes se procesarán en la segunda partición mientras se procesan los registros en la segunda partición). cuando reinicio el trabajo, la partición fallida está procesando todos los registros nuevamente (es decir, el primer fragmento y el segundo fragmento). El reinicio del trabajo solo debe procesarse desde los últimos registros de fragmento fallidos, pero no todos los registros de esa partición. ¿Puede guiarme? ¿Cómo lograr esto?

Mi JSL es como a continuación:

    <?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
    xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
    id="readingfrom-db" restartable="true" version="1.0" >
    <properties >
        <property name="numRec" value="#{jobParameters['numRec']}?:5;"/>        
        <property name="chunkSize" value="#{jobParameters['chunkSize']}?:3;"/>
        <property name="whereclauseFrom" value="#{jobParameters['whereclauseFrom']}?:5;"/>
        <property name="whereclauseTo" value="#{jobParameters['whereclauseTo']}?:6;"/>      
        <property name="dsJNDI" value="#{jobParameters['dsJNDI']}?:jdbc/db2;"/>
        <property name="dsJNDI1" value="#{jobParameters['dsJNDI1']}?:jdbc/db2;"/>
        <property name="tableName" value="#{jobParameters['tableName']}?:CISDW.AIF1_CH;"/>
        <property name="ProcesstableName" value="#{jobParameters['ProcesstableName']}?:CISDW.PROC_AIF1_CH;"/>
    </properties>
    <step id="runcache" next="readFromDB">
        <batchlet ref="com.cdc.runcache.CacheRunnerBatchlet" />
    </step>
    <step id="readFromDB">
        <listeners>
            <listener ref="com.cdc.dbreader.LogExceptionListener"/>
        </listeners> 
        <chunk item-count="3" checkpoint-policy="item">
            <reader ref="com.cdc.dbreader.DBItemReader">
                <properties >
                    <property name="dsJNDI" value="#{jobProperties['dsJNDI']}"/>
                    <property name="tableName" value="#{jobProperties['tableName']}"/>
                    <property name="whereclauseFrom" value="#{partitionPlan['modrec']}"/>                   
                </properties>
            </reader>
            <processor ref="com.cdc.dbreader.DBItemProcessor" />            
            <writer ref="com.cdc.dbreader.DBItemWriter">
                <properties >
                    <property name="dsJNDI" value="#{jobProperties['dsJNDI1']}"/>
                    <property name="tableName" value="#{jobProperties['ProcesstableName']}"/>
                </properties>
            </writer>
        </chunk>
        <partition>
            <plan partitions="2" threads="2">
                <properties partition="0">
                    <property name="modrec" value="#{jobProperties['whereclauseFrom']}"/>                   
                </properties>
                <properties partition="1">
                    <property name="modrec" value="#{jobProperties['whereclauseTo']}"/>
                </properties>
            </plan>
        </partition>        
    </step>                     
</job>

Mi lector de artículos es el siguiente:

 public class DBItemReader implements ItemReader {  
    @Inject
    @BatchProperty
    private String dsJNDI;

    @Inject
    @BatchProperty
    private String whereclauseFrom;


    @Inject
    @BatchProperty
    private String tableName;

    private Connection conn =null;
    private int totalRecords=0;

    private DataSource ds = null;
    List<RecObj> listRecObj=new ArrayList<RecObj>();    

    @Override
    public Object readItem() throws SQLException {
         if (listRecObj.size() == 0) {             
             return null;
         } else { 
             RecObj rec =null;           
             Iterator<RecObj> iter =listRecObj.iterator();
             while (iter.hasNext()) {               
                rec = iter.next();               

               if (Integer.parseInt(rec.getRec())  == 7) {                      
                  throw new IllegalStateException("Thrown Error");
                }
                iter.remove();
                return rec;
             }
             return rec;
         }


     @Override
    public void open(Serializable arg0) throws NamingException, SQLException {
          ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI)); 
//        System.out.println("whereclauseFrom: " + whereclauseFrom);          
          conn = ds.getConnection(); 
          String sql ="";
          if(Integer.parseInt(whereclauseFrom) == 5){
              sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) <= "+ whereclauseFrom;
          }else if(Integer.parseInt(whereclauseFrom) == 6){
              sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) >= "+ whereclauseFrom;
          }

          PreparedStatement ps = conn.prepareStatement(sql);
          ResultSet rs=ps.executeQuery();
          while(rs.next()){
             totalRecords++;
             String rec=rs.getString("REC"); 
             if(rec != null)
                listRecObj.add(new RecObj(rec));

          }          
          rs.close();          
    }   
    @Override
    public void close() throws SQLException {
        conn.close();       
    }   
    @Override
    public Serializable checkpointInfo() {       
            return null;
    }

}
    }

Mi clase de escritor es como a continuación:

public class DBItemWriter extends AbstractItemWriter implements ItemWriter {    
    @Inject
    @BatchProperty
    private String dsJNDI;

    @Inject
    @BatchProperty
    private String tableName;

    private DataSource ds = null; 

    @Override
    public void open(Serializable arg0) throws NamingException {
         ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));            
    }

    @Override
    public void writeItems(List<java.lang.Object> items) throws BatchUpdateException,SQLException{      
        Connection conn = ds.getConnection();           
        String sql = "INSERT INTO "+tableName+ "(MOD_REC) VALUES(?) ";       
        PreparedStatement ps = conn.prepareStatement(sql);        
        for (Object obj : items) {          
            RecObj v = (RecObj)obj;
            System.out.println("=======Writer values===="+v.getRec());                
            ps.setString(1, v.getRec());            
            ps.addBatch();
        }        
        ps.executeBatch();
        ps.clearBatch();
        ps.close();
        conn.close();
    }
}

A continuación se muestra mi procesador:

public class DBItemProcessor implements ItemProcessor {
    Integer count=0;   
    @Override
    public Object processItem(Object arg0) {
        count++;
        RecObj v=(RecObj)arg0;
        String vname=v.getRec();
        System.out.println("=========Processer Values==="+vname);
        return new RecObj(vname+count);
    }
}

Abajo está mi clase de frijoles

public class RecObj {
   private String rec;


  public RecObj(String rec) {
    this.rec=rec;
}

Respuestas a la pregunta(1)

Su respuesta a la pregunta