-
[nestjs] kafka 마이크로서비스 구축하기 - kafka (1/5)nodejs/nestjs 2023. 7. 24. 15:54
소개
이 포스팅은 NestJS와 Kafka를 이용하여 실시간 데이터 처리 시스템을 구축하고자 하는 개발자를 대상으로 합니다.
다음과 같은 독자분들을 대상으로 작성되었습니다.- Node.js 및 TypeScript에 익숙한 개발자
- kafka에 대한 이해가 있는 개발자
- docker에 대한 이해가 있는 개발자
포스팅의 전체 목표는 다음과 같습니다.- nestjs의 공식문서를 기반으로 Microservice기반의 kafka 연동
- kafka 커넥션 핸들링
- kafka 토픽 환경분리
- 마이크로서비스를 로깅을 위한 traceId 연동
- kafka graceful shutdown 구현하기
개요
nestjs의 공식문서를 기반으로 Microservice기반의 kafka 연동을 해볼 예정입니다.
참고
- 해당 포스팅은 소스코드를 기준으로 설명합니다.
git clone -b blog-kafka-1 --single-branch https://github.com/mingoogle/blog-mingoogle-seed.git blog-kafka-1
- 프로젝트 내부의 postman 파일을 import하여 쉽게 테스트 가능합니다.
Part 1 - nestjs의 공식문서를 기반으로 Microservice기반의 kafka 연동하기
1. 설치
- Kafka 기반 마이크로서비스 구축을 시작하려면 먼저 필요한 패키지를 설치합니다.
yarn add @nestjs/microservices --save yarn add kafkajs --save
- kafka 마이그레이션을 위하여 아래 패키지도 추가로 설치합니다.
// https://www.npmjs.com/package/migrate yarn add migrate --save
2. kafka 마이그레이션
kafka 마이그레이션을 파일로 관리하여 버전 관리 및 이력 추적을 기록하고 쉽게 롤백과 롤포워드를 할 수 있습니다.
로컬/개발/운영 등의 환경에서 스키마를 최신 상태로 유지하거나 배포 시 자동으로 마이그레이션을 실행하여 쉽게 각 환경별로 일관성을 유지할 수 있고 여러 개발자가 협업하는 프로젝트에서 마이그레이션 파일은 버전 및 이력을 쉽게 공유할 수 있어 협업 및 배포 시 유용합니다.- 사전에 docker 환경에서 kafka와 zoopker를 실행시킵니다.
cd ~/docker docker-compose up -d --build
- migrate.sh 파일 만듭니다.
- dev/staging/local 등의 환경에 맞게 kafka 서버 세팅을 합니다.
// .env.local or .env.development KAFKA_HOST='localhost:9093' KAFKA_TOPIC_PARTITIONS=6 KAFKA_TOPIC_REPLICATION_FACTOR=1
- migrate.sh 파일을 생성하여 스크립트 파일을 작성합니다.
- 아래 내용은 [server/script/kafka/.env.local or .env.development] 등의 환경을 가지고 kafka.js파일을 실행
#!/bin/bash GIT_ROOT_PATH=`git rev-parse --show-toplevel` RUN_PATH=${GIT_ROOT_PATH}/server/script/kafka NODE_PATH=${GIT_ROOT_PATH}/server # node_modules install cd ${NODE_PATH}; yarn install; cd ${RUN_PATH}; DEFAULT_STATE_FILE=".migrate" ENV_STATE_EXT=".local" ARGUMENT_COUNT=$# ENV=$1 COMMAND=$2 MIGRATION_FILE=$3 if [ "${ARGUMENT_COUNT}" -eq 0 ]; then echo "./migrate.sh [local/development/staging/production] [up/down/list] [migration file]" exit 1 fi if [ "${ENV}" = "production" ]; then ENV_STATE_EXT=".prod" elif [ "${ENV}" = "staging" ]; then ENV_STATE_EXT=".staging" elif [ "${ENV}" = "development" ]; then ENV_STATE_EXT=".dev" elif [ "${ENV}" = "local" ]; then ENV_STATE_EXT=".local" else echo "./migrate.sh [local/development/staging/production] [up/down/list] [migration file]" exit 1 fi if [ "${COMMAND}" != "down" -a "${COMMAND}" != "up" -a "${COMMAND}" != "list" ]; then echo "./migrate.sh [local/development/staging/production] [up/down/list] [migration file]" exit 1 fi if [ "${COMMAND}" = "down" ]; then if [ "${MIGRATION_FILE}" = "" ]; then MIGRATION_FILE="latest-file" fi fi echo "../../node_modules/.bin/migrate ${COMMAND} ${MIGRATION_FILE} -f ${DEFAULT_STATE_FILE}${ENV_STATE_EXT} --env .env${ENV_STATE_EXT}" ../../node_modules/.bin/migrate ${COMMAND} ${MIGRATION_FILE} -f ${DEFAULT_STATE_FILE}${ENV_STATE_EXT} --env ".env"${ENV_STATE_EXT} --compiler="ts:./typescript-compiler.js"
- kafka.ts 파일을 생성합니다.
참고로 migrate.sh파일에 의해서 루트프로젝트의 .env환경변수들이 .env.xxx파일에 선언된 환경변수 값으로 병합됩니다.
.env.local파일의 kafka 환경변수를 가져온 뒤 admin계정으로 접속하여 토픽을 생성을 진행하는 코드입니다.
// kafka.ts const { map } = require('lodash'); const { Kafka } = require('kafkajs'); const { KAFKA_HOST, KAFKA_TOPIC_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR } = process.env; export const kafka = new Kafka({ clientId: 'seed-kafka-topic-migrate', brokers: [KAFKA_HOST], }); const admin = kafka.admin(); // 실제 Kafka Topic 생성을 위한 배열 const getTopicList = (topics) => map(topics, (newTopic) => ({ topic: newTopic.topic, numPartitions: newTopic.numPartitions || KAFKA_TOPIC_PARTITIONS, replicationFactor: newTopic.replicationFactor || KAFKA_TOPIC_REPLICATION_FACTOR, })); export const adminConnect = async () => { await admin.connect(); }; // NOTE: 연결 종료는 현재 사용하지 않는다. 서버에서 자동 연결 종료되는 것으로 처리한다. export const adminDisconnect = async () => { try { await admin.disconnect(); } catch (err) { process.exit(0); } }; export const createTopics = async (topics) => { const topicList = getTopicList(topics); try { await admin.createTopics({ topics: topicList, }); } catch (err) { console.log(err); // NOTE: 마이그레이션 실행 중 에러가 발생하면 프로그램을 종료한다. migrate 모듈에서는 에러를 무시하고 계속 실행한다. process.exit(0); } }; export const deleteTopics = async (topics) => { try { await admin.deleteTopics({ topics, }); } catch (err) { console.log(err); process.exit(0); } }; export const listTopics = async (topics = {}) => { try { const results = await admin.fetchTopicMetadata(topics); console.log(results, 'topic list'); return results; } catch (err) { console.log(err); } };
- migrations폴더에 마이그레이션 파일을 생성 후, 생성할 토픽들을 작성합니다.
// 1600409610677-add-initial-topics.ts import { createTopics, adminConnect, deleteTopics } from '../kafka'; const topics = [ { topic: 'kafka-topic-message', // 'kafka-topic-message'라는 토픽을 생성 }, ]; export const up = async () => { await adminConnect(); await createTopics(topics); }; export const down = async () => { const deleteTopicList = topics.map((topic) => topic.topic); await deleteTopics(deleteTopicList); };
- migrate.sh 파일을 실행합니다.
// 1번째 방법 cd ~/server/script/kafka sh migrate.sh local up // sh migrate.sh [local,development, production] [up or down] // 2번째 방법 (package.json에 스크립트를 생성되어있음) cd ~/server yarn migrate:kafka-topics local up // yarn migrate:kafka-topics [local,development, production] [up or down]
- 실행 결과를 확인합니다.
정상적으로 실행된다면 .migrate.xxxx파일이 생성되며, 마이그레이션 시점 및 이력들을 확인할 수 있습니다.
- 카프카 서버에 정상적으로 토픽이 생성되었는지 확인합니다.
// 카프카 or 주키퍼 컨테이너 접속 후 다음 명령어 실행 kafka-topics --list --zookeeper zookeeper:2181
- 추후, 정상적으로 토픽이 생성되었는지 확인하기 위하여 다음 명령어를 입력 후 아래 내용을 진행하시면 좋습니다.
kafka-console-consumer --topic kafka-topic-message --from-beginning --bootstrap-server kafka:9092
3. 카프카 producer 서버 만들기
- main 서버의 app.module.ts 코드에 kafka clientModule를 import 합니다.
// server/apps/main/src/app.module.ts import { Module } from '@nestjs/common'; import { ClientsModule, Transport } from '@nestjs/microservices'; import { LibsModule } from '@app/lib'; import { AppController } from './app.controller'; import { AppService } from './app.service'; @Module({ imports: [ LibsModule, ClientsModule.register([ { name: 'KAFKA_SERVICE', transport: Transport.KAFKA, options: { client: { clientId: 'seed', brokers: ['localhost:9093'], }, consumer: { groupId: 'seed-groupId', }, }, }, ]), ], controllers: [AppController], providers: [AppService], }) export class AppModule {}
- 토픽을 생성하는 인터페이스를 만들어줍니다.
[POST] /publishToKafka
// server/apps/auth/src/app.controller.ts import { Controller, Get, Inject, Post } from '@nestjs/common'; import { ClientKafka } from '@nestjs/microservices'; import { AppService } from './app.service'; @Controller() export class AppController { constructor( private readonly appService: AppService, @Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka, ) {} @Get() getHello(): string { console.log('### [start] main server hello =>'); return this.appService.getHello(); } @Post('/publishToKafka') async publishToKafka(): Promise<any> { try { await this.kafkaClient.connect(); await this.kafkaClient.emit('kafka-topic-message', { message: '토픽 메세지입니다.', }); // await this.kafkaClient.close(); return { message: 'Message published successfully' }; } catch (err) { console.log('### err =>', err); return { error: 'Failed to publish message' }; } } }
4. 카프카 consumer 서버 만들기nestjs에서의 마이크로서비스는 기본적으로 HTTP가 아닌 다른 전송 계층(transport layer - kafka, grpc..)을 사용하는 애플리케이션을 의미합니다.
해당 포스팅에서는 nestjs의 Hybrid application형태로 컨슈머 서버를 만들도록 하겠습니다.
Hybrid application는 HTTP 요청을 수신하는 것과 동시에 연결된 마이크로서비스를 사용하는 애플리케이션을 의미합니다. INestApplication 인스턴스는 connectMicroservice() 메서드를 통해 INestMicroservice 인스턴스와 연결할 수 있습니다.- auth 서버의 main.ts에 connectMicroservice() 메서드를 이용하여 마이크로서비스를 연동합니다.
// server/apps/auth/src/main.ts import { NestFactory } from '@nestjs/core'; import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import { AppModule } from './app.module'; import { ConfigurationService, LoggerService } from '@app/common'; async function bootstrap() { const app = await NestFactory.create(AppModule, { bufferLogs: true, }); const configurationService = app.get(ConfigurationService); const serverName = configurationService.get<string>('AUTH_SERVER_NAME') || 'auth'; const port = configurationService.getServerPort(serverName); const NODE_ENV = configurationService.get<string>('NODE_ENV'); // microservice app.connectMicroservice<MicroserviceOptions>({ transport: Transport.KAFKA, options: { client: { brokers: ['localhost:9093'], clientId: 'seed', }, consumer: { groupId: 'seed-groupId', }, }, }); await app.startAllMicroservices(); await app.listen(port, () => { app .get(LoggerService) .info( `[server] serverName: ${serverName}, starting: ${port}, NODE_ENV: ${NODE_ENV}`, ); }); } bootstrap();
- 토픽을 컨슘 할 컨트롤러를 생성해 줍니다.
// server/apps/auth/src/app.kafka.controller.ts import { Controller } from '@nestjs/common'; import { MessagePattern, Payload, Transport } from '@nestjs/microservices'; @Controller('kafka') export class AppKafkaController { @MessagePattern('kafka-topic-message', Transport.KAFKA) getKafkaConsumer(@Payload() data: any) { console.log('### kafka-topic-message consumer => \n', data); } }
- 생성한 컨트롤러 파일을 app.module.ts에 선언해 줍니다.
// server/apps/auth/src/app.module.ts import { Module } from '@nestjs/common'; import { LibsModule } from '@app/lib'; import { AppController } from './app.controller'; import { AppService } from './app.service'; import { AppKafkaController } from './app.kafka.controller'; @Module({ imports: [LibsModule], controllers: [AppController, AppKafkaController], providers: [AppService], }) export class AppModule {}
5. 테스트하기 (github readme 참고)
- 서버를 실행합니다.
cd ~/server yarn start:auth // kafka 컨슈머 서버 yarn start:main // kafka 프로듀서 서버
- [POST] /publishToKafka 로 요청합니다.
- auth 서버에서 토픽메시지를 컨슘 한 로그를 확인할 수 있습니다.
- kafka 컨테이너에서 토픽이 정상적으로 생성된 로그를 볼 수 있습니다.
참고
'nodejs > nestjs' 카테고리의 다른 글
[nestjs] 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅 - kafka (5/5) (0) 2024.05.18 [nestjs] 마이크로서비스 환경에서 로그 추적을 위한 traceId 세팅 - kafka (4/5) (0) 2023.07.26 [nestjs] kafka 커넥션 핸들링 및 topicMessage prefix 구현 - kafka (2,3/5) (0) 2023.07.25 [ Node js ] Passport 미들웨어 (1) - 패스포트 무엇인가? (2) 2020.12.06 - Node.js 및 TypeScript에 익숙한 개발자