ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [nestjs] kafka 커넥션 핸들링 및 topicMessage prefix 구현 - kafka (2,3/5)
    nodejs/nestjs 2023. 7. 25. 18:18

    소개

    이 포스팅은 NestJS와 Kafka를 이용하여 실시간 데이터 처리 시스템을 구축하고자 하는 개발자를 대상으로 합니다.
    다음과 같은 독자분들을 대상으로 작성되었습니다.

    • Node.js 및 TypeScript에 익숙한 개발자

    • kafka에 대한 이해가 있는 개발자

    • docker에 대한 이해가 있는 개발자


    포스팅의 전체 목표는 다음과 같습니다.

    1. nestjs의 공식문서를 기반으로 Microservice기반의 kafka 연동
    2. kafka 커넥션 핸들링
    3. kafka 토픽 환경분리
    4. 마이크로서비스를 로깅을 위한 traceId 연동
    5. kafka graceful shutdown 구현하기


    개요

    이번 포스팅에서 kafka 모듈화에서 진행할 목록들입니다.

    • Producer 생성 및 커넥션 관리

    • Consumer 생성 및 커넥션 관리

    • 카프카 메시지 prefix 구현
      • ex) {development/production/staging}-kafka-message
    • kafka constants 관리 : topicMessage, config 값등을 효율적으로 관리하는 작업

    • kafka workflow 관리: 서버와 토픽을 관리하여 어떤 서버에서 어떤 토픽이 생성되고, 어떤 서버가 컨슘하는지 유지보수를 위한 작업

    • [참고] 다음포스팅에는 다음과 같은 작업을 진행합니다. 
      • traceId 관리
      • kafka 에러핸들링 및 kafka 그레이스 셧다운 구현

    참고

     

    GitHub - mingoogle/blog-mingoogle-seed: Building a nestjs-based microservices architecture with Kafka messaging

    Building a nestjs-based microservices architecture with Kafka messaging - GitHub - mingoogle/blog-mingoogle-seed: Building a nestjs-based microservices architecture with Kafka messaging

    github.com

    git clone -b blog-kafka-2 --single-branch https://github.com/mingoogle/blog-mingoogle-seed.git blog-kafka-2

     

    • 프로젝트 내부의 postman 파일을 import하여 쉽게 테스트 가능합니다.

    ~/resources/postman/microservice-kafka.postman_collection.json
    postman

     



    Part 2, 3 - kafka 커넥션 핸들링 및 topicMessage prefix 구현

    1. 설치

    • kafkaMessage 데이터를 벨리데이션 하기 위해 아래 라이브러리를 설치합니다.
      nestjs에서 Serialization작업이 필요할때 흔히 사용하는 class-transformer 유효성 검사를 쉽게 수행할 수 있도록 도와주는 라이브러리인 class-validator을 가지고 토픽 생성 시 토픽 메시지 내용이 적절한지 밸리데이션을 진행합니다.
    yarn add class-validator --save
    yarn add class-transformer --save
    • 예를 들어 특정 topicMessage가 { 'id': string, 'name': string } 형식의 메시지인데 토픽메시지 생성 시,
      { 'id': string } or { 'title': string, 'contents': string } 처럼 메시지의 내용을 누락하거나 메시지 형식이 다른 경우의 밸리데이션을 진행하여, 컨슘하는 서버에서도 문제가 없도록 사전에 정상적인 메시지만 보내도록 합니다.

     

    2. kafka 모듈 만들기 

    kafka 모듈을 만들어 재사용성을 높이고 새로운 기능을 추가하거나 수정할때 유연하게 대응할 수 있습니다.
    또한 모듈 간 독립성 높여 카프카 기능이 변경되더라도 다른 부분에 영향을 최소화하도록 결합도를 낮출 수 있습니다.
    카프카 모듈을 사용하면 카프카 브로커, 토픽, 컨슈머 그룹, kafka 에러 핸들링 등과 같은 kafka 구성을 한 곳에서 관리하여 응집도를 높일 수 있습니다.

     

    [2-1]. kafka 모듈 생성하기 

    auth서버(컨슈머)와 main서버(프로듀서) 둘다 사용하는 모듈이기 때문에 libs폴더에서 구현합니다. 아래는 간단하게 표현한 kafka 모듈의 구조입니다.

    libs
    └── common
        └── kafka
            ├── map // workflow
            ├── topics // 토픽 메세지
            │   └── common
            ├── kafka.module.ts
            └── kafka.service.ts

     

     

    [2-2]. kafka.service.ts - consumer, producer 생성하기, prefix 구현하기

    // server/libs/common/src/kafka/src/kafka.service.ts
    
    import { CustomTransportStrategy, Server } from '@nestjs/microservices';
    import { Kafka, Producer, Consumer } from 'kafkajs';
    
    import { LoggerService } from '../../logger/src';
    import { ValidationService } from '../../validation/src';
    import { CustomError } from '../../error';
    
    import { TopicMessageMap } from './topics';
    import { SYSTEM } from '../../../../constants';
    import { Observable } from 'rxjs';
    
    export type TTopicMessage = {
      topic: string;
      messages: [{ value: string }];
    };
    
    export type TKafkaConfig = {
      prefixTopic: string;
      clientId: string;
      brokers: string[];
      groupId: string;
      retryTime: number;
      retryCount: number;
      readUncommitted: boolean;
      maxWaitTimeInMs: number;
      sessionTimeout: number;
    };
    
    export class KafkaService extends Server implements CustomTransportStrategy {
      private kafka: Kafka;
    
      private producer: Producer;
    
      private consumer: Consumer;
    
      constructor(
        private readonly kafkaConfig: TKafkaConfig,
        private readonly loggerService: LoggerService,
        private readonly validationService: ValidationService,
      ) {
        super();
      }
    
      /**
       * 환경에 맞게 토픽 명 세팅
       * ex) {development or production or qc}.xxx.xxx
       * @param {string} topicName - 토픽명
       */
      private _setTopicByEnvironment(topicName: string): string {
        const result = this.kafkaConfig.prefixTopic
          ? `${this.kafkaConfig.prefixTopic}.${topicName}`
          : topicName;
        return result;
      }
    
      /**
       * consume 시 호출되는 핸들러를 환경변수에 맞게 세팅
       * - @MessagePattern(topicName) 데코레이터의 topicName 파라미터 값에 prefix{development or production}를 세팅.
       */
      private _setMessageHandler() {
        [...this.messageHandlers.entries()].map(([pattern]) => {
          const setTopicName = this._setTopicByEnvironment(pattern);
          this.messageHandlers.set(setTopicName, this.messageHandlers.get(pattern));
          this.messageHandlers.delete(pattern);
        });
      }
    
      async init() {
        this.kafka = new Kafka({
          clientId: this.kafkaConfig.clientId,
          brokers: this.kafkaConfig.brokers,
          retry: {
            initialRetryTime: this.kafkaConfig.retryTime,
            retries: this.kafkaConfig.retryCount,
          },
        });
    
        this._setMessageHandler();
      }
    
      /**
       * main.js 에서 app.listen() 호출 시 실행됨".
       */
      async listen(callback: () => void) {
        callback();
      }
    
      /**
       * application shutdown 이벤트 발생 시 실행됨
       * CustomTransportStrategy가 인터페이스라 반드시 선언해야함
       */
      // eslint-disable-next-line @typescript-eslint/no-empty-function
      async close() {}
    
      /**
       * producer init".
       */
      async produceGroupProcessor() {
        this.producer = this.kafka.producer();
        await this.producer.connect();
      }
    
      /**
       * consumer init
       */
      async consumeGroupProcessor() {
        this.consumer = this.kafka.consumer({
          groupId: this.kafkaConfig.groupId,
          readUncommitted: this.kafkaConfig.readUncommitted,
          sessionTimeout: this.kafkaConfig.sessionTimeout,
          maxWaitTimeInMs: this.kafkaConfig.maxWaitTimeInMs,
        });
        await this.consumer.connect();
    
        // 컨슘할 토픽 구독
        for (const [pattern] of this.messageHandlers.entries()) {
          try {
            await this.consumer.subscribe({
              topic: pattern,
              fromBeginning: false,
            });
          } catch (err) {
            this.loggerService.error(
              err,
              `error during the consumer subscription. topic: ${pattern}`,
            );
            throw new CustomError(
              SYSTEM.ERROR.ERROR_CODE.INTERNAL_SERVER_ERROR,
              `[libs/kafka/kafkaService] error during the consumer subscription. topic: ${pattern}`,
            );
          }
        }
    
        // 구독한 토픽들이 컨슘될 때 실행
        await this.consumer.run({
          eachMessage: async ({ topic, partition, message }) => {
            try {
              // message 값이 buffer 로 들어옴
              const messageValue = JSON.parse(message?.value?.toString());
              const handler = this['messageHandlers'].get(topic);
              if (handler) {
                const handled = await handler(messageValue, message);
                if (handled instanceof Observable) {
                  const subscription = handled.subscribe({
                    complete: () => {
                      subscription.unsubscribe();
                    },
                    error: (err) => {
                      this.loggerService.error(
                        err,
                        '[libs/kafka/kafkaService] eachMessage handler err',
                      );
                    },
                  });
                }
              }
            } catch (err) {
              this.loggerService.error(
                err,
                `[libs/kafka/kafkaService] eachMessage err topic: ${topic} message: ${message?.value?.toString()}`,
              );
            }
          },
        });
      }
    
      /**
       * consumers 종료
       * 참고
       * disconnect(): 네트워크 연결도 끊어진 상태로 Consumer 인스턴스와 kafka서버의 연결이 완전히 종료됨
       * stop(): 네트워크 연결을 유지하고있지만 더 이상의 컨슘을 진행하지 않음 (재시작시 기존에 할당받은 파티션이 유지되어있음)
       */
      async closeConsumers() {
        await this.consumer.disconnect();
      }
    
      /**
       * producer 종료
       */
      async closeProducer() {
        await this.producer.disconnect();
      }
    
      /**
       * 토픽메세지 세팅 작업 및 검증 작업
       * - 검증 작업
       *   1. 토픽명에 해당하는 토픽메시지 검증
       *   2. 토픽메시지 내부를 검증
       * @param {string} topic - TopicMessageMap에 정의된 토픽명
       * @param {object} message- TopicMessageMap에 정의된 클래스 인스턴스
       */
      async setTopicMessage<T extends keyof typeof TopicMessageMap>(
        topic: T,
        message: InstanceType<(typeof TopicMessageMap)[T]>,
      ): Promise<TTopicMessage> {
        // 토픽메시지 내부 검증
        try {
          await this.validationService.validate({
            type: TopicMessageMap[topic], // 수정된 부분
            value: message,
          });
        } catch (err) {
          throw new CustomError(
            SYSTEM.ERROR.ERROR_CODE.VALIDATION_ERROR,
            `[libs/kafka/kafkaService] ${TopicMessageMap[topic]} validation error!`,
          );
        }
    
        const setTopicName = this._setTopicByEnvironment(topic);
        const topicMessage: TTopicMessage = {
          topic: setTopicName,
          messages: [{ value: JSON.stringify(message) }],
        };
    
        return topicMessage;
      }
    
      /**
       * 한개의 topic에 producing
       * @param topicMessage - TTopicMessage 타입
       */
      async produceSingleMessage(topicMessage: TTopicMessage): Promise<void> {
        try {
          await this.producer.send({
            ...topicMessage,
            acks: -1,
            timeout: 5000,
          });
        } catch (err) {
          this.loggerService.error(
            { topicMessage: topicMessage, err },
            `[libs/kafka/kafkaService] [produce topic: ${topicMessage.topic}] Topic send fail`,
          );
          throw new CustomError(
            SYSTEM.ERROR.ERROR_CODE.INTERNAL_SERVER_ERROR,
            `[libs/kafka/kafkaService] [produce topic: ${topicMessage.topic}] Topic send fail`,
          );
        }
      }
    }

     

    kafka.service.ts에 구현된 내용을 설명하면 다음과 같습니다

    • 각 서버들은 kafka init()을 진행합니다.
      • @MessagePattern(topicMessage)선언하게되면, 메시지를 구독하여 수신하게 됩니다. 이때 메시지 핸들러가 처리하게 됩니다.
      • _setMessageHandler()가 호출되어 컨트롤러에서 메시지 핸들러에 있는 topicMessage(@MessagePattern의 topicMessage)에 prefix를 붙이게 됩니다. ex) topic-message -> development-topic-meesage
    async init() {
        this.kafka = new Kafka({
          clientId: this.kafkaConfig.clientId,
          brokers: this.kafkaConfig.brokers,
          retry: {
            initialRetryTime: this.kafkaConfig.retryTime,
            retries: this.kafkaConfig.retryCount,
          },
        });
    
        this._setMessageHandler();
      }
      
      
      /**
       * consume 시 호출되는 핸들러를 환경변수에 맞게 세팅
       * - @MessagePattern(topicName) 데코레이터의 topicName 파라미터 값에 prefix{development or production}를 세팅.
       */
      private _setMessageHandler() {
        [...this.messageHandlers.entries()].map(([pattern]) => {
          const setTopicName = this._setTopicByEnvironment(pattern);
          this.messageHandlers.set(setTopicName, this.messageHandlers.get(pattern));
          this.messageHandlers.delete(pattern);
        });
      }
      
        /**
       * 환경에 맞게 토픽 명 세팅
       * ex) {development or production or qc}.xxx.xxx
       * @param {string} topicName - 토픽명
       */
      private _setTopicByEnvironment(topicName: string): string {
        const result = this.kafkaConfig.prefixTopic
          ? `${this.kafkaConfig.prefixTopic}.${topicName}`
          : topicName;
        return result;
      }

     

    • produceGroupProcessor() 를 통하여 프로듀서를 생성합니다.
    /**
       * producer init".
       */
      async produceGroupProcessor() {
        this.producer = this.kafka.producer();
        await this.producer.connect();
      }

     

    • consumeGroupProcessor() 를 통하여 컨슈머를 생성합니다.
      • this.consumer.subscribe() 메서드를 통하여 MessagePattern()를 통하여 컨슈머가 구독하고 있는 토픽들을 구독합니다. 
      • this.consumer.run()을 통하여 컨슘 시의 메시지 핸들러를 실행시킵니다.
      • 컨슈머와 프로듀서를 분리시켜, 각각 서버마다 컨슈머역할만 할수 있도록, 프로듀서역할만 할 수있도록, 혹은 컨슈머+프로듀서 역할을 할 수 있도록 분리시켜 놓았습니다.
    async consumeGroupProcessor() {
        this.consumer = this.kafka.consumer({
          groupId: this.kafkaConfig.groupId,
          readUncommitted: this.kafkaConfig.readUncommitted,
          sessionTimeout: this.kafkaConfig.sessionTimeout,
          maxWaitTimeInMs: this.kafkaConfig.maxWaitTimeInMs,
        });
        await this.consumer.connect();
    
        // 컨슘할 토픽 구독
        for (const [pattern] of this.messageHandlers.entries()) {
          try {
            await this.consumer.subscribe({
              topic: pattern,
              fromBeginning: false,
            });
          } catch (err) {
            this.loggerService.error(
              err,
              `error during the consumer subscription. topic: ${pattern}`,
            );
            throw new CustomError(
              SYSTEM.ERROR.ERROR_CODE.INTERNAL_SERVER_ERROR,
              `[libs/kafka/kafkaService] error during the consumer subscription. topic: ${pattern}`,
            );
          }
        }
    
        // 구독한 토픽들이 컨슘될 때 실행
        await this.consumer.run({
          eachMessage: async ({ topic, partition, message }) => {
            try {
              // message 값이 buffer 로 들어옴
              const messageValue = JSON.parse(message?.value?.toString());
              const handler = this['messageHandlers'].get(topic);
              if (handler) {
                const handled = await handler(messageValue, message);
                if (handled instanceof Observable) {
                  const subscription = handled.subscribe({
                    complete: () => {
                      subscription.unsubscribe();
                    },
                    error: (err) => {
                      this.loggerService.error(
                        err,
                        '[libs/kafka/kafkaService] eachMessage handler err',
                      );
                    },
                  });
                }
              }
            } catch (err) {
              this.loggerService.error(
                err,
                `[libs/kafka/kafkaService] eachMessage err topic: ${topic} message: ${message?.value?.toString()}`,
              );
            }
          },
        });
      }
    • produceSingleMessage()를 통하여 토픽 메시지를 발행합니다.
    /**
     * 한개의 topic에 producing
     * @param topicMessage - TTopicMessage 타입
     */
    async produceSingleMessage(topicMessage: TTopicMessage): Promise<void> {
      try {
        await this.producer.send({
          ...topicMessage,
          acks: -1,
          timeout: 5000,
        });
      } catch (err) {
        this.loggerService.error(
          { topicMessage: topicMessage, err },
          `[libs/kafka/kafkaService] [produce topic: ${topicMessage.topic}] Topic send fail`,
        );
        throw new CustomError(
          SYSTEM.ERROR.ERROR_CODE.INTERNAL_SERVER_ERROR,
          `[libs/kafka/kafkaService] [produce topic: ${topicMessage.topic}] Topic send fail`,
        );
      }
    }
    • 참고로 필요에 따라 eachBatch() 및 produceMultiMessage()를 구현할 수 있습니다.

     

    [2-3]. kafka constants 관리 - topicMessage 상수로 관리 및 밸리데이션 적용하기

    Constants를 사용하면 매직 넘버(하드코딩된 숫자나 문자열) 대신 의미 있는 상수 이름을 사용하여 코드의 가독성이 향상됩니다.

    상수를 사용하지 않는다면 같은 값을 여러 곳에서 사용하는 중복해서 작성을 하는 상황이 발생하는데요.

    이때, 값이 변경이 되면 일일이 찾아서 수정해야하기 때문에 번거로울 뿐 아니라 버그를 초래할 수 있습니다.
    만약, 상수를 사용하고 있다면 변경 시 한 곳에서만 수정하면 되므로 변경 시 용이합니다.

    • 상수를 관리하면  재사용 및 일관성을 유지하고 중복된 코드를 줄일 수 있으며, 값의 변경 시 유지보수하기 편리합니다.

     

    • kafka-topic-message.ts 만들기
    // server/libs/common/src/kafka/src/topics/kafka-topic-message.ts
    
    import { IsString } from 'class-validator';
    
    export class KafkaTopicMessage {
      @IsString()
      name: string;
    
      @IsString()
      title: string;
      
      @IsNumber()
      eventTime: number; // 토픽메세지마다 공통적인 값
    
      @IsString()
      @IsOptional()
      traceId?: string; // 토픽메세지마다 공통적인 값
    }

    위의 코드를 보면 eventTime이나 traceId라는 값은 '모든 토픽 메시지'에서 공통적으로 들어가야 하는 값이라면 extends 하여 중복된 코드를 줄이는 것이 좋습니다. 

     

    다음과 같이 코드를 변경합니다.

    // server/libs/common/src/kafka/src/topics/common/definitions.ts
    import { IsNumber, IsOptional, IsString } from 'class-validator';
    
    export class CommonKafkaSchema {
      @IsNumber()
      eventTime: number;
    
      @IsString()
      @IsOptional()
      traceId?: string;
    }
    
    
    // server/libs/common/src/kafka/src/topics/kafka-topic-message.ts
    import { IsString } from 'class-validator';
    
    import { CommonKafkaSchema } from './common/definitions';
    
    export class KafkaTopicMessage extends CommonKafkaSchema {
      @IsString()
      name: string;
    
      @IsString()
      title: string;
    }

     

     

    • kafkatopic들을 constants로 관리하기
      • 위에서 생성한 토픽메시지와 실제 비즈니스 로직에서 사용할 상수를 맵핑시켜 줍니다.
    // server/libs/common/src/kafka/src/topics/index.ts
    
    import { NAME } from '../../../../../constants/index';
    
    import { KafkaTopicMessage } from './kafka-topic-message';
    
    const KAFKA_TOPICS = NAME.KAFKA.TOPICS;
    
    export * from './kafka-topic-message';
    
    export const TopicMessageMap = {
      [KAFKA_TOPICS.KAFKA_TOPIC_MESSAGE]: KafkaTopicMessage,
    } as const;
    
    export type TTopicMessageMap =
      (typeof TopicMessageMap)[keyof typeof TopicMessageMap];

    실제 사용 시는 아래처럼 상수로만 토픽메시지를 사용합니다.

     

    • 토픽메세지 벨리데이션하기

    위의 KafkaTopicMessage는 { name, title, eventTime, traceId(optional)} 의 형식으로 메시지를 보내야 합니다. 
    그뿐만 아니라 '타입' 또한 알맞은 타입으로 보내야 하는데요. 만약에, 데이터를 누락하거나 다른 형식으로 메시지를 보내게되면 컨슈머 서버에서는 에러 및 정상적으로 동작하지 않을 수 있습니다. 
    토픽 메세지를 보낼 때 정상적인 메세지를 보낼 수 있도록 밸리데이션을 진행합니다.

    // server/libs/common/src/kafka/src/kafka.service.ts
      async setTopicMessage<T extends keyof typeof TopicMessageMap>(
        topic: T,
        message: InstanceType<(typeof TopicMessageMap)[T]>,
      ): Promise<TTopicMessage> {
        // 토픽메시지 내부 검증
        try {
          await this.validationService.validate({
            type: TopicMessageMap[topic], // 수정된 부분
            value: message,
          });
        } catch (err) {
          throw new CustomError(
            SYSTEM.ERROR.ERROR_CODE.VALIDATION_ERROR,
            `[libs/kafka/kafkaService] ${TopicMessageMap[topic]} validation error!`,
          );
        }
    
        const setTopicName = this._setTopicByEnvironment(topic);
        const topicMessage: TTopicMessage = {
          topic: setTopicName,
          messages: [{ value: JSON.stringify(message) }],
        };
    
        return topicMessage;
      }

     

    예를 들어 만약에 토픽에 해당하는 토픽메시지를 보내지 않는다면 에러가 발생합니다.

    • name, title이 아닌 address, contents로 잘못된 데이터를 보내려고 시도한다면
      자바스크립트와 달리 런타임 전에 타입스크립트가 타입을 체크해 주는 장점을 볼 수 있습니다 :)

     

     

    [2-4]. kafka workflow 관리

    workflow로 관리하여 얻는 이점들은 다음과 같습니다.
    토픽을 어떤 서버의 어떤 비즈니스 로직에서 발행하는가? 토픽을 어떤 서버에서 컨슘을 진행하는가? 에 대한 부분이 명확해질 수 있습니다. 프로젝트가 점점 커지게 되면, 카프카의 토픽을 핸들링하는 관점에서 매번 어디에서 컨슘을 하고 토픽을 생성하는지 찾기 어려움이 발생할 수 있습니다.
    카프카가 비즈니스로직 어디서 사용되고 있는지 workflow로 관리하면 유지보수측면에서 더 나을 수 있습니다. 

     

    • workflow.ts 생성
    // server/libs/common/src/kafka/src/map/index.ts
    
    import { NAME } from '../../../../../constants/index';
    const KAFKA_TOPICS = NAME.KAFKA.TOPICS;
    
    /**,
     * workflow 가 없으면 어떤 토픽이 어디서 생성하고 어디서 컨슘하는지 한번에 확인하기 어려움
     * workflow 를 생성하여 핸들링 관점에서 만들어 놓음
     * 1. server type(name)
     * 2. producer/consumer type
     * 3. 토픽을 생성하거나 컨슘하는 곳의 모듈(폴더명)
     * 4. full file path ( 토픽 생성 및 컨슘하는 파일명)
     * 5. topic
     */
    export const workflow = {
      auth: { // 1
        producer: {}, // 2
        consumer: { // 2
          app: { // 3
            '/app.kafka.controller': { // 4
              KAFKA_TOPIC_MESSAGE: KAFKA_TOPICS.KAFKA_TOPIC_MESSAGE, // 5
            },
          },
        },
      },
      main: {
        producer: {
          app: {
            '/app.controller': {
              KAFKA_TOPIC_MESSAGE: KAFKA_TOPICS.KAFKA_TOPIC_MESSAGE,
            },
          },
        },
        consumer: {
          user: {},
        },
      },
    };

     

    • 생성한 workflow를 비즈니로직에 반영
    // server/apps/auth/src/app.kafka.controller.ts
    
    import { Controller } from '@nestjs/common';
    import { MessagePattern, Payload, Transport } from '@nestjs/microservices';
    
    import { KAFKA_MAP, KafkaTopicMessage } from '@app/common';
    
    // workflow를 통해서 토픽메세지 가져옴
    const KAFKA_CONSUMER_MAP_APP =
      KAFKA_MAP.auth.consumer.app['/app.kafka.controller']; 
    
    @Controller('kafka')
    export class AppKafkaController {
      @MessagePattern(KAFKA_CONSUMER_MAP_APP.KAFKA_TOPIC_MESSAGE, Transport.KAFKA)
      getKafkaConsumer(@Payload() data: KafkaTopicMessage) {
        console.log('### kafka-topic-message consumer => \n', data);
      }
    }

     

     

    3. 테스트하기 (github readme 참고)

     

    [3.1] {prefix}-topic-message로 변경되었기 때문에 migrate.sh 파일을 다시 실행합니다. 

    // 1번째 방법
    cd ~/server/script/kafka
    sh migrate.sh local up
    // sh migrate.sh [local,development, production] [up or down]
    
    
    // 2번째 방법 (package.json에 스크립트를 생성되어있음)
    cd ~/server
    yarn migrate:kafka-topics local up
    // yarn migrate:kafka-topics [local,development, production] [up or down]

     

    [3.2] 서버를 실행합니다.

    cd ~/server
    yarn start:auth // kafka 컨슈머 서버
    yarn start:main // kafka 프로듀서 서버

    [3.3] [POST] /publishToKafka 로 요청합니다.

    • 토픽을 발행하고, 컨슘하는 것을 확인할 수 있습니다.

     

Designed by Tistory.