vidigummy SOMA

Kafka와 Azure에서의 사용(Event Hub, Event Grid)

vidi 2022. 8. 14. 22:25

드디어 내가 좀 아는 것에 대해 이야기를 할 수 있는 기회가 왔다. 그 전 포스팅은... 정리가 안 되어있는 느낌일 것이다. 당연하다. 내가 직접 쓰지는 않거든... 더 공부해서 오도록 하겠다.

  1. 알게 된 + 공부하게 된 계기
    1. 일단 우리 프로젝트에 기술적인 제목을 붙이자면 Webhook BigData Pipeling 정도가 되겠다.
    2. 데이터를 수집하는 입장에서는 수 많은 곳에서 날라오는 Webhook들을 최대한 빠르고 오류 없이 DataWareHouse에 집어 넣는 것이 관건.
    3. 일반적으로 생각을 해보자고, 서버리스인 Azure Function는 각 이벤트에 대한 Webhook이 올 때 마다 서버가 생성되어 데이터를 처리한다. 이 것들을 좀 과장해서 OLTP DataBase / OLAP DataWarehouse / Cosmos DB라고 쳐보자. 어떤 문제가 생길까?
    4. 요청이 생길 때 마다 트랜잭션이 생기고, 이 통신 비용과 그것들을 위한 스펙은 모두 돈이다. 0.1초 내에 100개의 이벤트가 생기는 서비스를 생각해보자. 물론 트랜잭션은 바로 생겼다가 없어지겠지만, 1초면 10000개의 트랜잭션이 생기고, 서버리스로 운영한다고 하더라도 비용이 장난 없을 것이다. 보내는 데이터의 양이 똑같지 않냐고 말해도, tcp를 포함한 대부분의 http 통신은 한번 발생 할 때 마다 비용이 늘어나며, 서버리스의 경우, 인스턴스가 늘어나는 비용은 무시할 수 없다.
    5. 심지어 database들이 늘어난 트래픽을 바로 감당 못 할 경우, 데이터 유실등의 문제가 발생할 수 밖에 없다. 이건 데이터 처리에 있어서 아주 큰 문제이다.
    6. 이것들을 처리하기 위해 Kafka가 필요한 것이다. Batch 작업과 Queue 작업 둘 다 가능케 하는.
  2. 구성 요소(프로듀서, 컨슈머, 브로커, 토픽) 
    1. 프로듀서
      1. 토픽에 메시지 전송을 할 때
        1. 토픽, 키, 값이 들어간다.
        2. 설정들이 properties를 이용한다
          1. 이것들을 이용해 프로듀서 객체를 생성함
          2. 프로듀서 객체는 send를 지원함
          3. producerRecord를 이용한다
      2. 프로듀서의 기본 흐름
        1. send()를 보내게 되면 Serializer에 들어가는데,
          1. 여기서 프로듀서 객체를 byte 배열로 변환시키게 된다.
        2. 그 다음은 Prtitioner이 있는데
          1. 여기서 들어갈 피티션을 정하게 된다.
        3. 이 메시지들을 배치로 묶어서 브로커에 전달하게 된다.
          1. sender라는 애가 배치를 가져와서 브로커로 이동시킴
      3. Sender의 기본 동작
        1. 배치가 찼는지 여보에 상관 없이 Sender는 차례대로 브로커에 전송
        2. 그 앞단 친구들은 Sender가 메시지를 보내는 동안 배치로 모음
        3. 이 모든 일들은 Sender가 별도 쓰레드로 동작하는 덕분이다.
        4. 설정들
          1. 배치 사이즈 설정(bach.size)
            1. 특정 크기 만큼 배치가 차면 바로 보내는 것
          2. linger.ms(전송 대기 시간)
            1. 대기 시간이 없으면 배치를 바로 전송
            2. 대시 시간을 주면 그 시간 만큼 기다렸다 배치를 전송
      4. 전송 결과 확인 안 함
        1. 전송 실패를 알 수 없음
        2. 실패에 대한 별도 처리가 없는 메시지 전송도 사용할 수 있음
      5. 전송 결과 확인 해야 할 때
        1. Future를 사용
          1. 블로킹을 실행한다.
            1. 메시지를 하나씩만 보내게 됨 → 배치 효과가 떨어짐 → 처리량 저하
              1. 처리량이 낮아도 되는 경우에만 사용
        2. callBack객체를 전달
          1. 처리량 저하는 없음
          2. Blocking 아님
      6. 전송 보장과 ack
        1. ack = 0
          1. 서버 응답을 기다리지 않음 → 속도 빠름
          2. 전송 보장도 안됨
        2. ack = 1
          1. 파티션의 리더에 저장되면 응답 받음
          2. 리더 장애시 메시지 유실 가능
        3. ack = all(또는 -1)
          1. 모든 리플리ㅣ카에 저장되면 응답 받음
            1. 팔로워를 포함함
            2. 브로커 min.insync.replicas 설정에 따라 달라짐
      7. ack + min.insync.replicas
        1. min.insync.replicas(브로커 옵션)
          1. 프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
          2. 팔로워 모두가 받진 않더라도 괜찮아
      8. 에러 유형
        1. 전송 과정에서 실패
          1. 전송 타ㅣㅁ 아웃
          2. 리더 다운에 의한 새 리더 선출 진행중
          3. 브로커 설정 메시지 크기 한도 초과
          4. 등등
        2. 전송 전에 시래
          1. 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과
          2. 프로듀서 버퍼가 다 차서 기다린 시간이 최대 대기 시간 초과
          3. 등등
      9. 대응 예시
        1. 재시도
          1. 브로커 응답 타임 아웃, 일시적인 리더 없음 등)
          2. 재시도 위치
            1. 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
              1. tetries 속성
            2. send() 메서드에서 익셉션 발생 시 익셉션 타입에 따라 send() 재 호출
            3. 콜백 메서드에서 익셉션 받으면 타입에 따라 send() 재 호출
          3. 아주 특별한 이유가 없다면 무한 재시도는 하면 안됨
        2. 기록
          1. 추후 처리를 위해 기록
            1. 별도 파일, DB 등을 이용해서 실패한 메시지 기록
            2. 추후에 수동(또는 자동) 보정 작업 진행
          2. 기록 위치
            1. send() 메서드에서 익셉션 발생시
            2. send() 메서드에 저달한 콜백에서 익셉션을 받는 경우
            3. send()메서드가 리턴한 Future의 get() 메서드에서 익셉션 발생시
        3. 재시도와 메시지 중복 전송 가능성
          1. 네트워크… 기억나지? dup ack
          2. enable.idempotence 속성을 통해 해결 가능함
        4. 재시도와 순서
          1. 재시도는 전송 순서를 바꾸기도 함
            1. max.in.flight.requests.per.conection
              1. 블록킹 없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수
              2. 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음
                1. 전송 순서가 중요하면 이 값을 1로 지정해야함
    2. 컨슈머
      1. 토픽 파티션에서 레코드 조회
      2. 설정은 서버 지정, group ID 지정, 메시지를 읽어와서 역직렬화
      3. 컨슈머또한 객체이다.
      4. subscribe 메서드를 쓴다.(특정 조건을 충족할 때 까지 대기한다)
      5. 토픽 파티션은 그룹 단위 할당이다.
        1. 컨슈머 그룹 단위로 파티션 할당
          1. 컨슈머 개수와 파티션 개수는 밀접한 관련을 맺는다.
          2. 파티션 개수 < 컨슈머 그룹에서 컨슈머 개수
            1. 컨슈머는 놀게 됨
              1. 파티션이 두개 있는 토픽
              2. 파티션이 두개, 컨슈머가 한개
              3. 컨슈머 하나가 두개의 파티션에서 정보를 가져옴
              4. 그 반대라면, 컨슈머는 가져올 수 있는 파티션이 없어서 놀게 됨
        2. 커밋과 오프셋에 대한 이해
          1. 컨슈머 poll method를 실행하면 이전 커밋 오프셋 이후로 읽어옴
        3. 커밋된 오프셋이 없는 경우
          1. 처음 접근이거나 커밋한 오프셋이 없는 경우
            1. auto.offset.reset 설정 사용
            2. earliest : 맨 처음 오프셋 사용
            3. latest : 가장 마지막 오프셋 사용(기본값)
            4. none : 컨슈머 그룹에 대한 이전 커밋이 없으면 익셉션 발생
        4. 컨슈머 설정
          1. 조회에 영향을 주는 주요 설정
            1. fetch.min.bytes
              1. 조회 시 브로커가 전송할 최소 데이터 크기
                1. 기본값 1
                2. 이 값이 크면 대기 시간은 늘지만 처리량이 증가
              2. fetch.max.wait.ms
                1. 데이터가 최소 크기가 될 떄 까지 기다릴 시간
                  1. 기본값 500
                  2. 브로커가 리턴할 때 까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름
              3. max.partition.fetch.bytes
                1. 파티션 당 서버가 리턴할 수 있는 최대 크기
                2. 기본값 1048576(1MB)
          2. 자동 커밋/수동 커밋
            1. enable.auto.comit 설정
              1. 일정 주기로 컨슈머가 읽은 오프셋을 커밋(기본값) / 수동으로 커밋 실행
            2. auto.commit.interval.ms(자동 커밋 주기)
              1. 기본값 500ms
            3. poll(), close() 메서드 호출 시 자동 커밋 실행
            4. 수동 커밋(동기/비동기 커밋)
              1. commitSync() / commitAsync()← 성공 실패 여부 callback으로만 알 수 있음
          3. 재처리와 순서
            1. 동일 메시지 조회 가능성
              1. 일시적 커밋 실패, 리밸런스 등에 의해 발생
            2. 컨슈머는 멱등성(idempotence)을 고려해야함
              1. 아래 메시지를 재 처리 할 경우
                1. 조회수 1 증가 → 좋아요 1증가 → 조회수 1증가
                2. 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음
            3. 데이터 특성에 따라 타임스탬프, 일련 번호 등을 활용
          4. 세션 타임아웃, 하트비트, 최대 poll 간격
            1. 컨슈머는 하트비트를 전송해서 연결 유지
              1. 브로커는 일정 시간 컨슈ㅁㅓ로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행
              2. 관련 설정
                1. session.timeout.ms
                  1. 세션 타임 아웃 시간(기본 10초)
                2. heartbeat.interval.ms
                  1. 하트비트 전송 주기(기본 3초)
                    1. session.timeout.ms의 1/3 추천
            2. max.poll..interval.ms
              1. poll() 메서드의 최대 호출 간격
              2. 이 시간이 지나도록 poll()하지 않으면 컨슈머를 그룹에서 빼고 리밸런스 진행
      6. 종료 처리
        1. 다른 쓰레드에서 wakeup() 메서드 호출
          1. poll() 메서드가 WakeupException 발생 →close() 메서드로 종료 처리
      7. 주의점
        1. 쓰레드가 안전하지 않다
        2. KafkaConsumer는 쓰레드에 안전하지 않음
          1. 여러 쓰레드에서 동시에 사용하지 말 것!
          2. wakeup() 메서드는 예외

Event Hubs
Event Grid

 

  1. Azure에서의 사용
    1. 뭐... 그냥 kafka를 쓸 수는 있다. LinkedIn에서 개발된 Apache Kafka Project는 confluent라는 프로젝트로 재탄생 했다.
    2. 이것을 Azure에서 Confluent라는 managed service로 사용할 수 있긴 한데... 사용을 하려면 엄청난 비용을 내야한다. 달에 50만원 정도...
    3. 그렇기 때문에 우리 팀은 Apache Kafka를 이용한 CloudEvents라는 프로젝트의 Event Hub/Event Grid를 사용하기로 했고, 지금은 Event Hub를 쓰고 있다.(비싸다 이것도. 사실 EventHub 자체는 싼데, 붙어있는 Stream Analystic이 비싸다. Event Grid를 쓰려고 하고 있지만, 아직 사용법이 보이지 않는다.)
    4. Event Hub는 Kafka의 대용량 데이터 수신/Queue/Batch 작업을 가능케 한다. 물론 Consumer 설정을 하는 것이 빡세긴 하다.(비용이) 하지만, Stream Analystic의 쿼리 기능으로 유의미한 데이터만 뽑아서 보낼 수 있다는 점도 매우 장점이다.
    5. 현재의 Consumer는 Cosmos DB이며, 데이터를 안전하게 처리하고 있다.
    6. 아 맞다 Event Grid는 Queue/컨슈머 설정 같은 존재이다. 가격이 싸다. GCP의 Pub/Sub 쯤으로 생각하면 된다.

'vidigummy SOMA' 카테고리의 다른 글

4차 스프린트 회고(정말 짧은)(개강 전 마지막)  (2) 2022.08.28
3차 스프린트 회고  (2) 2022.08.21
Data Warehouse  (0) 2022.08.14
디자인 패턴(생성패턴)  (0) 2022.08.14