문제 인식
저는 필요로 인해 유저 서비스에 포함되어 있던 채팅 기능을 채팅 서버로 분리하였습니다.
그 이후 특정 상황에서 채팅 서버에서 카프카 컨슈머 역직렬화에 실패하였다는 에러 로그가 엄청나게 빨리 쌓여 채팅 서버가 존재하는 ec2 인스턴스 서버에 에러 로그가 30초도 안되서 1GB가 쌓여서 디스크 공간이 매우 빠른 속도로 고갈되는 문제가 발생했습니다.
이 문제는 해당 ec2 인스턴스는 채팅 서버 뿐만 아니라 다른 서버도 존재하였기 때문에 다른 서버에 영향을 끼치고, ec2 데이터 쓰기 비용이 크게 증가하기 때문에 빠르게 해결해야 했습니다.
문제 파악
알아보니 로그가 빨리 쌓이는 이유는 스프링 카프카는 컨슈머에서 들어온 메시지의 역직렬화에 실패하면, 해당 메시지에 대하여 역직렬화를 짧은 시간 내에 다시 시도합니다.
해당 에러에 대한 처리가 없다면 무한 루프가 되어 엄청난 속도로 에러 로그가 쌓이게 되는 것이었습니다.
다음으로 역직렬화에 실패한 이유를 파악하기 위해 에러 로그를 확인 했고, 에러 로그는 다음과 같았습니다.
Caused by: java.lang.ClassNotFoundException: com.kernelsquare.memberapi.domain.coffeecaht.dto.ChatMessageRequest
해당 로그를 통해서 스프링 카프카는 프로듀서에서 클래스에 대한 정보도 같이 메시지를 직렬화하여 넣는다는 것을 알게 되었습니다.
그렇기 때문에, 유저 서버에서 해당 메시지를 보내면 채팅 서버에도 똑같이 com.kernelsquare.memberapi.domain.coffeechat.dto 위치에 ChatMessageRequest 클래스가 있어야 컨슈머에서 역직렬화가 가능합니다.
그런데 저는 채팅 서버에서 다른 위치의 ChatMessageRequest 클래스를 사용하기 때문에 역직렬화에 실패한 것입니다.
문제 해결
저는 다음 3가지 해결 방법이 떠올랐습니다.
- 첫 번째, 채팅 서버에도 똑같은 위치에 ChatMessageRequest 클래스를 생성하는 방법
- 두 번째, 유저 서버에서 메시지를 보낼 때, 클래스 정보는 넣지 않도록 설정하는 방법
- 세 번째, 유저 서버의 ChatMessageRequest와 채팅 서버의 ChatMessageRequest를 매핑할 수 있도록 설정하는 방법
저는 다음 2가지 이유로 세 번째 방법을 선택했습니다.
- 매핑하는 방법이 채팅 서버 패키지 구조과 유저 서버 패키지 구조에 의존적이지 않아도 되기 때문입니다.
- 매핑하는 방법이 프로듀서 측 클래스 정보도 포함할 수 있기 때문입니다.
매핑하는 방법은 프로듀서와 컨슈머에서 매핑 설정을 해줘야 하는데, yml 파일에서 하는 방법과 코드에서 하는 방법이 있었습니다.
저는 이미 프로듀서와 컨슈머 설정을 코드로 구현했기 때문에 코드에서 매핑 설정을 추가하기로 했습니다.
프로듀서
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(JsonSerializer.TYPE_MAPPINGS,
"ChatMessageRequest:com.kernelsquare.memberapi.domain.coffeechat.dto.ChatMessageRequest");
return new DefaultKafkaProducerFactory<>(config);
}
컨슈머
public ConsumerFactory<String, ChatMessageRequest> consumerFactory(String groupId) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TYPE_MAPPINGS,
"ChatMessageRequest:com.kernelsquare.chattingapi.domain.chatting.dto.ChatMessageRequest");
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(config);
}
매핑 설정은 사용한 Serializer과 Deserializer의 TYPE_MAPPINGS에 "매핑할 이름:위치를 포함한 클래스명" 형태로 넣어주면 됩니다.
프로듀서와 컨슈머에 매핑 설정한 결과, 에러없이 잘 작동함을 확인했습니다.
더 나아가...
하지만 저는 여기까지는 반만 해결한 것이라 생각했습니다.
왜냐하면 모종의 이유(?)로 컨슈머에서 역직렬화하지 못하는 메시지가 메시지 브로커에 들어간다면 무한 루프로 에러 로그를 찍게 될 것이라 생각했기 때문입니다.
그렇다면 해당 메시지는 무시하고 다음 메시지로 넘어가게 할 수 있다면 되지 않을까라는 생각을 하게 되었습니다.
그래서 찾아보니 ErrorHandlingDeserializer라는 것을 사용하면 역직렬화에 실패하면 null을 반환하기 때문에 다음 메시지로 넘어갈 수 있다는 글이 있었습니다.
그래서 저는 ErrorHandlingDeserializer를 사용하여 컨슈머 설정을 다음과 같이 했습니다.
public ConsumerFactory<String, ChatMessageRequest> consumerFactory(String groupId) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.TYPE_MAPPINGS,
"ChatMessageRequest:com.kernelsquare.chattingapi.domain.chatting.dto.ChatMessageRequest");
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(config);
}
기존 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG와 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG에 ErrorHandlingDeserializer.class를 넣고,
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS와 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS 에 기존에 사용하던 Deserializer.class를 넣어주면 됩니다.
이 컨슈머 설정을 적용한 후 저는 에러 로그를 한번 찍고 다음 메시지를 처리할 수 있게 되었습니다.
개인적인 생각
ErrorHandlingDeserializer를 적용함으로써 에러 로그를 한 번 찍고 다음 메시지로 넘어갈 수 있었지만, 그 한 번의 에러 로그가 엄청 길어서, 이것을 커스텀하여 핵심 내용만 로그에 남길 수 있다면 가독성 측면과 리소스 측면에서 나아질 것이라 생각이 들었습니다.
역직렬화에 실패한 메시지에 대한 관리를 따로 할 수 있다면, 나중에 해당 메시지들에 대해 파악할 수 있지 않을까 생각합니다.