본문 바로가기
Apache Kafka

Apache kafka

by xogns93 2024. 11. 8.

다음과 같이 AWS EC2 인스턴스 인바운드 규칙을 설정하기 위한 설명을 참고합니다

 

AWS EC2 인바운드 규칙 설정

 

 

AWS EC2 SSH 접속

Unix 계열의 OS를 Local Host PC로 사용하시면, chmod 커맨드로 mykafkakey.pem을 실행할 수 있는 권한을 부여해야 합니다.

chmod 400
$ ssh -i ./mykafkakey.pem ubuntu@54.180.96.36

  

 EC2 Key Pair 생성 관련

OpenJDK 설치하기

1. 인스턴스 업데이트: 우선, EC2 인스턴스의 패키지 목록을 최신 상태로 업데이트합니다.

sudo apt upgrade
sudo apt update

    
2. Java 설치 가능한 버전 확인: 사용 가능한 OpenJDK 버전을 검색합니다. 이를 통해 원하는 Java 버전을 선택할 수 있습니다.

apt search openjdk

    
3. Java 설치: 원하는 버전의 OpenJDK를 설치합니다. 예를 들어, OpenJDK 11을 설치하고 싶다면, 다음과 같이 입력합니다.

sudo apt install -y openjdk-11-jdk


   -y 옵션은 설치 중 나타나는 모든 질문에 자동으로 yes로 답하는 옵션입니다, 이를 통해 설치 과정을 자동화할 수 있습니다.

 

※ kafka 3.6 버전부터 jdk17을 사용함 : https://kafka.apache.org/36/documentation.html

 


4. Java 버전 확인: Java 설치가 성공적으로 완료되었는지 확인하기 위해 설치된 Java의 버전을 확인합니다.

java -version


이 커맨드는 설치된 Java의 버전 정보를 출력합니다.

환경변수 설정
Java를 설치한 후, 특정 애플리케이션에서 Java 환경변수를 요구하는 경우가 있습니다. 예를 들어, JAVA_HOME 환경변수를 설정해야 하는 경우, .bashrc 또는 .profile 파일에 다음과 같이 추가할 수 있습니다.

1. 홈 디렉토리의 .bashrc 파일을 엽니다.

vi ~/.bashrc


2. 파일의 맨 아래에 JAVA_HOME 환경변수를 추가합니다. 예를 들어, OpenJDK 11을 설치한 경우 다음과 같이 추가합니다.

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin

 

       ...
       
fi

# colored GCC warnings and errors
#export GCC_COLORS='error=01;31:warning=01;35:note=01;36:caret=01;32:locus=01:quote=01'

# some more ls aliases
alias ll='ls -alF'
alias la='ls -A'
alias l='ls -CF'

# Add an "alert" alias for long running commands.  Use like so:
#   sleep 10; alert
alias alert='notify-send --urgency=low -i "$([ $? = 0 ] && echo terminal || echo error)" "$(history|tail -n1|sed -e '\''s/^\s*[0-9]\+\s*//;s/[;&|]\s*alert$//'\'')"'

# Alias definitions.
# You may want to put all your additions into a separate file like
# ~/.bash_aliases, instead of adding them here directly.
# See /usr/share/doc/bash-doc/examples in the bash-doc package.

if [ -f ~/.bash_aliases ]; then
    . ~/.bash_aliases
fi

# enable programmable completion features (you don't need to enable
# this, if it's already enabled in /etc/bash.bashrc and /etc/profile
# sources /etc/bash.bashrc).
if ! shopt -oq posix; then
  if [ -f /usr/share/bash-completion/bash_completion ]; then
    . /usr/share/bash-completion/bash_completion
  elif [ -f /etc/bash_completion ]; then
    . /etc/bash_completion
  fi
fi

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin


3. 변경 사항을 저장하고 .bashrc 파일을 닫은 다음, 변경 사항을 적용합니다.

source ~/.bashrc



이렇게 하면 AWS의 Ubuntu EC2 인스턴스에 Java를 설치하고 설정하는 방법을 완료할 수 있습니다.

 

kafka 다운로드

wget 커맨드로 아파치 카프카를 다운로드 받습니다.

wget https://downloads.apache.org/kafka/3.7.1/kafka_2.12-3.7.1.tgz

 

압축 파일을 해제합니다.

tar xvf ./kafka_2.12-3.7.1.tgz

 

server.properties 수정

kafka_2.13-3.7.1/config/server.properties 내용중, 아래 빨간색 하이라트 부분을 수정합니다.

vi ./server.properties

 

Zookeeper 실행

$ ./bin/zookeeper-server-start.sh -daemon /home/ubuntu/kafka_2.12-3.7.1/config/zookeeper.properties

 

Zookeeper 실행 확인

$ jps -m

 

 

4954 프로세스는 Zookeeper 서버를 실행하는 Java 프로세스입니다. QuorumPeerMain 클래스는 Zookeeper 서버를 시작하는 메인 클래스입니다.

 

Kafka 실행

$ ./bin/kafka-server-start.sh -daemon /home/ubuntu/kafka_2.12-3.7.1/config/server.properties

 

Kafka 실행 확인

$ jps -m

 

 

※ Kafka 실행 에러 발생

ubuntu@ip-172-31-45-65:~/kafka_2.12-2.5.0$ ./bin/kafka-server-start.sh ./config/server.properties                                      OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Not enough space' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/ubuntu/kafka_2.12-2.5.0/hs_err_pid23888.log

 

Kafka의 heap memory는 디폴트로 Kafka = 1GByte, Zookeeper=512MByte입니다.이는 현재 선택한 AWS EC2 프리티어 메모리가 1GByte 이기 때문에 위와 같은 에러가 발생합니다.그래서 Kafka와 Zookeeper의 힙 메모리를 .bashrc 파일에서 재설정해야 합니다.

 

export KAFKA_HEAP_OPTS="-Xms300m -Xmx300m"

※ 위 이미지 설정 대신에 300m(300Mbyte)로 모두 수정

 

KAFKA_HEAP_OPTS 환경 변수를 시스템에 등록합니다.

$ source ./.bashrc

 

Topic 생성(카프카 브로커를 실행 후, 토픽을 생성해야 함)

$ bin/kafaka-topics.sh \
> --create \
> --bootstrap-server 54.180.100.188:9092 \
> --topic tacocloudorders

 

Topic 목록 확인하기

$ bin/kafka-topics.sh --bootstrap-server 54.180.100.188:9092 --list

 

 

Topic 정보 확인

$ bin/kafka-topics.sh --bootstrap-server 54.180.100.188:9092 --topic tacocloudorders --describe

 

 

Kafka의 토픽에 저장된 메시지 확인

Kafka의 토픽에 저장된 메시지를 확인하는 가장 일반적인 방법은 Kafka의 커맨드 라인 툴인 `kafka-console-consumer`를 사용하는 것입니다. 이 도구를 사용하면 실시간으로 메시지를 소비하거나, 특정 시간에서부터의 메시지를 조회하는 등 다양한 방식으로 메시지를 확인할 수 있습니다.

kafka-console-consumer 사용법

1. 실시간으로 메시지 소비하기: Kafka 설치 디렉토리의 bin 폴더 안에서 다음 명령어를 실행하여, 특정 토픽의 메시지를 실시간으로 확인할 수 있습니다. 이 명령은 새로운 메시지가 토픽에 도착할 때마다 해당 메시지를 출력합니다.

 ./kafka-console-consumer.sh --bootstrap-server <broker-address>:<port> --topic <topic-name> --from-beginning

 

./kafka-console-consumer.sh --bootstrap-server 54.180.96.36:9092 --topic tacocloudorders --from-beginning
  • <broker-address>:<port>: Kafka 브로커의 주소와 포트번호입니다. 예를 들어, 54.180.96.36:9092.
  • <topic-name>: 메시지를 확인하고 싶은 토픽의 이름입니다. 예를 들어, tacocloud_orders.
  • --from-beginning: 이 옵션을 사용하면 토픽의 시작부터 현재까지의 모든 메시지를 확인할 수 있습니다. 이 옵션 없이 명령어를 실행하면, 명령어를 실행한 시점 이후에 도착하는 메시지만 확인할 수 있습니다.


2. 특정 시간부터의 메시지 소비하기: Kafka 2.1.0 이상에서는 --offset 대신 --from-beginning 또는 --partition과 함께 --offset 옵션을 사용하여 특정 시점부터 메시지를 소비할 수 있습니다. 또한 kafka-console-consumer는 --timestamp 옵션을 지원하지 않으므로, 특정 시간으로부터 메시지를 조회하고자 할 때는 다른 도구나 API를 사용해야 합니다.

 

 

topic 삭제

Kafka 토픽을 삭제하는 커맨드는 Kafka가 설치된 디렉토리 내의 bin 폴더에서 실행할 수 있는 kafka-topics.sh 스크립트를 사용하여 수행할 수 있습니다. 기본적인 커맨드 형식은 다음과 같습니다:

./kafka-topics.sh --delete --topic <토픽 이름> --bootstrap-server <브로커 목록>


여기서 <토픽 이름>에는 삭제하고자 하는 Kafka 토픽의 이름을, <브로커 목록>에는 Kafka 클러스터의 브로커들의 주소 목록을 콤마로 구분하여 입력합니다. 예를 들어, 브로커가 localhost:9092에 실행 중이고, 삭제하고자 하는 토픽 이름이 my-topic이라면, 다음과 같이 명령을 실행할 수 있습니다:

./kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092


이 커맨드를 실행하기 전에 Kafka 설정에서 토픽 삭제가 활성화되어 있는지 확인해야 합니다. Kafka의 server.properties 파일에서 delete.topic.enable=true 설정을 확인하거나 추가해야 할 수 있습니다. 이 옵션이 활성화되어 있지 않으면, 토픽을 삭제해도 실제로는 삭제되지 않고, 삭제 마킹만 되어 있게 됩니다.

또한, 토픽 삭제 후에는 해당 토픽에 관련된 데이터가 Kafka 클러스터의 브로커에서 실제로 삭제될 때까지 약간의 시간이 소요될 수 있습니다. 삭제 프로세스는 비동기적으로 수행됩니다.

 

Kafka 종료

./bin/kafka-server-stop.sh 을 실행시키면 종료됩니다


주의사항

  • kafka-console-consumer.sh 스크립트는 Kafka의 설치 경로 내 bin 디렉토리에 위치합니다. 따라서 해당 스크립트를 실행하기 전에 Kafka 설치 디렉토리로 이동하거나, 전체 경로를 명시해야 합니다.
  • 실제 운영 환경에서는 토픽에 저장된 대량의 메시지를 모두 소비하는 것이 시간이 많이 걸릴 수 있으므로, 필요한 정보를 효율적으로 검색하기 위해 적절한 옵션을 선택하여 사용하는 것이 중요합니다.

'Apache Kafka' 카테고리의 다른 글

Activemq artemis 관련  (0) 2024.11.08
Apache Kafka와 ActiveMQ  (0) 2024.11.08
Zookeeper, KRaft (Kafka Raft)  (1) 2024.11.07
Apache Kafka (아파치 카프카)  (1) 2024.11.07