-
[nestjs] 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅 - kafka (5/5)nodejs/nestjs 2024. 5. 18. 16:00
소개
이 포스팅은 NestJS와 Kafka를 이용하여 실시간 데이터 처리 시스템을 구축하고자 하는 개발자를 대상으로 합니다.
다음과 같은 독자분들을 대상으로 작성되었습니다.- Node.js 및 TypeScript에 익숙한 개발자
- kafka에 대한 이해가 있는 개발자
- docker에 대한 이해가 있는 개발자
포스팅의 전체 목표는 다음과 같습니다.- nestjs의 공식문서를 기반으로 Microservice기반의 kafka 연동
- kafka 커넥션 핸들링
- kafka 토픽 환경분리
- kafka 토픽 환경분리
- kafka graceful shutdown 구현하기
참고
- 해당 포스팅은 소스코드를 기준으로 설명합니다.
- https://github.com/mingoogle/blog-mingoogle-seed/tree/feature/blog-kafka-5?files=1
- 설명을 위해 일부 코드들은 ESLint 컨벤션에 맞지 않는 코드가 있습니다.
git clone -b feature/blog-kafka-5 --single-branch https://github.com/mingoogle/blog-mingoogle-seed.git blog-kafka-5
- 프로젝트 내부의 postman 파일을 import하여 쉽게 테스트 가능합니다.
개요
- microservices
- Lifecycle Events 와 lifecycle hooks
- lifecycle hooks을 이용한 graceful shutdown 구조 만들기
microservices
- nestjs에서는 일반적인 애플리케이션외에도 마이크로 서비스 형태 아키텍처를 nestjs에서 쉽게 사용할 수 있도록 @nestjs/microservice 을 지원합니다.
- kafka, gRPC, Redis 등등 실제 많이 사용하는 기술들을 nestjs형태로 랩핑을 해놓았습니다. 그러다보니 NestJS의 의존성 주입, 예외 처리, 로깅 등을 그대로 활용할 수도 있으며, redis, kafka등 모듈을 만들때 일관된 코드 작성이 가능하다는 장점이 있습니다. 하지만 초기화하거나 커넥션을 핸들링하는 부분들은 라이브러리 안에서 숨겨져 있기때문에 자유도가 떨어지는 단점은 가지고 있습니다.
- 해당 포스팅에서는 서버를 강제로 종료 시, 안정적으로 서버를 종료하기 위해 더이상 카프카 메세지를 컨슘하지 않고 기존에 작업중인 프로세스가 마칠때까지 기다리고나서 유휴상태일때 서버를 종료하기 위해서 lifecycle hooks를 이용해서 kafka 초기화 및 커넥션을 핸들링해보도록 하겠습니다.
Lifecycle Events
- nestjs에서는 라이프사이클이 존재합니다. 라이프사이클 이벤트는 애플리케이션이 부트스트랩될 때와 종료될 때 발생을 하고 각 라이프사이클 이벤트에서 모듈, 프로바이더, 컨트롤러에 등록된 라이프사이클 훅 메서드를 호출합니다
onModuleInit() 호스트 모듈의 의존성이 해결된 후 호출됩니다. onApplicationBootstrap() 모든 모듈이 초기화된 후, 커넥션 수신 전에 호출됩니다. onModuleDestroy()* 종료 신호(e.g., SIGTERM)가 수신된 후 호출됩니다. beforeApplicationShutdown()* 모든 onModuleDestroy() 핸들러가 완료된 후(Promise가 해결되거나 거부됨) 호출됩니다. 완료된 후 모든 기존 연결이 닫힘(app.close()가 호출됨). onApplicationShutdown()* 연결이 종료된 후(app.close() 해결됨) 호출됩니다. 참고로
onModuleDestroy, beforeApplicationShutdown 및 onApplicationShutdown은 명시적으로 app.close()를 호출하거나 프로세스가 특별한 시스템 신호(SIGHUP, SIGTERM 등)를 받았을 때, 그리고 애플리케이션 부트스트랩 시 enableShutdownHooks를 올바르게 호출한 경우에만 트리거가 됩니다.
lifecycle hooks을 이용한 graceful shutdown 구조 만들기
1. main.ts에서 shotdown hooks를 활성화합니다.
// main.ts import { NestFactory } from '@nestjs/core'; import { AppModule } from './app.module'; import { ConfigurationService, LoggerService } from '@app/common'; async function bootstrap() { const configurationService = new ConfigurationService(); const serverName = configurationService.get<string>('SERVER_NAME') || 'main'; configurationService.setConfigurationAtInitServer(serverName); const port = configurationService.getServerPort(serverName); const NODE_ENV = configurationService.get<string>('NODE_ENV'); const app = await NestFactory.create(AppModule, { bufferLogs: true, }); // Starts listening for shutdown hooks app.enableShutdownHooks(); // NOTE: main서버에 컨슈머를 사용한다면 microservice 설정해주어야합니다.(@MessagePattern) app.connectMicroservice( { strategy: app.get('KAFKA_SERVICE'), }, { inheritAppConfig: true, }, ); await app.startAllMicroservices(); await app.listen(port, () => { app .get(LoggerService) .info( `[server] serverName: ${serverName}, starting: ${port}, NODE_ENV: ${NODE_ENV}`, ); }); } bootstrap();
2. app.module.ts를 다음과 같이 수정합니다.
// app.module.ts import { Module, Inject } from '@nestjs/common'; import { APP_INTERCEPTOR } from '@nestjs/core'; import { ClsInterceptor } from 'nestjs-cls'; import { delay } from 'lodash'; import { LibsModule } from '@app/lib'; import { AppController } from './app.controller'; import { AppService } from './app.service'; import { LoggerService, KafkaService } from '@app/common'; import { TraceInterceptor } from '@app/interceptor'; @Module({ imports: [LibsModule], controllers: [AppController], providers: [ AppService, { provide: APP_INTERCEPTOR, useClass: ClsInterceptor, }, { provide: APP_INTERCEPTOR, useClass: TraceInterceptor, }, ], }) export class AppModule { constructor( @Inject('KAFKA_SERVICE') private readonly kafkaService: KafkaService, private readonly logger: LoggerService, ) {} _delayTime(ms: number): Promise<void> { return new Promise((resolve) => { delay(resolve, ms); }); } async onModuleInit(): Promise<void> { const initialize = async (): Promise<void> => { // kafka initialize await this.kafkaService.init(); await this.kafkaService.produceGroupProcessor(); await this.kafkaService.consumeGroupProcessor(); }; try { await initialize(); } catch (err) { this.logger.warn(err, 'Initialization failed.'); } } //종료 신호(ex: SIGTERM)가 수신된 후 호출됩니다. async onModuleDestroy(): Promise<void> { this.logger.info('[main/server] shutting down gracefully ...'); await this.kafkaService.closeConsumers(); // close 시 진행 중인 작업 처리 await this._delayTime(5000); } // 모든 onModuleDestroy()처리기가 완료된 후 호출됩니다 async beforeApplicationShutdown(signal: string): Promise<void> { this.logger.info(`beforeApplicationShutdown sign : ${signal}`); const cleanUp = async (): Promise<void> => { // cleanup 함수들 적용 this.logger.info('database connection close!'); // db connection close this.logger.info('kafka connection close!'); await this.kafkaService.closeProducer(); }; await cleanUp(); // 모든 모듈이 종료된 후 처리작업등의 로직 } //연결 종료 후 호출됩니다( app.close()해결). async onApplicationShutdown(signal: string): Promise<void> { this.logger.info(`onApplicationShutdown sign : ${signal}`); this.logger.info('[server] terminated!'); process.exit(2); } }
onModuleInit
- 모듈이 초기화되는 시점에 kafka 모듈을 초기화하고 프로듀서와 컨슈머를 초기화합니다.
onModuleDestroy
- 서버 종료 시 카프카 컨슈머 커넥션을 종료하려 더이상 토픽 메세지를 수신하지 않도록 합니다.
- await _delayTime()
- 그밖에 작업들을 처리하거나 컨슈머 작업들에 대한 콜백을 수신하여 카프카관련 작업이 완료되었는지에 대한 콜백을 받습니다.
- await _delayTime()
beforeApplicationShutdown
- await _delayTime() 단에서 모든 작업이 완료되었다면, 카프카 프로듀서도 종료를 해줍니다.
테스트
1. auth 서버의 경우 서버가 종료되어도 커넥션 핸들링이 되지않습니다.
2. main 서버의 경우 lifecycle hooks event를 통하여 커넥션을 핸들링하여 서버를 안정적으로 종료시킵니다.
'nodejs > nestjs' 카테고리의 다른 글
[nestjs] 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅 - kafka (4/5) (0) 2023.07.26 [nestjs] kafka 커넥션 핸들링 및 topicMessage prefix 구현 - kafka (2,3/5) (0) 2023.07.25 [nestjs] kafka 마이크로서비스 구축하기 - kafka (1/5) (0) 2023.07.24 [ Node js ] Passport 미들웨어 (1) - 패스포트 무엇인가? (2) 2020.12.06 - Node.js 및 TypeScript에 익숙한 개발자