본문 바로가기

Programming/Java & Spring 관련 내용 정리

[Spring Batch] 로그인한지 1년 지난 회원 -> 휴면회원으로 상태변경하는 배치구현

오늘 기준으로 마지막 로그인 일자가 1년이 넘은 회원의 사용자 상태를

‘ACTIVE’  👉  ‘INACTIVE’ 상태로 변경하는 배치를 만들었다.

 

 

* 전체 소스코드는 하단에 있습니다. :) 

 

 

[ 스프링 배치 실행흐름 ]

 

 

 

 

 


 

코드로 살펴보면 다음과 같다.

 

 

1. [ Job]

Job스프링부트 배치를 처리함에 있어서 가장 상위 계층이다.

JobBuilderFactory에서 생성된 JobBuilder를 통해 Job을 생성한다.

 

 

 

✏️ JobBuilderFactory는 JobBuilder를 생성할 수 있는 get() 메소드를 포함하고 있다.

get()메소드는 "inactiveUserJob"이라는 Job을 만들어 반환한다.

 

✏️ .incrementer(new RunIdIncrementer())

Spring Batch에서 전달받은 Job Parameter 외에 

run.id 라는 임의의 파라미터를 추가로 사용해 매번 run.id 값을 변경해 준다.
매 실행마다 run.id가 변경되니 재실행 할 수 있다.

 

✏️ start(inactiveJobStep)

Job 실행 시 최초로 실행할 Step을 지정한다.

 

 

 

 

 

2. [ Step ]

1개 이상의 Step이 모여 Job을 표현한다.

읽고, 처리하고, 저장하는 기본구조를 이 step이라는 객체에서 정의하며,

가장 실질적인 배치처리를 담당한다.

 

 

✏️ chunk(5) 

데이터를 5개 단위로 끊어서 배치로 처리하겠다는 의미이다.

 

 

 

 

 

3. [ ItemReader ]

DB에서 마지막 로그인 일자가 오늘로부터 1년 전인 회원들을 읽어온다.

 

 

 

 

 

4. [ ItemProcessor ]

위에서 읽어온 회원들의 상태를 ACTIVE에서 INACTIVE(휴면회원)로 변경한다.

 

 

 

 

 

 

5. [ ItemWriter ]

상태가 변경된 회원들의 상태를 저장한다.

 

 

 

 

 

위와 같이 InactiveUserJobConfig 를 구성하였으며,

배치를 호출하는 스케쥴러는 @Scheduled 를 통해 구현하였습니다. (아래에서 코드 확인 가능)

 

 


✏️ 전체 소스코드

 

UserEntity

 

package com.ot.schedule.core.domain;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.hibernate.annotations.DynamicUpdate;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Getter
@DynamicUpdate
@Table(name = "USER")
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "ID")
    private Long id;

    @Column(name = "USER_ID")
    private String userId;

    @Column(name = "PASSWORD")
    private String password;

    @Column(name = "NAME")
    private String name;

    @Column(name = "PHONE")
    private String phone;

    @Column(name = "ROLES")
    private String roles;

    @Column(name = "USER_TYPE")
    private String userType;

    @Column(name = "REG_DATE")
    private LocalDateTime regDate;

    @Column(name = "LAST_LOGIN_DATE")
    private LocalDateTime lastLoginDate;

    @Column(name = "UPDATE_DATE")
    private LocalDateTime updateDate;
    @Column(name = "STATUS")
    @Enumerated(EnumType.STRING)
    private Status status;

    @Column(name = "REG_USER_SEQ")
    private Long regUserSeq;

    @Column(name = "UPD_USER_SEQ")
    private Long updUserSeq;

    public User setInactive() {
        status = Status.INACTIVE;
        return this;
    }

    @Getter
    @AllArgsConstructor
    public enum Status {
        ACTIVE(0, "가입"),
        INACTIVE(1, "휴면"),
        WITHDRAWAL(2, "탈퇴");

        private final Integer Id;
        private final String title;
    }
}

 

 

UserRepository 

 

package com.ot.schedule.core.repository;

import com.ot.schedule.core.domain.User;
import org.springframework.data.jpa.repository.JpaRepository;

import java.time.LocalDateTime;
import java.util.List;

public interface UserRepository extends JpaRepository<User, Long> {
    List<User> findAllByLastLoginDateBeforeAndStatusEquals(LocalDateTime lastLoginDate, User.Status status);
}

 

 

 

InactiveUserJobConfig

 

package com.ot.schedule.job;

import com.ot.schedule.config.QueueItemReader;
import com.ot.schedule.core.domain.User;
import com.ot.schedule.core.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.LocalDateTime;
import java.util.List;


@Slf4j
@Configuration // 스프링배치의 모든 Job은 @Configuration으로 등록해서 사용
@RequiredArgsConstructor
public class InactiveUserJobConfig {

// Job은 배치의 실행단위. Job을 만들 수 있는 팩토리 클래스는 스프링 배치 설정에 의해 빈으로 생성되어 있음
   
    private final JobBuilderFactory jobBuilderFactory;
    
    private final StepBuilderFactory stepBuilderFactory;
    
    private final UserRepository userRepository;


    @Bean("inactiveUserJob")
    public Job inactiveUserJob(Step inactiveJobStep) {
    // JobBuilder를 생성할 수 있는 get() 메서드를 포함. get()메서드는 새로운 JobBuilder를 생성해서 반환.
        return jobBuilderFactory.get("inactiveUserJob")
                .incrementer(new RunIdIncrementer()) // Job이 실행될 때마다 파라미터 아이디를 자동으로 생성해주는 클래스
                .start(inactiveJobStep) // job 실행 시 최초로 실행될 step 지정
                .build();
    }

    @JobScope
    @Bean("inactiveJobStep")
    public Step inactiveJobStep(ItemReader inactiveUserReader,
                                ItemProcessor inactiveUserProcessor,
                                ItemWriter inactiveUserWriter) {
        return stepBuilderFactory.get("inactiveJobStep")
                .<User, User>chunk(5)
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }

    @Bean
    @StepScope
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers = userRepository.findAllByLastLoginDateBeforeAndStatusEquals(LocalDateTime.now().minusYears(1), User.Status.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }

    @Bean
    public ItemProcessor<User, User> inactiveUserProcessor() {
        return User::setInactive;
    }

    @Bean
    public ItemWriter<User> inactiveUserWriter() {
        return (userRepository::saveAll);
    }
}

 

 

 

ScheduleApplication

 

@EnableBatchProcessing
@SpringBootApplication
public class ScheduleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class, args);
    }
}

 

 

 

application.properties

 

profile.name=local

#server
server.port = 8086
server.domain = http://localhost:8086

#database
spring.datasource.driver-class-name=net.sf.log4jdbc.sql.jdbcapi.DriverSpy
spring.datasource.url=
spring.datasource.username=
spring.datasource.password=

#batch
spring.batch.job.names=${job.name:NONE}

#jpa
spring.jpa.show-sql=true
spring.jpa.generate-ddl=false
spring.jpa.hibernate.ddl-auto=none
spring.batch.jdbc.initialize-schema=always

 

 

 

JobScheduler

 

package com.ot.schedule.scheduler;

import com.ot.schedule.job.InactiveUserJobConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
@RequiredArgsConstructor
public class JobScheduler {
    private final JobLauncher jobLauncher;
    private final InactiveUserJobConfig inactiveUserJobConfig;

    @Scheduled(cron = "0 0 1 * * *") // 매일 오전 1시
    public void runJob() {

        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("date", new JobParameter(ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH:mm:ss"))));
        JobParameters jobParameters = new JobParameters(confMap);

        try {
            jobLauncher.run(inactiveUserJobConfig.inactiveUserJob(), jobParameters);
        } catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException |
                 JobParametersInvalidException e) {
            log.error(e.getMessage());
        } catch (JobRestartException e) {
            throw new RuntimeException(e);
        }
    }
}