ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [nestjs] 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅 - kafka (4/5)
    nodejs/nestjs 2023. 7. 26. 03:17

     

    소개

    이 포스팅은 NestJS와 Kafka를 이용하여 실시간 데이터 처리 시스템을 구축하고자 하는 개발자를 대상으로 합니다.
    다음과 같은 독자분들을 대상으로 작성되었습니다.

    • Node.js 및 TypeScript에 익숙한 개발자

    • kafka에 대한 이해가 있는 개발자

    • docker에 대한 이해가 있는 개발자



    포스팅의 전체 목표는 다음과 같습니다.

    1. nestjs의 공식문서를 기반으로 Microservice기반의 kafka 연동
    2. kafka 커넥션 핸들링
    3. kafka 토픽 환경분리
    4. 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅
    5. kafka graceful shutdown 구현하기

     



    개요

    이번 포스팅에서 kafka 모듈화에서 진행할 목록들입니다.

    • [이전 포스팅 작업]
      • Producer 생성 및 커넥션 관리
      • Consumer 생성 및 커넥션 관리
      • 카프카 메시지 prefix 구현
        • ex) {development/production/staging}-kafka-message
      • kafka constants 관리 : topicMessage, config 값등을 효율적으로 관리하는 작업
      • kafka workflow 관리: 서버와 토픽을 관리하여 어떤 서버에서 어떤 토픽이 생성되고, 어떤 서버가 컨슘하는지 유지보수를 위한 작업
    • 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅
      • nestjs-cls 모듈 생성하기

      • kafkaTopic 메세지 세팅 시 traceId 추가하기

      • logger에 traceId 설정하기

      • interceptor 생성하기

    • [참고] 다음포스팅에는 다음과 같은 작업을 진행합니다. 
      • kafka 에러핸들링 및 kafka 그레이스 셧다운 구현

     

    참고

     

    GitHub - mingoogle/blog-mingoogle-seed: Building a nestjs-based microservices architecture with Kafka messaging

    Building a nestjs-based microservices architecture with Kafka messaging - GitHub - mingoogle/blog-mingoogle-seed: Building a nestjs-based microservices architecture with Kafka messaging

    github.com

    git clone -b blog-kafka-4 --single-branch https://github.com/mingoogle/blog-mingoogle-seed.git blog-kafka-4

     

    • 프로젝트 내부의 postman 파일을 import하여 쉽게 테스트 가능합니다.

    ~/resources/postman/microservice-kafka.postman_collection.json

     

     

    postman

     


     

    Part 4 - 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅

     

     

    위의 그림처럼 마이크로서비스 환경에서 하나의 요청이 여러 서버들을 거쳐 처리하게 된다면, 각각 서버별로 로그가 쌓이게 됩니다.
    각 요청 별 로그들을 수집하기 위해서는 로그를 추적을 해야하는 상황이 발생합니다.

    예를 들어 B라는 요청은 auth server -> main server -> etc server를 호출하는 상황에서 auth server <-> main server 간에 에러가 발생했다고 가정한다면, 수많은 요청들 중에서 '어떤 클라이언트가 요청을 했을 때 에러가 발생한 것인지 확인하기 위하여' 로그를 확인해야 하는 상황이 발생합니다. sre팀이나 인프라팀에서 dataDog 같은 (로그 수집) 모니터링 툴을 통하여 여러 서버들의 로그를 한 번에 볼 수 있게 구축을 했어도 요청에 대한 각 서버들의 로그가 추적이 되지 않는다면, 로그만을 보고 어떤 요청과 관련되어 있는 로그인지를 알기 어렵기 때문에 요청의 흐름을 파악하기 어려워집니다.

     

    여러 서버(서비스)들을 거치는 복잡한 시스템에서 traceId를 사용하여 요청의 전체 흐름을 추적할 수 있습니다.
    즉, traceId를 사용하여 요청이 어떤 서비스를 거치고 얼마나 시간이 소요되는지 등을 파악하여 성능 분석과 최적화에 도움을 줄 수도있으며 분산된 환경에서 트랜잭션 관리도 처리할 수도 있습니다.

     

    nodejs에서는 async_hooks을 가지고 로그를 추적하거나 로그 라이브러리(winston, pino)들이 제공하는 각 클라이언트의 요청마다 유니크한 requestId를 가지고도 설정할 수 있습니다.
    해당 포스팅에서는 nestjs-cls 라이브러리를 이용하여 traceId를 구현해보도록 하겠습니다.

     

     

    1. 설치

    • nestjs-cls 를 가지고 traceId를 관리하기 위하여 nestjs-cls를 설치합니다.
    yarn add nestjs-cls --save

     

    2. nestjs-cls 모듈 생성하기

    nestjs-cls 모듈을 생성하여 각 서버에 nestjs-cls 세팅을 진행합니다.

    참고로 nestjs-cls 모듈은 auth server, main server 둘다 사용하는 모듈로 libs폴더 내부에 생성합니다.

     

    //server/libs/common/src/cls-hooked/src/cls-hooked.module.ts
    
    import { Module } from '@nestjs/common';
    import { ClsModule } from 'nestjs-cls';
    
    @Module({
      imports: [ClsModule.forRoot()],
      exports: [ClsModule],
    })
    export class ClsHookedModule {}
    
    
    // server/apps/auth/src/app.module.ts
    @Module({
      imports: [LibsModule],
      controllers: [AppController, AppKafkaController],
      providers: [
        AppService,
        {
          provide: APP_INTERCEPTOR,
          useClass: ClsInterceptor,
        },
      ],
    })

    clsModule을 만들고 app.module.ts단에서 ClsInterceptor을 활용하여 초기화 및 세팅을 완료합니다.

    3. kafkaTopic 메세지 세팅 시 traceId 추가하기

    로그 추적을 위하여 클라이언트의 첫 인입점(클라이언트의 요청을 처음으로 받는 서버)에서 생성한 traceId를 가지고
    각각의 서버들이 같은 traceId를 가질 수 있도록 세팅합니다.
    각각의 클라이언트의 요청마다 호출되는 각각의 서버들에게 같은 traceId를 세팅한다면 하나의 요청에 대하여 로그 추적이 가능해집니다.

     

    [중요] traceId를 세팅하는 절차는 다음과 같습니다.

    1. 클라이언트의 첫 인입점(postman으로 http 요청을 처리하는 서버 === 클라이언트의 요청을 처음으로 받는 서버)에서 traceId를 생성합니다.
      • 참고로 프로젝트가 제공하는 api에 따라서, 첫 인입점은 http, rpc 등 다양한 인터페이스에서 요청이 들어올 수 있습니다.
        해당 포스팅은 http로 들어오는 요청이 첫 인입점이며, main server가 첫 인입점입니다.
    2. 첫 인입점인 main서버에서 생성된 traceId를 http 요청 or kafka(rpc) 토픽 생성 시 auth server 혹은 etc server에게 전달해줍니다. ( 즉, 다음 작업을 진행할 서버에게 traceId를 넘겨줍니다.)
    3. 인터셉터에서 traceId를 세팅합니다.
      • 각 서버의 인터셉터들이 request http header부분에서 traceId가 존재하는지 or 카프카 토픽메세지에서 traceId가 존재하지는 확인 하여 있다면 요청 데이터의 traceId로 세팅하고, 없다면 새롭게 traceId를 생성합니다.
    4. 3번을 통하여 요청에 대한 로그 추적을 할 수 있게됩니다. 
      • ex) 로그 수집 시스템(dataDog, 사내 로그 시스템)에서 traceId를 기준으로 조회를 한다면 요청에 대한 로그를 추적할 수 있습니다.
    5. 원활한 로그 추적을 위하여 프로젝트에서 사용하는 로그모듈에도 traceId를 세팅해줍니다.

     

    • kafka.service.ts 파일에서 카프카 토픽 메세지 세팅 시 traceId를 추가하는 로직을 작성합니다.
    // server/libs/common/src/kafka/src/kafka.service.ts
    
    async setTopicMessage<T extends keyof typeof TopicMessageMap>(
        topic: T,
        message: InstanceType<(typeof TopicMessageMap)[T]>,
      ): Promise<TTopicMessage> {
        const traceId = this.clsService.get('traceId');
        set(message, 'traceId', traceId);
    
        // 토픽메시지 내부 검증
        try {
          await this.validationService.validate({
            type: TopicMessageMap[topic],
            value: message,
          });
        } catch (err) {
          throw new CustomError(
            SYSTEM.ERROR.ERROR_CODE.VALIDATION_ERROR,
            `[libs/kafka/kafkaService] ${TopicMessageMap[topic]} validation error!`,
          );
        }

     

    4. interceptor 생성하기 

    traceId 세팅을 위한 인터셉터를 생성합니다.

     

    // server/libs/interceptor/trace/trace.interceptor.ts
    
    import {
      CallHandler,
      ExecutionContext,
      Injectable,
      NestInterceptor,
    } from '@nestjs/common';
    import { ClsService } from 'nestjs-cls';
    import { Observable } from 'rxjs';
    import { v4 as uuidv4 } from 'uuid';
    
    import { LoggerService } from '@app/common';
    
    @Injectable()
    export class TraceInterceptor implements NestInterceptor {
      constructor(
        private readonly clsService: ClsService,
        private readonly logger: LoggerService,
      ) {}
    
      intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
        try {
          // kafka
          if (context.getType() === 'rpc') {
            const traceId = context.getArgByIndex(0)?.traceId || uuidv4();
            this.clsService.set('traceId', traceId);
            return next.handle();
          }
    
          // http
          if (context.getType() === 'http') {
            const request = context.switchToHttp().getRequest();
            const traceId = request.headers['x-request-id'] || uuidv4();
            this.clsService.set('traceId', traceId);
            return next.handle();
          }
        } catch (err) {
          this.logger.error(err, '[TraceInterceptor] unknown error in interceptor');
          return next.handle();
        }
      }
    }
    • request http header부분에서 traceId가 존재하는지 or 카프카 토픽메세지에서 traceId가 존재하지는 확인 하여 있다면 요청 데이터의 traceId로 세팅하고, 없다면 새롭게 traceId를 생성합니다.

     

    5. logger에 traceId 설정하기

    원활한 로그 추적을 위하여 프로젝트에서 사용하는 로그모듈에도 traceId를 세팅해줍니다.

    // server/libs/common/src/logger/src/logger.service.ts
    
    private _format(obj: object | string, message: string = '') {
        // traceId 세팅
        const traceId = this.clsService.get('traceId');
    
        const result: LogInfo = { message, traceId };
        if (obj instanceof Error) {
          result.err = obj;
          return result;
        }
    
        if (typeof obj === 'string') {
          result.message = `${obj} ${message}`;
        } else {
          result.data = obj;
        }
    
        return result;
      }

     

    6. 테스트 하기  (github readme 참고)

     

    [6.1] 서버를 실행합니다.

    cd ~/server
    yarn start:auth // kafka 컨슈머 서버
    yarn start:main // kafka 프로듀서 서버

    [6.2] [POST] /publishToKafka 로 요청합니다.

    • [post] publishToKafka은 다음과 같이 동작합니다.
      • main server 호출 -> main server에서 kafka 토픽 발행 -> auth server에서 컨슘
      • http 요청 당 main server와 auth server가 동일한 traceId를 가지고 있는 것을 확인 할 수 있습니다.
        즉, 마이크로 서비스 환경에서 traceId를 가지고 로그를 추적할 수 있게 되었습니다 :)
      • 참고로 테스트 시 console.log와 logger의 로그가 순서대로 찍히지 않을 수 있습니다.
        이는 logger모듈의 pino-pretty 속성 때문입니다. pino-pretty 속성을 사용하지 않으면 순서대로 나옵니다.

    main server: traceId는&nbsp; 4a244926-2326-4b66-9181-fa1cd07d11d5

     

    auth server도 main server와 동일한 traceId를 가지고 있음

     

Designed by Tistory.