더 좋은 퍼포먼스를 만들어 가는 개발자가 되기 위해 공부하는 daco입니다.

GitHub: https://github.com/mooncw

 

About Me

6회 이상의 프로젝트를 통한 AI 모델링, 데이터 파이프라인 구축, 백엔드 시스템 개발 경험이 있습니다.
AI 모델 성능 8% 개선, 쿼리 최적화로 특정 서비스 실행 속도 94% 개선 등 다양한 개선 경험이 있습니다.
팀원들 간의 활발한 커뮤니케이션을 통하여 다양한 문제 해결 경험이 있습니다.

 

Side Projects

AI고 내 8자야
2024.8 - 2024.09
GitHub: https://github.com/mooncw/final-8-backend
[프로젝트 소개]
한국광고자율심의기구의 심의 사업에 더 나은 솔루션을 제공하기 위한 웹

[주요 업무]
- 프로젝트 초기 세팅
- REST API 및 테스트 코드 구현
- JWT를 활용한 인증 기능 구현 및 RTR 기법 구현
- CoolSms를 이용한 연락처 인증 기능 구현
- JdbcTemplate를 이용한 가짜 데이터 조작하는 로직 구현
- QueryDsl을 이용한 쿼리 최적화로 로직 실행 속도 94% 개선
KernelSquare
2023.12 - 2024.04
GitHub: https://github.com/mooncw/f1-KernelSquare-backend
[프로젝트 소개]
지속가능한 성장을 위한 개발자 커뮤니티

[주요 업무]
- REST API 및 테스트 코드 구현
- 채팅 서버 구현
- 알림 서버 구현
- 무중단 배포 구현
- 쿼리 성능, 로직 실행 속도 개선
PFSS
2023.04 - 2023.05
GitHub: https://github.com/mooncw/PFSS
[프로젝트 소개]
전력 설비 실시간 상태 모니터링

[주요 업무]
- 가상 센서 구현
- Kafka와 Spark를 이용한 스트림 처리 구현
- 스트림 처리한 데이터 시각화
PowerSOH
2022.10 - 2022.10
GitHub: https://github.com/mooncw/powersoh
[프로젝트 소개]
전력설비의 효율성 유지를 위한 이상 탐지 모델

[주요 업무]
- 공공 데이터 수집
- 데이터 분석, Feature Engineering, 모델링 싸이클
- 싸이클을 돌면서 모델 성능 개선

 

Education

패스트캠퍼스 Kernel 360
2023.10.10 - 2024.04.18
- 실전 경험을 위한 팀 프로젝트 4회


코드스테이츠 AI 부트캠프
2022.01.18 - 2022.08.11
- 데이터 전처리 & EDA, 머신러닝, 데이터 엔지니어링, 딥러닝, 컴퓨터 공학 기본

백엔드 서버와 프런트엔드 서버가 따로 있다면 REST API를 많이 사용할 거라 생각합니다.

이 때, API는 알겠는데, REST가 뭘까라는 생각이 들 수 있을 것 같습니다.

저역시 그랬었고, REST API에 대한 개념을 알고자 글을 작성하게 되었습니다.

 

REST란?

자원을 이름으로 구분하여 처리한다 것을 의미합니다.

즉, REST는 다음과 같은 과정을 거치게 됩니다.

  • HTTP URI를 통해 자원을 식별하고,
  • HTTP Method를 통해,
  • 해당 자원에 대한 CURD 동작

REST 특징

  • Server-Client(서버-클라이언트 구조): 요청을 하는 클라이언트와 요청을 처리하는 서버 구조
  • Stateless(무상태): 클라이언트 상태를 서버에 저장하지 않음
  • Cachealbe(캐시 가능): HTTP가 가진 특징 중 하나인 캐싱 기능 적용 가능
  • Layered System(계층화): REST API 서버는 다중 계층으로 구성 가능
  • Uniform Interface(인터페이스 일관성): 자원 접근 방식의 일관성 및 자원 조작 방법은 자원의 표현에 독립적

REST API란?

REST API는 이러한 REST을 기반으로 한 API 입니다.

REST API 설계 규칙

  • URI는 슬래시 구분자(/)로 계층 관계 표현
  • URI 마지막에 슬래시(/)를 포함하지 않음
  • 긴 URI에 대한 가독성은 하이픈(-)을 사용
  • URI는 소문자만 사용
  • URI에 파일 확장자를 포함하지 않음
  • URI는 동사보다 명사를 사용
  • 자원에 대한 행위는 HTTP Method로 표현

RESTful이란?

REST API 설계 규칙을 잘 따르는 것을 RESTful하다라고 합니다.

RESTful한 API를 구현하는 것의 목적은 일관적인 컨벤션을 통한 API의 이해도 및 호환성을 높이는 것이기 때문에, 성능이 중요한 상황에서는 REST API 설계 규칙을 모두 따를 필요는 없다고 합니다.

'채워가는 지식 > 네트워크' 카테고리의 다른 글

프락시  (0) 2024.02.15
STOMP 웹소켓 프로그래밍  (0) 2024.02.15
웹 서버  (0) 2023.12.29
커넥션  (0) 2023.12.20
HTTP 메시지  (0) 2023.12.18

https://school.programmers.co.kr/learn/courses/30/lessons/64063

 

프로그래머스

코드 중심의 개발자 채용. 스택 기반의 포지션 매칭. 프로그래머스의 개발자 맞춤형 프로필을 등록하고, 나와 기술 궁합이 잘 맞는 기업들을 매칭 받으세요.

programmers.co.kr


1차 시도

방 번호를 key를 갖고 바로 다음 방 번호를 value로 두고 찾아보자는 생각으로 구현

def solution(k, room_number):
    room_dict = dict()
        
    for room in room_number:
        while room in room_dict.keys():
            room = room_dict[room]
        room_dict[room] = room + 1
    
    return list(room_dict.keys())

결과: 78.8

정확성만 통과

 

다른 사람 코드

import sys
sys.setrecursionlimit(1012)

def solution(k, room_number):
    room_dict = dict()
        
    for room in room_number:
        find_empty_room(room, room_dict)
            
    return list(room_dict.keys())

def find_empty_room(room_num, room_dict):
    if room_num not in room_dict.keys():
        room_dict[room_num] = room_num + 1
        return room_num
    empty_room = find_empty_room(room_dict[room_num], room_dict)
    room_dict[room_num] = empty_room + 1
    return empty_room

결과: 100

정확성과 효율성 모두 통과

 

1차 시도와 차이점

저의 코드는 value를 바로 다음 방 번호를 넣기 때문에 다음 가능한 방 번호를 찾을 때 어떠한 경우라도 1씩 증가시키면서 찾습니다.

다른 사람 코드는 value를 이전까지 찾았던 배정 가능한 빈 방 번호를 넣기 때문에 1씩 증가시키는 것보다 탐색 경로가 압축이 되기 때문에 더 빠르게 됩니다.

'프로그래머스-파이썬' 카테고리의 다른 글

쿠키 구입  (0) 2024.05.14
도둑질  (0) 2024.05.02
무지의 먹방 라이브  (0) 2024.04.09
테이블 해시 함수  (0) 2022.12.29
카드짝 맞추기  (0) 2022.11.30

https://school.programmers.co.kr/learn/courses/30/lessons/42891#

 

프로그래머스

코드 중심의 개발자 채용. 스택 기반의 포지션 매칭. 프로그래머스의 개발자 맞춤형 프로필을 등록하고, 나와 기술 궁합이 잘 맞는 기업들을 매칭 받으세요.

programmers.co.kr

문제를 요약하자면 k초까지 음식을 먹고 난 다음 차례를 구하는 문제라 할 수 있을 것 같습니다.

 

1차 시도

food_times를 매번 다 돌기보다는 음식이 남아있는 인덱스만 돌자라는 생각으로 구현했습니다.

def solution(food_times, k):
    N = len(food_times)
    
    leftovers = [idx for idx in range(N)]
    
    cur = 0
    
    while 0 <= k:    
        if leftovers == []:
            return -1
        
        repeat_leftovers = leftovers.copy()
        
        for left in repeat_leftovers:
            if k == 0:
                return left+1
            
            food_times[left] -= 1
            k -= 1
            
            if food_times[left] == 0:
                leftovers.remove(left)

 

결과

 

정확성은 통과하였지만 효율성에서 다 실패했습니다.

 

2차 시도

queue를 이용해보자는 생각으로 구현했습니다.

import queue

def solution(food_times, k):
    food_queue = queue.Queue()
    for idx, food_time in enumerate(food_times):
        food_queue.put((food_time, idx+1))
    
    while not food_queue.empty():
        food_time, idx = food_queue.get()
        if k == 0:
            return idx
        food_time -= 1
        k -= 1
        if food_time:
            food_queue.put((food_time, idx))
    return -1

 

결과

가독성은 늘어났으나 실행시간은 더 느려졌습니다. (1차 시도와 마찬가지로 정확성만 통과)

 

3차 시도

다른 사람의 코드를 참고하였고, 음식을 섭취하는데 걸리는 시간을 오름차순으로 정렬한 후,

현재 음식의 섭취하는데 걸리는 시간과 다음 음식의 섭취하는데 걸리는 시간을 비교하여 for문을 도는 횟수를 줄이자는 생각으로 구현했습니다.

import heapq

def solution(food_times, k):
    if sum(food_times) <= k:
        return -1
    
    food_queue = []
    for idx, food_time in enumerate(food_times):
        heapq.heappush(food_queue, (food_time, idx+1))
    
    times = 0
    previous_time = 0
    left_food_count = len(food_queue)
    
    while times + (food_queue[0][0] - previous_time) * left_food_count <= k:
        food_time = heapq.heappop(food_queue)[0]
        times += (food_time - previous_time) * left_food_count
        left_food_count -= 1
        previous_time = food_time
        
    leftovers = sorted(food_queue, key=lambda x: x[1])
    return leftovers[(k-times)%left_food_count][1]

 

결과

정확성과 효율성 모두 통과했습니다.

 

설명

heapq를 이용하여 food_queue를 섭하는데 걸리는 시간을 기준으로 우선순위 큐를 만듭니다.

 

현재까지 섭취한 시간인 times, 이전까지 섭취한 시간인 previous_time, 남은 음식 개수인 left_food_count를 초기화합니다.

while문으로 현재 음식과 다음 음식의 섭취 시간 차이 * 남은 음식 수가 현재 섭취 시간에 더해질 때 시간과 k초를 비교합니다.

※ 여기서 food_queue[0][0]이 다음 음식 섭취시간이고, previous_time가 현재 음식 섭취시간입니다.

 

k초를 넘지 않는다면, 다음 음식을 다 섭취하는 루틴으로 넘어가게 됩니다.

이 루틴에서 food_queue에서 다음 음식을 빼내고, times가 다음 음식을 다 섭취할 때의 루틴의 섭취 시간으로 초기화 되고, left_food_count에 1을 빼고, previous_time을 다음 음식 섭취 시간으로 초기화합니다.

 

k초를 넘는다면, k초에 times를 빼고 left_food_count로 나눈 나머지의 위치에 있는 음식 번호를 return 해주게 됩니다.

 

만약 전체 음식을 다 섭취하는데 걸리는 시간이 k초보다 작다면 이후 로직을 탈 필요가 없기 때문에,

맨위에서 if  sum(food_times) <= k의 조건문을 추가했습니다.

'프로그래머스-파이썬' 카테고리의 다른 글

도둑질  (0) 2024.05.02
호텔방 배정  (0) 2024.04.11
테이블 해시 함수  (0) 2022.12.29
카드짝 맞추기  (0) 2022.11.30
블록 이동하기  (1) 2022.11.09

문제 인식

저는 운영 서버에 같은 서비스 기능을 하는 스프링부트 서버 2개를 띄우고 Nginx 서버에서 로드밸런싱을 하고 있었습니다.

팀에서 Oauth2로 소셜 로그인을 구현이 되어 개발 서버에서 테스트하고 운영 서버에 반영을 했습니다.

그런데, 개발 서버에서 잘 되었던 소셜 로그인이 확률적으로 성공하는 현상이 발생했습니다.

확률적으로 성공하는 소셜 로그인은 실제 서비스에선 있을 수 없는 일이라 생각했기 때문에 빠르게 해결하고자 했습니다.

 

문제 파악

구현한 소셜 로그인 동작에 대해 생각해보니 백엔드 서버는 크게 3가지 요청이 있었습니다.

  • 클라이언트에게 위임 요청
  • 받은 위임으로 인증 서버에 토큰 요청
  • 받은 토큰으로 리소스 서버에 사용자 정보 요청

그렇다면 받은 위임과 받은 토큰으로 요청하는 것이기 때문에 이전 요청에 대한 응답을 가지고 있어야 정상적으로 작동할 것이라 생각했습니다.

즉, Nginx의 로드밸런싱으로 중간에 원래 소셜 로그인 동작을 하던 스프링부트가 아닌 다른 포트 번호의 스프링부트 서버로 요청을 했기 때문에 일어난 일이라 생각할 수 있었습니다.

 

문제 해결

저는 Nginx에서 특정 요청은 특정 포트 번호의 서버로 요청이 가게끔하면 되지 않을까라는 생각으로 해당 방향으로 알아봤습니다.

알아보니, Nginx에서 ip_hash라는 것을 사용하면 특정 ip는 같은 포트 번호로 요청을 보낼 수 있는 방법이 있었습니다.

적용한 결과, 운영 서버에서도 정상적으로 소셜로그인을 할 수 있게 되었습니다.

 

※ 개발 서버에서는 비용 문제로 1개의 스프링부트 서버만 띄우고 있어서 잘 동작한 것이었습니다.

 

 

개선점

ip_hash는 한 지역에서 많은 트래픽을 보내면 한쪽 서버으로 트래픽이 몰릴 확률이 존재합니다.

그래서 OAuth 인증 서버에서 받은 토큰을 Redis 같은 스토리지에 저장하여 공유하도록 하면 한 지역에서 많은 트래픽을 보내도 부하 분산이 가능하고 제가 겪었던 문제도 해결할 수 있는 좋은 방법이 될 것이라 생각합니다.

스프링 부트에서 영속성 컨텍스트는 Persistence Context라 하며, 데이터베이스와 관련된 작업을 할 때 사용되는 중요한 개념입니다.

영속성 컨텍스트는 엔티티 객체를 관리하고 데이터베이스와의 상호작용을 관리하는 역할을 합니다.

이를 통해 데이터베이스와의 효율적인 상호작용 및 데이터 변경을 추적할 수 있습니다.

 

※ 여기서 엔티티는 데이터베이스에서 가져온 데이터를 나타내는 자바 객체를 말합니다.

 

일반적으로 스프링 부트는 JPA, Hibernate와 같은 영속성 프레임워크를 사용하여 엔티티를 관리합니다.

 

위 이미지에 대해 제가 간략히 이해한바로 말씀드리자면,

  • JDBC: java로 데이터베이스를 조작할 수 있게 해주는 인터페이스
  • Hibernate: JDBC를 보다 더 간단하고 쉽게 사용할 수 있게 해주는 인터페이스
  • JPA: Hibernate를 보다 더 간단하고 쉽게 사용할 수 있게 해주는 인터페이스

※ EntityManager는 영속성 컨텍스트에서 엔티티를 관리하는 객체입니다.

 

영속성 컨텍스트 특징

1. 엔티티 관리

애플리케이션에서 작업하는 엔티티 객체를 관리하여, 객체의 상태 변화를 추적하고, 데이터베이스와의 동기화를 담당합니다.

2. 엔티티 캐싱

데이터베이스로부터 가져온 엔티티를 key는 id값, value는 entitiy인 Map 형태로 캐싱하여 성능을 향상시킵니다.

이렇게 캐싱하는 것을 1차 캐싱이라고도 합니다.

따라서 동일한 엔티티 조회 시에는 캐시된 엔티티를 반환하여 데이터베이스 부하를 줄일 수 있습니다.

3. 트랜잭션 지원

영속성 컨텍스트는 트랜잭션 범위 내에서 동작합니다.

즉, 트랜잭션이 시작되고 종료될 때까지 영속성 컨텍스트는 엔티티 상태를 추적하고, 변경 내용을 데이터베이스에 반영하지 않습니다.

트랜잭션을 롤백할 경우 영속성 컨텍스트도 변경된 상태를 롤백합니다.

4. 지연 로딩(Lazy Loading)과 즉시 로딩(Eager Loading)

영속성 컨텍스트는 연관된 엔티티를 지연 로딩하거나 즉시 로딩하는 방식으로 데이터를 로드합니다.

이를 통해 애플리케이션의 성능을 최적화할 수 있습니다.

5. 더티 체킹(Dirty Checking)

JPA에서 엔티티를 조회하면 해당 엔티티의 조회 상태 그대로 스냅샷을 만들어 놓습니다.

트랜잭션이 끝나는 시점에서 이 스냅샷과 비교해서 다른 점이 있다면 Update 쿼리를 데이터베이스로 전달합니다.

이러한 더티 체킹은 영속성 컨텍스트가 관리하는 엔티티에만 적용됩니다.

'채워가는 지식 > Spring Boot' 카테고리의 다른 글

JPA란?  (0) 2024.04.17
커넥션 풀이란?  (0) 2024.04.17
스프링에서 프록시와 AOP란?  (0) 2024.03.04
Bean에 대하여...  (0) 2024.02.16

문제 인식

저는 필요로 인해 유저 서비스에 포함되어 있던 채팅 기능을 채팅 서버로 분리하였습니다.

그 이후 특정 상황에서 채팅 서버에서 카프카 컨슈머 역직렬화에 실패하였다는 에러 로그가 엄청나게 빨리 쌓여 채팅 서버가 존재하는 ec2 인스턴스 서버에 에러 로그가 30초도 안되서 1GB가 쌓여서 디스크 공간이 매우 빠른 속도로 고갈되는 문제가 발생했습니다.

이 문제는 해당 ec2 인스턴스는 채팅 서버 뿐만 아니라 다른 서버도 존재하였기 때문에 다른 서버에 영향을 끼치고, ec2 데이터 쓰기 비용이 크게 증가하기 때문에 빠르게 해결해야 했습니다.

 

문제 파악

알아보니 로그가 빨리 쌓이는 이유는 스프링 카프카는 컨슈머에서 들어온 메시지의 역직렬화에 실패하면, 해당 메시지에 대하여 역직렬화를 짧은 시간 내에 다시 시도합니다.

해당 에러에 대한 처리가 없다면 무한 루프가 되어 엄청난 속도로 에러 로그가 쌓이게 되는 것이었습니다.

 

다음으로 역직렬화에 실패한 이유를 파악하기 위해 에러 로그를 확인 했고, 에러 로그는 다음과 같았습니다.

Caused by: java.lang.ClassNotFoundException: com.kernelsquare.memberapi.domain.coffeecaht.dto.ChatMessageRequest

 

해당 로그를 통해서 스프링 카프카는 프로듀서에서 클래스에 대한 정보도 같이 메시지를 직렬화하여 넣는다는 것을 알게 되었습니다.

그렇기 때문에, 유저 서버에서 해당 메시지를 보내면 채팅 서버에도 똑같이 com.kernelsquare.memberapi.domain.coffeechat.dto 위치에 ChatMessageRequest 클래스가 있어야 컨슈머에서 역직렬화가 가능합니다.

 

그런데 저는 채팅 서버에서 다른 위치의 ChatMessageRequest 클래스를 사용하기 때문에 역직렬화에 실패한 것입니다.

 

문제 해결

저는 다음 3가지 해결 방법이 떠올랐습니다.

  • 첫 번째, 채팅 서버에도 똑같은 위치에 ChatMessageRequest 클래스를 생성하는 방법
  • 두 번째, 유저 서버에서 메시지를 보낼 때, 클래스 정보는 넣지 않도록 설정하는 방법
  • 세 번째, 유저 서버의 ChatMessageRequest와 채팅 서버의 ChatMessageRequest를 매핑할 수 있도록 설정하는 방법

저는 다음 2가지 이유로 세 번째 방법을 선택했습니다.

  • 매핑하는 방법이 채팅 서버 패키지 구조과 유저 서버 패키지 구조에 의존적이지 않아도 되기 때문입니다.
  • 매핑하는 방법이 프로듀서 측 클래스 정보도 포함할 수 있기 때문입니다.

매핑하는 방법은 프로듀서와 컨슈머에서 매핑 설정을 해줘야 하는데, yml 파일에서 하는 방법과 코드에서 하는 방법이 있었습니다.

저는 이미 프로듀서와 컨슈머 설정을 코드로 구현했기 때문에 코드에서 매핑 설정을 추가하기로 했습니다.

 

프로듀서

    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(JsonSerializer.TYPE_MAPPINGS,
        	"ChatMessageRequest:com.kernelsquare.memberapi.domain.coffeechat.dto.ChatMessageRequest");
        return new DefaultKafkaProducerFactory<>(config);
    }

 

컨슈머

	public ConsumerFactory<String, ChatMessageRequest> consumerFactory(String groupId) {
		Map<String, Object> config = new HashMap<>();

		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
		config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		config.put(JsonDeserializer.TYPE_MAPPINGS,
			"ChatMessageRequest:com.kernelsquare.chattingapi.domain.chatting.dto.ChatMessageRequest");
		config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
		config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

		return new DefaultKafkaConsumerFactory<>(config);
	}

 

매핑 설정은 사용한 Serializer과 Deserializer의 TYPE_MAPPINGS에 "매핑할 이름:위치를 포함한 클래스명" 형태로 넣어주면 됩니다.

 

프로듀서와 컨슈머에 매핑 설정한 결과, 에러없이 잘 작동함을 확인했습니다.

 

더 나아가...

하지만 저는 여기까지는 반만 해결한 것이라 생각했습니다.

 

왜냐하면 모종의 이유(?)로 컨슈머에서 역직렬화하지 못하는 메시지가 메시지 브로커에 들어간다면 무한 루프로 에러 로그를 찍게 될 것이라 생각했기 때문입니다.

 

그렇다면 해당 메시지는 무시하고 다음 메시지로 넘어가게 할 수 있다면 되지 않을까라는 생각을 하게 되었습니다.

그래서 찾아보니 ErrorHandlingDeserializer라는 것을 사용하면 역직렬화에 실패하면 null을 반환하기 때문에 다음 메시지로 넘어갈 수 있다는 글이 있었습니다.

그래서 저는 ErrorHandlingDeserializer를 사용하여 컨슈머 설정을 다음과 같이 했습니다.

	public ConsumerFactory<String, ChatMessageRequest> consumerFactory(String groupId) {
		Map<String, Object> config = new HashMap<>();

		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
		config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
		config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
		config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
		config.put(JsonDeserializer.TYPE_MAPPINGS,
			"ChatMessageRequest:com.kernelsquare.chattingapi.domain.chatting.dto.ChatMessageRequest");
		config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
		config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

		return new DefaultKafkaConsumerFactory<>(config);
	}

 

기존 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG와 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG에 ErrorHandlingDeserializer.class를 넣고,

ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS와 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS 에 기존에 사용하던 Deserializer.class를 넣어주면 됩니다.

 

이 컨슈머 설정을 적용한 후 저는 에러 로그를 한번 찍고 다음 메시지를 처리할 수 있게 되었습니다.

 

개인적인 생각

ErrorHandlingDeserializer를 적용함으로써 에러 로그를 한 번 찍고 다음 메시지로 넘어갈 수 있었지만, 그 한 번의 에러 로그가 엄청 길어서, 이것을 커스텀하여 핵심 내용만 로그에 남길 수 있다면 가독성 측면과 리소스 측면에서 나아질 것이라 생각이 들었습니다.

 

역직렬화에 실패한 메시지에 대한 관리를 따로 할 수 있다면, 나중에 해당 메시지들에 대해 파악할 수 있지 않을까 생각합니다.

문제 인식

저희 프로젝트에 STOMP + 웹소켓 + 카프카 + 몽고 DB를 사용한 채팅 기능을 구현 했습니다.

채팅방에 처음 입장할 때 입장하기 메시지를 채팅방에 있는 사람에게 보내는데, 해당 메시지를 수신하는데 5분이 걸리는 문제가 있었습니다.
채팅은 실시간성이 중요한 기능이기에 반드시 해결해야하는 문제였습니다.

 

문제 파악

채팅 메시지는 STOMP의 pub/sub 구조와 카프카의 메시지 브로커를 통해 전달이 됩니다.

 

당시 입장 메시지 데이터 흐름

  • 채팅방에 입장하면 서버와 클라이언트는 웹소켓 연결을 맺고 입장 메시지를 서버에게 보냅니다.
  • 서버는 받은 입장 메시지를 메시지 핸들러를 거치고 카프카의 해당 채팅방 토픽으로 보냅니다.
  • 채팅방 토픽을 구독하고 있는 카프카 리스너에서 입장 메시지를 받습니다.
  • 카프카에게 받은 메시지를 몽고 DB에 저장합니다.
  • 몽고 DB에 저장한 후 해당 채팅방을 구독하고 있는 클라이언트에게 보냅니다.

저는 먼저 PUB, SUB, 메시지 브로커 중에 무엇이 문제인지 확인하기 위해 카프카의 해당 토픽을 확인했습니다.

토픽을 확인한 결과, 메시지는 잘 들어왔고 SUB 부분에 문제가 있다고 판단했습니다.

 

문제 위치를 정확히 파악을 하기 위해 카프카 리스너와 몽고 DB 저장 로직 사이, 몽고 DB 저장 로직과 구독자에게 메시지를 보내는 로직 사이에 메시지를 출력하는 로그를 추가했습니다.

그런 다음 실행한 결과, 카프카 리스너에서 데이터를 늦게 받아서 생긴 문제임을 확인했습니다.

 

왜 카프카 리스너에서 데이터를 늦게 받는지 조사한 결과, 카프카 컨슈머 리밸런싱과 관련이 있었습니다.

메시지 브로커는 해당 토픽의 각 파티션마다 메시지를 어떤 컨슈머에게 보낼 것인지를 정해야 합니다.

즉, 컨슈머 그룹 내의 컨슈머 수에 변화가 있거나, 토픽의 파티션이 변경이 되었을 때 리밸런싱을 통해 파티션 별로 컨슈머를 재할당합니다.

이 리밸런싱이 일어나게 되면 리밸런싱이 일어난 컨슈머 그룹 내의 모든 컨슈머의 읽기 작업이 중단됩니다.

 

저는 채팅방에 입장 했을 때, 특정 채팅방을 위한 토픽을 새로 만들고 있었습니다.

그래서 만들어진 토픽의 파티션에 컨슈머를 할당하기 위해 리밸런싱이 일어나게 되었고, 카프카 리스너에서 바로 메시지를 받지 못한 것입니다.

 

문제 해결

해결 방법으로 다음 2가지가 떠올랐습니다.

  • 첫 번째, 채팅방을 사용하려면 예약을 먼저 해야했기 때문에 예약할 때 미리 토픽을 생성하는 방법
  • 두 번째, 채팅은 하나의 토픽으로만 사용하는 방법

저는 다음 2가지 이유로 두 번째 방법을 채택했습니다.

  • 예약을 취소하면 토픽을 생성한 비용이 낭비가 되고, 토픽을 삭제하지 않는 한 토픽에 대한 정보가 이전보다 더 쌓일 것이라 생각했습니다.
  • 리밸런싱은 카프카 성능에 부정적인 영향을 주기 때문에 최대한 안일어나는 것이 좋다는 글을 보았습니다.

그래서 저는 서버를 실행할 때, chat 토픽이 없으면 생성하게끔 하고 모든 채팅방은 chat 토픽을 이용하게 변경하였습니다.

 

그 결과, 첫 입장 메시지를 받는 데 5분 걸리던 것이 1초 미만으로 걸리게 되었습니다.

 

+ Recent posts