Spring Boot 2.7.x 기준으로 스프링 배치를 SCDF(Spring Cloud Data Flow)로 스케줄링해서 사용하려면 Spring Cloud Task가 필수다.
<!-- pom.xml -->
<!-- Spring Batch starter. No <version> is declared here, so it has to come from
     elsewhere in the build (presumably the Spring Boot parent/BOM; confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-task -->
<!-- Spring Cloud Task starter, pinned explicitly to 2.3.0.
     NOTE(review): confirm that this version line matches the Spring Boot 2.7.x
     baseline used by the project (check the Spring Cloud release-train table). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-task</artifactId>
<version>2.3.0</version>
</dependency>
먼저 배치 설정(config)을 잡아준다.
// batch config
import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.policy.CompositeCompletionPolicy;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.batch.repeat.policy.TimeoutTerminationPolicy;
import org.springframework.batch.support.DatabaseType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring Batch infrastructure configuration.
 *
 * <p>Builds the {@link JobRepository} and {@link JobExplorer} on top of the
 * {@code batDataSource} / {@code batTransactionManager} beans and exposes a
 * composite {@link CompletionPolicy} that steps can use to size chunks dynamically.
 */
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer {

    /** Value passed to {@link TimeoutTerminationPolicy}; property {@code batch.timeout-termination}, default 3000. */
    @Value("${batch.timeout-termination:3000}")
    private int timeOut;

    /** Value passed to {@link SimpleCompletionPolicy}; property {@code batch.simple-completion}, default 1000. */
    @Value("${batch.simple-completion:1000}")
    private int completionCount;

    /** Data source used for the batch metadata (bean {@code batDataSource}). */
    @Autowired
    @Qualifier("batDataSource")
    private DataSource dataSource;

    /** Transaction manager used by the job repository (bean {@code batTransactionManager}). */
    @Autowired
    @Qualifier("batTransactionManager")
    private PlatformTransactionManager transactionManager;

    /**
     * Completion policy for configuring the chunk size programmatically.
     *
     * <p>The chunk is closed as soon as either the timeout policy or the
     * item-count policy reports completion.
     *
     * @return composite of a timeout policy and a simple count policy
     */
    @Bean
    public CompletionPolicy completionPolicy() {
        CompletionPolicy byTimeout = new TimeoutTerminationPolicy(timeOut);
        CompletionPolicy byItemCount = new SimpleCompletionPolicy(completionCount);

        CompositeCompletionPolicy composite = new CompositeCompletionPolicy();
        composite.setPolicies(new CompletionPolicy[] {byTimeout, byItemCount});
        return composite;
    }

    /**
     * Returns the injected {@code batTransactionManager}.
     *
     * @return transaction manager used for batch metadata
     */
    @Override
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    /**
     * Creates the job repository on the batch data source, configured for
     * PostgreSQL with {@code ISOLATION_REPEATABLE_READ} for creation.
     *
     * @return the initialised job repository
     * @throws Exception if the factory bean cannot be initialised
     */
    @Override
    protected JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean repositoryFactory = new JobRepositoryFactoryBean();
        repositoryFactory.setDataSource(this.dataSource);
        repositoryFactory.setTransactionManager(getTransactionManager());
        repositoryFactory.setDatabaseType(DatabaseType.POSTGRES.name());
        repositoryFactory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
        // All properties must be set before the factory validates itself.
        repositoryFactory.afterPropertiesSet();
        return repositoryFactory.getObject();
    }

    /**
     * Creates the job explorer on the batch data source.
     *
     * @return the initialised job explorer
     * @throws Exception if the factory bean cannot be initialised
     */
    @Override
    protected JobExplorer createJobExplorer() throws Exception {
        JobExplorerFactoryBean explorerFactory = new JobExplorerFactoryBean();
        explorerFactory.setDataSource(this.dataSource);
        explorerFactory.afterPropertiesSet();
        return explorerFactory.getObject();
    }
}
@EnableTask를 쓰기 때문에 Task config를 통해서는 간단히 로깅만 한다.
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.task.configuration.DefaultTaskConfigurer;
import org.springframework.cloud.task.configuration.TaskProperties;
import org.springframework.cloud.task.listener.TaskExecutionListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dev.listener.BatTaskListener;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring Cloud Task configuration.
 *
 * <p>Points the task infrastructure at the {@code batDataSource} bean using the
 * default task table prefix, and registers a {@link BatTaskListener}.
 */
@Slf4j
@Configuration
public class BatchTaskConfig extends DefaultTaskConfigurer {

    /**
     * @param dataSource         data source qualified as {@code batDataSource}
     * @param applicationContext application context handed to the parent configurer
     */
    public BatchTaskConfig(
            @Qualifier("batDataSource") DataSource dataSource,
            ApplicationContext applicationContext) {
        super(dataSource, TaskProperties.DEFAULT_TABLE_PREFIX, applicationContext);
    }

    /**
     * Registers the task lifecycle listener.
     *
     * @return a new {@link BatTaskListener}
     */
    @Bean
    public TaskExecutionListener batTaskListener() {
        TaskExecutionListener lifecycleListener = new BatTaskListener();
        return lifecycleListener;
    }
}
task config에서 사용하는 리스너는 다음과 같다.
import java.util.Date;
import org.springframework.cloud.task.listener.TaskExecutionListener;
import org.springframework.cloud.task.repository.TaskExecution;
import lombok.extern.slf4j.Slf4j;
/**
 * Task lifecycle listener that stamps start/end information on the
 * {@link TaskExecution} it is handed and logs each lifecycle event.
 *
 * <p>Per the Spring Cloud Task reference, {@code onTaskFailed} is invoked prior
 * to {@code onTaskEnd} when the task throws an unhandled exception. The listener
 * therefore remembers a failure so that {@code onTaskEnd} does not overwrite the
 * failure marking with exit code 0 / "COMPLETED".
 */
@Slf4j
public class BatTaskListener implements TaskExecutionListener {

    /** Set once {@link #onTaskFailed} has been called for this run. */
    private boolean failed;

    /**
     * Sets the start time to now and logs the task identifiers.
     *
     * @param taskExecution execution being started
     */
    @Override
    public void onTaskStartup(TaskExecution taskExecution) {
        taskExecution.setStartTime(new Date());
        log.debug("[START] TaskName : {} Execution Id : {} parentExecution Id {}", taskExecution.getTaskName(),
                taskExecution.getExecutionId(), taskExecution.getParentExecutionId());
    }

    /**
     * Sets the end time and, unless a failure was reported earlier, marks the
     * execution as exit code 0 / "COMPLETED"; then logs the task identifiers.
     *
     * @param taskExecution execution being finished
     */
    @Override
    public void onTaskEnd(TaskExecution taskExecution) {
        taskExecution.setEndTime(new Date());
        if (!failed) {
            // Only a run without a reported failure may be labelled as completed.
            taskExecution.setExitCode(0);
            taskExecution.setExitMessage("COMPLETED");
        }
        log.debug("[END] TaskName : {} Execution Id : {} parentExecution Id {}", taskExecution.getTaskName(),
                taskExecution.getExecutionId(), taskExecution.getParentExecutionId());
    }

    /**
     * Records the failure, sets the end time, exit code -1 and "FAILED", and
     * logs the task identifiers together with the causing exception.
     *
     * @param taskExecution execution that failed
     * @param throwable     exception that caused the failure
     */
    @Override
    public void onTaskFailed(TaskExecution taskExecution, Throwable throwable) {
        failed = true;
        taskExecution.setEndTime(new Date());
        taskExecution.setExitCode(-1);
        taskExecution.setExitMessage("FAILED");
        // Trailing Throwable argument lets SLF4J attach the stack trace to the entry.
        log.debug("[FAIL] TaskName : {} Execution Id : {} parentExecution Id {} ", taskExecution.getTaskName(),
                taskExecution.getExecutionId(), taskExecution.getParentExecutionId(), throwable);
    }
}
메인 클래스의 설정이다.
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.listener.TaskEventAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
/**
 * Entry point of the batch/task application.
 *
 * <p>Starts the application as a non-web Spring Boot application, then closes
 * the context and terminates the JVM so the process does not linger after the
 * work is done.
 */
@EnableAutoConfiguration(exclude = { DataSourceAutoConfiguration.class,
        DataSourceTransactionManagerAutoConfiguration.class, TaskEventAutoConfiguration.class
})
@EnableTask
@SpringBootApplication
public class BatchApplication {

    /**
     * Runs the application and exits the JVM with the resulting exit code.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        // FIXME: handle splitting of the batch args.
        // Forward args so that command-line options supplied by the launcher
        // actually reach the Spring environment.
        ConfigurableApplicationContext ctx = new SpringApplicationBuilder(BatchApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);

        // If the batch server does not terminate on its own (e.g. because of web
        // components), close the context and exit explicitly. This generator
        // itself contributes exit code 0.
        int exitCode = SpringApplication.exit(ctx, () -> 0);
        System.exit(exitCode);
    }
}
Spring Cloud Task를 사용하면 Spring Batch와 마찬가지로 신규로 생성되는 테이블이 있으므로 데이터소스 설정을 해줘야 한다. 또한 여러 개의 task를 이어서 batch flow를 작성했을 때 작업이 종료되지 않으면 파드가 종료되지 않아 뒤의 태스크가 실행되지 않으므로, 태스크를 통한 배치 실행 결과가 어떻든 서버가 정상 종료될 수 있도록 해야 한다.
728x90
'개발 > java' 카테고리의 다른 글
| jib (2) | 2024.12.26 |
|---|---|
| openjdk 12 ~ 21 feature (3) | 2024.11.15 |
| jasypt (1) | 2024.11.13 |
| @Aspect로 공통header 처리 (2) | 2024.02.19 |
| springboot threadLocal 테스트 (1) | 2024.01.22 |