
ํค์ด ํ๋ธ๋ฅด.
์ต๊ทผ์ ๋๋ Kafka Producer์ Consumer๊ฐ ๋ณด์ฅ๋ ์ ๋ฌ์ ๋ ๊ฐ๊น์์ง๋๋ก ํ์์ ๊ฐ์ฅ ์์ฃผ ์ฌ์ฉํ๋ ๋งค๊ฐ๋ณ์๋ ๋ฌด์์ ๋๊น? ์ด ๊ธ์์๋ ์ธ๋ถ ์์คํ ์ ์ผ์์ ์ผ๋ก ์ฌ์ฉํ ์ ์๊ฒ ๋์ด Kafka์์ ์์ ํ ์ด๋ฒคํธ๋ฅผ ์ด๋ป๊ฒ ๋ค์ ์ฒ๋ฆฌํ๋์ง ์ค๋ช ํ๊ณ ์ ํฉ๋๋ค.
์ต์ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋งค์ฐ ๋ณต์กํ ํ๊ฒฝ์์ ์๋ํฉ๋๋ค. Kubernetes๋ OpenShift์ ๊ฐ์ ์ค์ผ์คํธ๋ ์ดํฐ๊ฐ ๊ด๋ฆฌํ๋ Docker ์ด๋ฏธ์ง์์ ์คํ๋๋ ์ต์ ๊ธฐ์ ์คํ์ ํฌํจ๋ ๋น์ฆ๋์ค ๋ก์ง์ ๋ฌผ๋ฆฌ์ ๋ฐ ๊ฐ์ ๋ผ์ฐํฐ ์ฒด์ธ์ ํตํด ๋ค๋ฅธ ์ ํ๋ฆฌ์ผ์ด์ ์ด๋ ์ํฐํ๋ผ์ด์ฆ ์๋ฃจ์ ๊ณผ ํต์ ํฉ๋๋ค. ์ด๋ฌํ ํ๊ฒฝ์์๋ ์ธ์ ๋ ๋ฌด์ธ๊ฐ๊ฐ ๊ณ ์ฅ๋ ์ ์์ผ๋ฏ๋ก ์ธ๋ถ ์์คํ ์ค ํ๋๋ฅผ ์ฌ์ฉํ ์ ์๋ ๊ฒฝ์ฐ ์ด๋ฒคํธ๋ฅผ ๋ค์ ์ฒ๋ฆฌํ๋ ๊ฒ์ ๋น์ฆ๋์ค ํ๋ก์ธ์ค์ ์ค์ํ ๋ถ๋ถ์ ๋๋ค.
์นดํ์นด ์ด์ ์ ์ํฉ์ ์ด๋ ๋๊ฐ
์ด ํ๋ก์ ํธ์ ์ด๊ธฐ ๋จ๊ณ์์๋ ๋น๋๊ธฐ ๋ฉ์์ง ์ ๋ฌ์ ์ํด IBM MQ๋ฅผ ์ฌ์ฉํ์ต๋๋ค. ์๋น์ค ์ด์ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ฉด ์์ ๋ ๋ฉ์์ง๋ ์ถ๊ฐ ์๋ ๊ตฌ๋ฌธ ๋ถ์์ ์ํด ๋ฐ๋ ๋ ํฐ ํ(DLQ)์ ๋ณด๊ด๋ ์ ์์ต๋๋ค. DLQ๋ ์์ ๋๊ธฐ์ด ์์ ์์ฑ๋์์ผ๋ฉฐ, ๋ฉ์์ง ์ ์ก์ IBM MQ ๋ด๋ถ์์ ๋ฐ์ํ์ต๋๋ค.
์ค๋ฅ๊ฐ ์ผ์์ ์ด๊ณ ์ด๋ฅผ ๊ฐ์งํ ์ ์๋ ๊ฒฝ์ฐ(์: HTTP ํธ์ถ์์ ResourceAccessException์ด ๋ฐ์ํ๊ฑฐ๋ MongoDb ์์ฒญ์์ MongoTimeoutException์ด ๋ฐ์ํ๋ ๊ฒฝ์ฐ) ์ฌ์๋ ์ ๋ต์ด ์ ์ฉ๋ฉ๋๋ค. ์ ํ๋ฆฌ์ผ์ด์ ๋ก์ง์ ๋ถ๊ธฐ์ ๊ด๊ณ์์ด ์๋ ๋ฉ์์ง๋ ์ง์ฐ ์ ์ก์ ์ํด ์์คํ ๋๊ธฐ์ด๋ก ์ ์ก๋๊ฑฐ๋, ๋ฉ์์ง๋ฅผ ๋ค์ ์ ์กํ๊ธฐ ์ํด ๋ง๋ค์ด์ง ๋ณ๋์ ์ ํ๋ฆฌ์ผ์ด์ ์ผ๋ก ์ ์ก๋์์ต๋๋ค. ์ด ๊ฒฝ์ฐ ์ฌ์๋ ๋ฒํธ๊ฐ ๋ฉ์์ง ํค๋์ ๊ธฐ๋ก๋๋ฉฐ, ์ด๋ ์ง์ฐ ๊ฐ๊ฒฉ์ด๋ ์ ํ๋ฆฌ์ผ์ด์ ์์ค ์ ๋ต์ ๋์ ์ฐ๊ฒฐ๋ฉ๋๋ค. ์ ๋ต์ ๋์ ๋๋ฌํ์ง๋ง ์ธ๋ถ ์์คํ ์ ์ฌ์ ํ ์ฌ์ฉํ ์ ์๋ ๊ฒฝ์ฐ ํด๋น ๋ฉ์์ง๋ ์๋ ๊ตฌ๋ฌธ ๋ถ์์ ์ํด DLQ์ ์ ์ฅ๋ฉ๋๋ค.
์๋ฃจ์ ๊ฒ์
, ๋ค์์ ์ฐพ์ ์ ์์ต๋๋ค . ๊ฐ๋จํ ๋งํด, ๊ฐ ์ง์ฐ ๊ฐ๊ฒฉ์ ๋ํ ์ฃผ์ ๋ฅผ ๋ง๋ค๊ณ , ํ์ํ ์ง์ฐ ์๊ฐ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ฝ์ ์ ํ๋ฆฌ์ผ์ด์ ์ธก์ ์๋น์๋ฅผ ๊ตฌํํ๋ ๊ฒ์ด ์ ์๋ฉ๋๋ค.

๊ธ์ ์ ์ธ ๋ฆฌ๋ทฐ๊ฐ ๋ง์์๋ ๋ถ๊ตฌํ๊ณ , ์ ์๊ฐ์๋ ์ ์ ์ผ๋ก ์ฑ๊ณต์ ์ด์ง๋ ์์ ๊ฒ ๊ฐ์ต๋๋ค. ์ฒซ์งธ, ๊ฐ๋ฐ์๋ ๋น์ฆ๋์ค ์๊ตฌ ์ฌํญ์ ๊ตฌํํ๋ ๊ฒ ์ธ์๋ ์ค๋ช ๋ ๋ฉ์ปค๋์ฆ์ ๊ตฌํํ๋ ๋ฐ ๋ง์ ์๊ฐ์ ํ ์ ํด์ผ ํฉ๋๋ค.
๋ํ, Kafka ํด๋ฌ์คํฐ์์ ์ก์ธ์ค ์ ์ด๊ฐ ํ์ฑํ๋ ๊ฒฝ์ฐ ํ ํฝ์ ์์ฑํ๊ณ ํด๋น ํ ํฝ์ ํ์ํ ์ก์ธ์ค ๊ถํ์ ์ ๊ณตํ๋ ๋ฐ ์๊ฐ์ ํ ์ ํด์ผ ํฉ๋๋ค. ์ด ์ธ์๋ ๊ฐ ๊ฒ์ ์ฃผ์ ์ ๋ํด ์ฌ๋ฐ๋ฅธ retention.ms ๋งค๊ฐ๋ณ์๋ฅผ ์ ํํด์ผ ๋ฉ์์ง๊ฐ ๋ค์ ์ ์ก๋๊ณ ์ฌ๋ผ์ง์ง ์์ ์๊ฐ์ ํ๋ณดํ ์ ์์ต๋๋ค. ๊ตฌํ ๋ฐ ์ก์ธ์ค ์์ฒญ์ ๊ธฐ์กด ์๋น์ค๋ ์๋ก์ด ์๋น์ค๋ง๋ค ๋ฐ๋ณต๋์ด์ผ ํฉ๋๋ค.
์ด์ Spring์ด ์ ๊ณตํ๋ ๋ฉ์์ง ์ฌ์ฒ๋ฆฌ ๋ฉ์ปค๋์ฆ๊ณผ ํนํ Spring-Kafka๊ฐ ์ ๊ณตํ๋ ๋ฉ์์ง ์ฌ์ฒ๋ฆฌ ๋ฉ์ปค๋์ฆ์ ์ดํด๋ณด๊ฒ ์ต๋๋ค. Spring-kafka๋ ๋ค์ํ BackOffPolicy๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํ ์ถ์ํ๋ฅผ ์ ๊ณตํ๋ spring-retry์ ๋ํ ์ ์ด์ ์ข ์์ฑ์ ๊ฐ์ง๊ณ ์์ต๋๋ค. ์ด๊ฒ์ ๋งค์ฐ ์ ์ฐํ ๋๊ตฌ์ด์ง๋ง, ๊ฐ์ฅ ํฐ ๋จ์ ์ ๋ค์ ๋ณด๋ผ ๋ฉ์์ง๋ฅผ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅํ๋ค๋ ๊ฒ์ ๋๋ค. ์ฆ, ์ ๋ฐ์ดํธ๋ ์์ ์ค ์ค๋ฅ๋ก ์ธํด ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ค์ ์์ํ๋ฉด ์ฌ์ฒ๋ฆฌ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ๋ชจ๋ ๋ฉ์์ง๊ฐ ์์ค๋ฉ๋๋ค. ์ด ์ง์ ์ ์ฐ๋ฆฌ ์์คํ ์ ๋งค์ฐ ์ค์ํ๋ฏ๋ก ๋ ์ด์ ๊ณ ๋ คํ์ง ์์์ต๋๋ค.
spring-kafka ์์ฒด๋ ContainerAwareErrorHandler์ ์ฌ๋ฌ ๊ตฌํ์ ์ ๊ณตํฉ๋๋ค. ์๋ฅผ ๋ค์ด ์ด๋ฅผ ํตํด ์ค๋ฅ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ ์คํ์ ์ ์ด๋ํ์ง ์๊ณ ๋ ๋์ค์ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค. spring-kafka 2.3๋ถํฐ BackOffPolicy๋ฅผ ์ค์ ํ ์ ์์ต๋๋ค.
์ด ์ ๊ทผ ๋ฐฉ์์ ์ฌ์ฉํ๋ฉด ์ฌ์ฒ๋ฆฌ๋ ๋ฉ์์ง๊ฐ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ค์ ์์ํ ํ์๋ ์ ์ง๋์ง๋ง ์ฌ์ ํ DLQ ๋ฉ์ปค๋์ฆ์ ์์ต๋๋ค. ์ด๊ฒ์ ์ฐ๋ฆฌ๊ฐ 2019๋ ์ด์ ์ ํํ ์ต์ ์ผ๋ก, DLQ๊ฐ ํ์ํ์ง ์์ ๊ฒ์ด๋ผ๊ณ ๋๊ด์ ์ผ๋ก ๋ฏฟ์์ต๋๋ค(์ฐ๋ฆฌ๋ ์ด์ด ์ข์๊ณ ์ค์ ๋ก ๊ทธ๋ฌํ ์ฌ์ฒ๋ฆฌ ์์คํ ์ด ์๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฌ์ฉํ ๋ช ๋ฌ ๋์ DLQ๊ฐ ํ์ํ์ง ์์์ต๋๋ค). ์ผ์์ ์ธ ์ค๋ฅ๋ก ์ธํด SeekToCurrentErrorHandler๊ฐ ํธ๋ฆฌ๊ฑฐ๋์์ต๋๋ค. ๋๋จธ์ง ์ค๋ฅ๋ ๋ก๊ทธ์ ์ธ์๋์ด ์คํ์ ์ด ๋ฐ์ํ์ผ๋ฉฐ, ์ฒ๋ฆฌ๋ ๋ค์ ๋ฉ์์ง๋ก ๊ณ์๋์์ต๋๋ค.
์ต์ข ๊ฒฐ์
SeekToCurrentErrorHandler ๊ธฐ๋ฐ ๊ตฌํ์ผ๋ก ์ธํด ์ฐ๋ฆฌ๋ ๋ฉ์์ง ์ฌ์ ์ก์ ์ํ ์์ฒด ๋ฉ์ปค๋์ฆ์ ๊ฐ๋ฐํ๊ฒ ๋์์ต๋๋ค.
์ฐ์ , ์ฐ๋ฆฌ๋ ๊ธฐ์กด ๊ฒฝํ์ ํ์ฉํ๊ณ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ ผ๋ฆฌ์ ๋ฐ๋ผ ์ด๋ฅผ ํ์ฅํ๊ณ ์ถ์์ต๋๋ค. ์ ํ ๋ ผ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ฒฝ์ฐ ์ฌ์๋ ์ ๋ต์์ ์ง์ ํ ์งง์ ์๊ฐ ๋ด์ ์ ๋ฉ์์ง ์ฝ๊ธฐ๋ฅผ ์ค์งํ๋ ๊ฒ์ด ๊ฐ์ฅ ์ข์ต๋๋ค. ๋๋จธ์ง ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ฒฝ์ฐ ์ฌ์๋ ์ ๋ต์ ์ํํ ์ ์๋ ๋จ์ผ ์ง์ ์ ์ํ์ต๋๋ค. ๋ํ, ์ด ๋จ์ผ ์ง์ ์๋ ๋ ๊ฐ์ง ์ ๊ทผ ๋ฐฉ์ ๋ชจ๋์ ๋ํ DLQ ๊ธฐ๋ฅ์ด ์์ด์ผ ํฉ๋๋ค.
์ฌ์๋ ์ ๋ต ์์ฒด๋ ์ผ์์ ์ธ ์ค๋ฅ๊ฐ ๋ฐ์ํ์ ๋ ๋ค์ ๊ฐ๊ฒฉ์ ์ป๋ ์ญํ ์ ํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ ์ฅ๋์ด์ผ ํฉ๋๋ค.
์ ํ ๋ ผ๋ฆฌ ์ ํ๋ฆฌ์ผ์ด์ ์ ์๋น์ ์ค์ง
spring-kafka๋ฅผ ์ฌ์ฉํ ๋ Consumer๋ฅผ ์ค์งํ๋ ์ฝ๋๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
public void pauseListenerContainer(MessageListenerContainer listenerContainer,
Instant retryAt) {
if (nonNull(retryAt) && listenerContainer.isRunning()) {
listenerContainer.stop();
taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
return;
}
// to DLQ
}์ด ์์์ retryAt์ MessageListenerContainer๊ฐ ์์ง ์คํ ์ค์ด๋ฉด ๋ค์ ์์ํ ์๊ฐ์ ๋๋ค. ์ฌ์์์ TaskScheduler์์ ์์๋ ๋ณ๋์ ์ค๋ ๋์์ ๋ฐ์ํ๋ฉฐ, ์ด์ ๋ํ ๊ตฌํ๋ Spring์์ ์ ๊ณต๋ฉ๋๋ค.
retryAt์ ๊ฐ์ ๋ค์๊ณผ ๊ฐ์ ๋ฐฉ๋ฒ์ผ๋ก ์ฐพ์ ์ ์์ต๋๋ค.
- ์ฝ๋ฐฑ ์นด์ดํฐ์ ๊ฐ์ ๊ฒ์ํ๊ณ ์์ต๋๋ค.
- ์นด์ดํฐ ๊ฐ์ ๋ฐ๋ผ ์ฌ์๋ ์ ๋ต์ ํ์ฌ ์ง์ฐ ๊ฐ๊ฒฉ์ด ๊ฒ์๋ฉ๋๋ค. ์ ๋ต์ ์ ํ๋ฆฌ์ผ์ด์ ์์ฒด์์ ์ ์ธ๋๋ฉฐ, ์ด๋ฅผ ์ ์ฅํ๊ธฐ ์ํด JSON ํ์์ ์ ํํ์ต๋๋ค.
- JSON ๋ฐฐ์ด์์ ์ฐพ์ ๊ฐ๊ฒฉ์๋ ์ฒ๋ฆฌ๋ฅผ ๋ฐ๋ณตํด์ผ ํ๋ ์ด ์๊ฐ ํฌํจ๋์ด ์์ต๋๋ค. ์ด ์ด ์๋ ํ์ฌ ์๊ฐ์ ๋ํด์ ธ retryAt ๊ฐ์ ํ์ฑํฉ๋๋ค.
- ๊ฐ๊ฒฉ์ ์ฐพ์ ์ ์๋ ๊ฒฝ์ฐ retryAt ๊ฐ์ null์ด๊ณ ๋ฉ์์ง๋ ์๋ ๊ตฌ๋ฌธ ๋ถ์์ ์ํด DLQ๋ก ์ ์ก๋ฉ๋๋ค.
์ด ์ ๊ทผ ๋ฐฉ์์ ์ฌ์ฉํ๋ฉด ํ์ฌ ์ฒ๋ฆฌ ์ค์ธ ๊ฐ ๋ฉ์์ง์ ๋ํ ๋ฐ๋ณต ํธ์ถ ํ์๋ฅผ ์ ํ๋ฆฌ์ผ์ด์ ๋ฉ๋ชจ๋ฆฌ ๋ฑ์ ์ ์ฅํ๋ ๊ฒ๋ง ๋จ์ต๋๋ค. ์ด ์ ๊ทผ ๋ฐฉ์์์๋ ์๋ ์นด์ดํฐ๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ๋ณด๊ดํ๋ ๊ฒ์ด ์ค์ํ์ง ์์ต๋๋ค. ์ ํ ๋ ผ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ ์ฒด์ ์ผ๋ก ์ฒ๋ฆฌ๋ฅผ ์ํํ ์ ์๊ธฐ ๋๋ฌธ์ ๋๋ค. spring-retry์ ๋ฌ๋ฆฌ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ค์ ์์ํด๋ ์ฌ์ฒ๋ฆฌํ ๋ฉ์์ง๊ฐ ๋ชจ๋ ์์ค๋์ง ์๊ณ ์ ๋ต๋ง ๋ค์ ์์๋ฉ๋๋ค.
์ด๋ฌํ ์ ๊ทผ ๋ฐฉ์์ ๋งค์ฐ ๋ฌด๊ฑฐ์ด ๋ถํ๋ก ์ธํด ์ฌ์ฉํ ์ ์๋ ์ธ๋ถ ์์คํ ์ ๋ถํ๋ฅผ ๋์ด์ฃผ๋ ๋ฐ ๋์์ด ๋ฉ๋๋ค. ์ฆ, ์ฌ์ฒ๋ฆฌ ์ธ์๋ ํจํด ๊ตฌํ๊น์ง ๋ฌ์ฑํ์ต๋๋ค. .
์ฐ๋ฆฌ์ ๊ฒฝ์ฐ ์ค๋ฅ ์๊ณ๊ฐ์ 1์ ๋ถ๊ณผํ๋ฉฐ, ์ผ์์ ์ธ ๋คํธ์ํฌ ์ค๋จ์ผ๋ก ์ธํ ์์คํ ๊ฐ๋ ์ค์ง ์๊ฐ์ ์ต์ํํ๊ธฐ ์ํด ์์ ์ง์ฐ ๊ฐ๊ฒฉ์ผ๋ก ๋งค์ฐ ์ธ๋ถํ๋ ์ฌ์๋ ์ ๋ต์ ์ฌ์ฉํฉ๋๋ค. ์ด๊ฒ์ด ์ฌ๋ฌ ํ์ฌ ๊ทธ๋ฃน์ ๋ชจ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ ํฉํ์ง ์์ ์ ์์ผ๋ฏ๋ก, ์ค๋ฅ ์๊ณ๊ฐ๊ณผ ๊ฐ๊ฒฉ ํฌ๊ธฐ ๊ฐ์ ๊ด๊ณ๋ ์์คํ ์ ํน์ฑ์ ๋ฐ๋ผ ์ ํํด์ผ ํฉ๋๋ค.
๋น๊ฒฐ์ ์ ๋ ผ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํ ๋ณ๋์ ์ ํ๋ฆฌ์ผ์ด์
๋ค์์ RETRY_AT ์๊ฐ์ ๋๋ฌํ๋ฉด DESTINATION ์ฃผ์ ๋ก ๋ค์ ์ ์ก๋๋ ๋ฉ์์ง๋ฅผ ํด๋น ์ ํ๋ฆฌ์ผ์ด์ (Retryer)์ ์ ์กํ๋ ์ฝ๋์ ์์ ๋๋ค.
public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic,
Instant retryAt, String counter, String groupId, Exception e) {
Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
List<Header> arrayOfHeaders =
new ArrayList<>(Arrays.asList(headers.toArray()));
updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
updateHeader(arrayOfHeaders, ORIGINAL_PARTITION,
() -> Integer.toString(record.partition()).getBytes());
if (nonNull(retryAt)) {
updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
} else {
updateHeader(arrayOfHeaders, REASON,
ExceptionUtils.getStackTrace(e)::getBytes);
updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
}
ProducerRecord<K, V> messageToSend =
new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
kafkaTemplate.send(messageToSend);
}์ด ์๋ฅผ ๋ณด๋ฉด ๋ง์ ์ ๋ณด๊ฐ ํค๋๋ฅผ ํตํด ์ ์ก๋๋ค๋ ๊ฒ์ด ๋ถ๋ช ํฉ๋๋ค. RETRY_AT ๊ฐ์ ์๋น์ ์ค์ง ์ฌ์๋ ๋ฉ์ปค๋์ฆ๊ณผ ๊ฐ์ ๋ฐฉ์์ผ๋ก ์ฐพ์ ์ ์์ต๋๋ค. DESTINATION ๋ฐ RETRY_AT ์ธ์๋ ๋ค์์ ์ ๋ฌํฉ๋๋ค.
- GROUP_ID๋ ๋ฉ์์ง๋ฅผ ๊ทธ๋ฃนํํ์ฌ ์๋ ๋ถ์๊ณผ ๋ณด๋ค ์ฌ์ด ๊ฒ์์ ์ํด ์ฌ์ฉ๋ฉ๋๋ค.
- ORIGINAL_PARTITION์ ์ฌ์ฉํ์ฌ ์ฌ์ฒ๋ฆฌ๋ฅผ ์ํด ๋์ผํ Consumer๋ฅผ ์ ์งํ๋ ค๊ณ ํฉ๋๋ค. ์ด ๋งค๊ฐ๋ณ์๋ null์ผ ์ ์์ผ๋ฉฐ, ์ด ๊ฒฝ์ฐ ์ ํํฐ์ ์ ์๋ณธ ๋ฉ์์ง์ record.key() ํค๋ก ์ป์ด์ง๋๋ค.
- ์ฌ์๋ ์ ๋ต์ ๋ฐ๋ผ COUNTER ๊ฐ์ด ์ ๋ฐ์ดํธ๋์์ต๋๋ค.
- SEND_TO๋ RETRY_AT์ ๋๋ฌํ๋ฉด ์ฌ์ฒ๋ฆฌ๋ฅผ ์ํด ๋ฉ์์ง๋ฅผ ๋ณด๋ผ์ง ์๋๋ฉด DLQ์ ๋ฃ์์ง ์ฌ๋ถ๋ฅผ ๋ํ๋ด๋ ์์์ ๋๋ค.
- ์ด์ - ๋ฉ์์ง ์ฒ๋ฆฌ๊ฐ ์ค๋จ๋ ์ด์ .
Retryer๋ PostgreSQL์์ ์ฌ์ ์ก ๋ฐ ์๋ ๊ตฌ๋ฌธ ๋ถ์์ ์ํด ๋ฉ์์ง๋ฅผ ์ ์ฅํฉ๋๋ค. ํ์ด๋จธ๋ RETRY_AT๊ฐ ๋ฐ์ํ ๋ฉ์์ง๋ฅผ ์ฐพ๊ณ record.key() ํค์ ํจ๊ป DESTINATION ํ ํฝ์ ORIGINAL_PARTITION ํํฐ์ ์ผ๋ก ๋ค์ ๋ณด๋ด๋ ์์ ์ ์์ํฉ๋๋ค.
์ ์ก๋ ๋ฉ์์ง๋ PostgreSQL์์ ์ญ์ ๋ฉ๋๋ค. ๋ฉ์์ง์ ์๋ ๊ตฌ๋ฌธ ๋ถ์์ REST API๋ฅผ ํตํด Retryer์ ์ํธ ์์ฉํ๋ ๊ฐ๋จํ UI์์ ์ด๋ฃจ์ด์ง๋๋ค. ์ฃผ์ ๊ธฐ๋ฅ์ผ๋ก๋ DLQ์์ ๋ฉ์์ง๋ฅผ ๋ค์ ๋ณด๋ด๊ฑฐ๋ ์ญ์ ํ๊ณ , ์ค๋ฅ ์ ๋ณด๋ฅผ ๋ณด๊ณ , ์ค๋ฅ ์ด๋ฆ์ผ๋ก ๋ฉ์์ง๋ฅผ ๊ฒ์ํ๋ ๊ฒ์ด ์์ต๋๋ค.
ํด๋ฌ์คํฐ์์ ์ก์ธ์ค ์ ์ด๊ฐ ํ์ฑํ๋์์ผ๋ฏ๋ก Retryer๊ฐ ์์ ํ๋ ์ฃผ์ ์ ๋ํ ์ก์ธ์ค๋ฅผ ์ถ๊ฐ๋ก ์์ฒญํ๊ณ Retryer๊ฐ DESTINATION ์ฃผ์ ์ ์ธ ์ ์๋๋ก ํ์ฉํด์ผ ํฉ๋๋ค. ์ด ๋ฐฉ๋ฒ์ ๋ถํธํ์ง๋ง ์ฃผ์ ๋ณ ๊ฐ๊ฒฉ ๋ฐฉ์๊ณผ ๋ฌ๋ฆฌ ๋ณธ๊ฒฉ์ ์ธ DLQ์ ์ด๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํ UI๋ฅผ ์ป์ ์ ์์ต๋๋ค.
์ฌ๋ฌ ์๋น์ ๊ทธ๋ฃน์ด ๋ค์ด์ค๋ ์ฃผ์ ๋ฅผ ์ฝ๊ณ ํด๋น ๊ทธ๋ฃน์ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์๋ก ๋ค๋ฅธ ๋ ผ๋ฆฌ๋ฅผ ๊ตฌํํ๋ ๊ฒฝ์ฐ๊ฐ ์์ต๋๋ค. ์ด๋ฌํ ์ ํ๋ฆฌ์ผ์ด์ ์ค ํ๋์์ Retryer๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๋ค์ ์๋ํ๋ฉด ๋ค๋ฅธ ์ ํ๋ฆฌ์ผ์ด์ ์์๋ ์ค๋ณต์ด ๋ฐ์ํฉ๋๋ค. ์ด๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํด ์ฌ์ฒ๋ฆฌ๋ฅผ ์ํ ๋ณ๋์ ์ฃผ์ ๋ฅผ ๋ง๋ค๊ณ ์์ต๋๋ค. ์์ ๋ฐ ์ฌ์๋ ์ฃผ์ ๋ ๋์ผํ ์๋น์๊ฐ ์๋ฌด๋ฐ ์ ํ ์์ด ์ฝ์ ์ ์์ต๋๋ค.

๊ธฐ๋ณธ์ ์ผ๋ก ์ด ์ ๊ทผ ๋ฐฉ์์ ํ๋ก ์ฐจ๋จ๊ธฐ ๊ธฐ๋ฅ์ ์ ๊ณตํ์ง ์์ง๋ง ๋ค์์ ์ฌ์ฉํ์ฌ ์ ํ๋ฆฌ์ผ์ด์ ์ ํ๋ก ์ฐจ๋จ๊ธฐ ๊ธฐ๋ฅ์ ์ถ๊ฐํ ์ ์์ต๋๋ค. ๋๋ ์๋ก์ด ์ธ๋ถ ์๋น์ค์ ํธ์ถ ์ฌ์ดํธ๋ฅผ ์ ์ ํ ์ถ์ํ๋ก ๋ํํฉ๋๋ค. ๋ํ, ์ ๋ต์ ์ ํํ ์ ์๋ ๊ธฐํ๊ฐ ์์ต๋๋ค. ํจํด๋ ์ ์ฉํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, spring-cloud-netflix์์๋ ์ค๋ ๋ ํ์ด๋ ์ธ๋งํฌ์ด๊ฐ ๋ ์ ์์ต๋๋ค.
์ถ๋ ฅ
๊ทธ ๊ฒฐ๊ณผ, ์ธ๋ถ ์์คํ ์ ์ผ์์ ์ผ๋ก ์ฌ์ฉํ ์ ์์ ๋ ๋ฉ์์ง ์ฒ๋ฆฌ๋ฅผ ๋ฐ๋ณตํ ์ ์๋ ๋ณ๋์ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ป๊ฒ ๋์์ต๋๋ค.
์ด ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฃผ์ ์ฅ์ ์ค ํ๋๋ ๋์ผํ Kafka ํด๋ฌ์คํฐ์์ ์คํ๋๋ ์ธ๋ถ ์์คํ ์์๋ ํฐ ์์ ์์ด ์ฌ์ฉํ ์ ์๋ค๋ ๊ฒ์ ๋๋ค! ์ด๋ฌํ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฌ์๋ ์ฃผ์ ์ ์ก์ธ์คํ๊ณ , ๋ช ๊ฐ์ Kafka ํค๋๋ฅผ ์์ฑํ๊ณ , Retryer์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ธฐ๋ง ํ๋ฉด ๋ฉ๋๋ค. ์ถ๊ฐ์ ์ธ ์ธํ๋ผ๋ฅผ ๊ตฌ์ถํ ํ์๊ฐ ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ ํ๋ฆฌ์ผ์ด์ ์์ Retryer๋ก ์ ์ก๋๋ ๋ฉ์์ง ์๋ฅผ ์ค์ด๊ธฐ ์ํด ์ ํ ๋ ผ๋ฆฌ๋ฅผ ์ ์ฉํ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ ํํ๊ณ Consumer๋ฅผ ์ค์งํ์ฌ ํด๋น ์ ํ๋ฆฌ์ผ์ด์ ์์ ์ฌ์ฒ๋ฆฌ๋ฅผ ์ํํ์ต๋๋ค.
์ถ์ฒ : habr.com
