본문 바로가기

Programming/Java & Spring 관련 내용 정리

[Spring Batch] 오류가 나면 어떻게 해야할까? - Skip과 Retry

 

 

 

 

스프링 배치에서는 

Skip이라는 기능을 통해

Step 내에서 발생한 특정 Exception에 대해

해당 Exception을 n번까지 허용하겠다는 설정을 할 수가 있다.

 

 

예를들어

skip(NotFoundNameExcpetion.class).skipLimit(3) 으로 설정한 경우

  • NotFoundNameExcpetion 발생 3번까지는 에러를 skip 한다.
  • NotFoundNameExcpetion 발생 4번째부터는 Job과 Step의 상태가 실패로 끝나며, 배치가 중지된다.
  • 단, 에러가 발생하기 전까지 데이터는 모두 처리된 상태로 남는다.

 

 

Step은 chunk 1개 기준으로 Transaction이 동작한다.

  • 예를 들어 items = 100, chunk.size = 10 일 때 총 chunk 동작 횟수는 10회이다.
  • chunk 1-9번까지는 정상 처리되고, 10번째 chunk가 수행되면서 에러가 발생하면
    • 1-9번까지의 chunk 데이터는 정상 저장되고, Job과 Step의 상태는 FAILED 처리된다.
  • 배치 재 실행 시, chunk 10번부터 처리할 수 있도록 배치를 만들어야 한다.

 

 

즉, Skip은 데이터를 처리하는 동안 설정된 Exception이 발생했을 경우

해당 데이터 처리를 건너뛰는 기능이다.

데이터의 사소한 오류에 대해 Step의 실패처리 대신 Skip을 함으로써

배치 수행의 빈번한 실패를 줄일 수 있게 한다.

 

 

 

 


✏️ SkipListener 로 예외처리 해보기

 

  • 구현 요구사항
    • “이름,나이,거주지”로 구성된 Person.csv 파일이 주어지는데,
    • 이때 이름이 empty String인 경우 “NotFoundNameException”을 발생 시킨다.
    • NotFoundNameException이 3번 이상 발생한 경우 배치를 실패 처리 한다.

 

 

 

☝️ 세팅

 

 

아래와 같이 Skip 관련 코드를 추가한다.

 

 

 

 

 

 

 

✌️ 예상 시나리오

 

  • validationProcessor가 먼저 실행되고
  • Name 부분이 비어있는 3건의 데이터 때문에 NotFoundNameException 이 3번 발생된다.
  • SkipLimit을 2로 설정했기 때문에, 이 SkipLimit보다 더 많은 예외가 발생했으므로 job이 실패한다.
  • 데이터는 90개만 저장이 된다.
    • 그 이유는, 100건의 데이터 중 10번째 chunk에서 예외가 limit보다 많이 발생했으므로 9번째 chunk 데이터 까지만 저장되고 10번째 chunk는 저장되지 않는다.

 

 

✍️ 결과

 

 

 

 

BATCH_JOB_EXECUTION

 

 

 

 

BATCH_STEP_EXECUTION

 

 

 

 

 

 

👌 위 내용을 통해 얻을 수 있는 결론

 

  • chunk 기반으로 처리 시
    • skip 메소드를 통해 해당하는 예외를 설정하고
    • skipLimit으로 몇 번까지 이 예외를 skip할 지 숫자를 설정했을 때
    • skipLimit에 설정된 수보다 더 많은 예외가 발생하면
    • 그 전 chunk 까지는 데이터가 저장되지만, 에러가 발생한 chunk 부터는 저장되지 않는다는 것을 눈으로 확인할 수 있었다.

 

 

 


✏️ Retry로 배치 재시도 해보기

 

* 언제 Retry를 설정하는게 좋을까?

DB DeadLock, Network timeout 과 같이 간헐적으로 발생하지만,

재시도하면 성공할 여지가 있는 경우

 

 

아래와 같이 retry를 추가할 수도 있지만

더 구체적으로 retry를 정의하려면, ‘RetryTemplate’를 이용해보자.

 

 

 

 

 

* 구현 요구사항

  • NotFoundNameException이 발생하면
  • 3번까지 재시도 후
  • Person.name을 “UNKNOWN”으로 변경

 

 

 

 

 

 

 

 

 

 

다음과 같이 실행시키면 아래와 같은 결과를 확인할 수 있다.

 

 

 

 

 

 

 

 

 


 

✏️ Retry 실험

 

  • 기본 전제
    • pserson.csv 파일에
    • 제일 첫 줄은 이름,나이,거주지 가 적혀있어 linesToSkip 처리
    • 1-101번째 줄까지 정상적인 데이터
    • 102-104번째 줄까지는 “이름”이 비어있는 데이터 3개
    • 105-121번째 줄까지는 정상적인 데이터

 

 

 

 

(1) 번 실험

 

  • NotFoundNameException 예외에 대해 skipLimit이 1로 설정된 상황이며, retry 없음

 

 

 

 

  • (1)번 실험 결과
    • skipLimit보다 더 많은 예외가 발생하여, 그 전 chunk 까지만 저장되고 배치 실패

 

 

 

 

 

(2) 번 실험

 

  • NotFoundNameException 예외에 대해 skipLimit이 1로 설정된 상황이며, retry 있음

 

 

 

 

 

 

 

  • (2)번 실험 결과
    • 이름이 비어있는 데이터는 이름을 “UNKNOWN”으로 세팅하여 저장, 배치는 실행됨

 

 

 

 

 

(3) 번 실험

 

  • NotFoundNameException 예외에 대해 skipLimit이 1로 설정된 상황이며, retry 있음
  • Person.csv 파일에 나이가 없거나 거주지가 없는 경우 추가

 

 

 

  • (3)번 실험 결과

 

 


✏️ 전체코드

 

 

- Person.java

 

package com.ot.schedule.core.domain;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import javax.persistence.*;
import java.util.Objects;

@Entity
@NoArgsConstructor
@Getter
@Slf4j
@Table(name = "PERSON")
public class Person {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "ID")
    private int id;

    @Column(name = "NAME")
    private String name;

    @Column(name = "AGE")
    private String age;

    @Column(name = "ADDRESS")
    private String address;

    public Person(String name, String age, String address) {
        this(0, name, age, address);
    }

    public Person(int id, String name, String age, String address) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.address = address;
    }

    public boolean isNotEmptyName() {
        return Objects.nonNull(this.name) && !name.isEmpty();
    }

    public Person unknownName() {
        this.name = "UNKNOWN";
        log.info("savePersonRetry : RecoveryCallback");
        return this;
    }
}

 

 

 

- SavePersonConfig.java

 

package com.ot.schedule.job;

import com.ot.schedule.core.domain.Person;
import com.ot.schedule.exception.NotFoundNameException;
import com.ot.schedule.listener.SavePersonListener;
import com.ot.schedule.vaildation.DuplicateValidationProcessor;
import com.ot.schedule.retry.PersonValidationRetryProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemProcessorBuilder;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import javax.persistence.EntityManagerFactory;

@Configuration
@Slf4j
public class SavePersonConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    public SavePersonConfig(JobBuilderFactory jobBuilderFactory,
                            StepBuilderFactory stepBuilderFactory,
                            EntityManagerFactory entityManagerFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.entityManagerFactory = entityManagerFactory;
    }

    @Bean
    public Job savePersonJob() throws Exception {
        return this.jobBuilderFactory.get("savePersonJob")
                .incrementer(new RunIdIncrementer())
                .start(this.savePersonStep(null))
                .listener(new SavePersonListener.SavePersonJobExecutionListener())
                .listener(new SavePersonListener.SavePersonAnnotationJobExecutionListener())
                .build();
    }

    @Bean
    @JobScope
    public Step savePersonStep(@Value("#{jobParameters[allow_duplicate]}") String allowDuplicate) throws Exception {
        return this.stepBuilderFactory.get("savePersonStep")
                .<Person, Person>chunk(10)
                .reader(itemReader())
                .processor(itemProcessor(allowDuplicate))
                .writer(itemWriter())
                .listener(new SavePersonListener.SavePersonStepExecutionListener())
                .faultTolerant()
                .skip(NotFoundNameException.class)
                .skipLimit(2)
                .build();
    }

    private ItemReader<? extends Person> itemReader() throws Exception {
        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames("name", "age", "address");
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> new Person(
                fieldSet.readString(0),
                fieldSet.readString(1),
                fieldSet.readString(2)));

        FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
                .name("savePersonItemReader")
                .encoding("UTF-8")
                .linesToSkip(1)
                .resource(new ClassPathResource("person.csv"))
                .lineMapper(lineMapper)
                .build();

        itemReader.afterPropertiesSet();
        return itemReader;
    }

    private ItemProcessor<? super Person, ? extends Person> itemProcessor(String allowDuplicate) throws Exception {

        ItemProcessor<Person, Person> validationProcessor = item -> {
            if (item.isNotEmptyName()) {
                return item;
            }
            // Name이 비어있다면 예외 발생
            throw new NotFoundNameException();
        };

        DuplicateValidationProcessor<Person> duplicateValidationProcessor =
                new DuplicateValidationProcessor<>(Person::getName, Boolean.parseBoolean(allowDuplicate));

        // ItemProcessor는 하나만 생성할 수 있기 때문에, 1개 이상 생성하려면 CompositeItemProcessor를 사용해서 Process를 묶는다.
        CompositeItemProcessor<Person, Person> itemProcessor = new CompositeItemProcessorBuilder<Person, Person>()
//                .delegates(validationProcessor, duplicateValidationProcessor)
                // PersonValidationRetryProcessor가 먼저 호출되어야, name이 비어있는 데이터를 먼저 처리할 수 있음
                .delegates(new PersonValidationRetryProcessor(), validationProcessor, duplicateValidationProcessor)
                .build();

        itemProcessor.afterPropertiesSet();

        return itemProcessor;
    }

    private ItemWriter<? super Person> itemWriter() throws Exception {
//        return items -> items.forEach(x -> log.info("저는 {} 입니다.", x.getName()));
        JpaItemWriter<Person> jpaItemWriter = new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();

        ItemWriter<Person> logItemWriter = items -> log.info("person.size : {}", items.size());

        CompositeItemWriter<Person> itemWriter = new CompositeItemWriterBuilder<Person>()
                .delegates(jpaItemWriter, logItemWriter)
                .build();

        itemWriter.afterPropertiesSet();
        return itemWriter;
    }
}

 

 

 

person.csv

 

이름,나이,거주지
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
아무개,25,강원
이경원,32,인천
홍길동,30,서울
아무개,25,강원
이경원,31,인천
홍길동,30,서울
아무개,25,강원
이경원,32,인천
홍길동,30,서울
,30,서울
,25,강원
,32,인천

 

 

PersonRepository.java

 

package com.ot.schedule.core.repository;

import com.ot.schedule.core.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

public interface PersonRepository extends JpaRepository<Person, Integer> {
}

 

 

NotFoundNameException.java

 

package com.ot.schedule.exception;

public class NotFoundNameException extends RuntimeException {
}

 

 

 

DuplicateValidationProcessor.java

package com.ot.schedule.exception;

public class NotFoundNameException extends RuntimeException {
}

 

 

PersonValidationRetryProcessor

 

package com.ot.schedule.retry;

import com.ot.schedule.core.domain.Person;
import com.ot.schedule.exception.NotFoundNameException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.retry.support.RetryTemplateBuilder;

@Slf4j
public class PersonValidationRetryProcessor implements ItemProcessor<Person, Person> {

    private final RetryTemplate retryTemplate;

    // 생성자에서 retryTemplate을 생성하고, 예외를 3번까지 재시도 함
    public PersonValidationRetryProcessor() {
        this.retryTemplate = new RetryTemplateBuilder()
                .maxAttempts(3)
                .retryOn(NotFoundNameException.class)
                .withListener(new SavePersonRetryListener())
                .build();

    }

    @Override
    public Person process(Person item) throws Exception {
        // ececute 메소드 호출 시 2개의 객체를 생성함
        return this.retryTemplate.execute(context -> {
            // RetryCallback 지점이며, retryTemplate의 첫 시작점
            // 예외가 3번 발생하면 RecoveryCallback을 호출함
            log.info("savePersonRetry (2) : RetryCallback");
            if (item.isNotEmptyName()) {
                return item;
            }
            throw new NotFoundNameException();
        }, context -> {
            // RecoveryCallback
            log.info("savePersonRetry (4) : RecoveryCallback");
            return item.unknownName();
        });
    }

    public static class SavePersonRetryListener implements RetryListener {
        @Override
        public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
            log.info("savePersonRetry (1) : open");
            // retry를 시작하는 설정이며, true여야 retry가 실행됨
            return true;
        }

        @Override
        public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            log.info("savePersonRetry : close");
        }

        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            log.info("savePersonRetry (3) : onError");
        }
    }
}