MQTT 메세지 송/수신을 위해 MQTT Broker가 필요
여러가지 MQTT Broker 플랫폼이 있는데 오픈소스 중 가장 유명한 mosquitto, 그리고 서베이 중 경량, 멀티스레딩을 강조하는 nanomq test
$ sudo apt install openjdk-8-jdk mosquitto mosquitto-client
간략한 상태 확인 명령어
$ ps -ef | grep (프로세스 이름) # 내가 실행시킨 프로세스가 현재 실행중인지, PID는 어떻게 되는지 확인 가능
$ netstat -tnlpa | grep (프로세스 이름 or Port 번호) # 실행시킨 프로세스가 사용중인 포트 확인 등에 유용
$ pkill -9 (프로세스 이름) # 해당 프로세스 죽이
simple mosquitto publish
$ mosquitto_pub -h 'host_ip_address' -t 'topic' -m 'message'
simpe mosquitto subscribe
$ mosquitto_sub -h 'host_ip_address' -t 'topic'
https://github.com/emqx/nanomq
$ sudo apt install ninja-build
$ git clone https://github.com/emqx/nanomq.git ; cd nanomq
$ git submodule update --init --recursive
$ mkdir build && cd build
$ cmake -G Ninja ..
$ ninja
MQTT 5.0, 3.1.1, 3.1 프로토콜 구현
Eclipse Paho MQTT Python 클라이언트 라이브러리 사용
추후 Amazon MSK 구현 시 원활함을 위해 MSK에서 권장하는 Kafka 버전인 2.6.2버전으로 테스트
kafka_2.1.2-2.6.2.tgz 설치, ZooKeeper 사용
(Kafka 최신 버전에서는 Zookeeper 의존성 제거를 위해 KRaft로 변경 중에 있으나 아직 안정적인 버전, 참고 문헌 부족 등의 이유로 Zookeeper로 테스트)
$ wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
$ tar -xzf kafka_2.12-2.6.2.tgz
$ cd kafka_2.1.2-2.6.2
Zookeeper 실행
$ bin/zookeeper-server-start.sh config/zookeeper.properties (-daemon)
Kafka broker 실행
$ bin/kafka-server-start.sh config/server.properties (-daemon)
Topic 만들기
$ bin/kafka-topics.sh --create --topic topic_name --bootstrap-server localhost:9092
생성된 Topic 확인
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Topic 상세 정보 확인
$ bin/kafka-topics.sh --describe --topic topic_name --bootstrap-server localhost:9092
Topic에 event 쓰기
$ bin/kafka-console-producer.sh --topic topic_name --bootstrap-server localhost:9092
> 이후 입력하는 줄마다 topic에 기록됨
> Ctrl-C로 중지
Topic의 event 읽기
$ bin/kafka-console-consumer.sh --topic topic_name --from-beginning --bootstrap-server localhost:9092
Kafka Connect 예시
$ nano config/connect-standalone.properties
plugin.path=libs/connect-file-2.6.2.jar
save
$ echo -e "foo\nbar" > test.txt
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
# 만약 nanomq 실행 중 위 명령이 에러가 발생할 경우 사용 포트 충돌 문제
# nanomq의 websocket 포트 = 8083 = kafka connector 포트
# 1. nanomq stop
# 2. /etc/nanomq.conf 파일의 websocket.enable=false로 변경
# 3. nanomq start -d
명령어 실행되면 test.sink.txt 파일 생성
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
echo more_text >> test.txt
텍스트 내용 추가되면 consumer 출력, sink 파일에 추가됨
테스트로 생성한 이벤트, 환경 데이터 삭제
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
https://github.com/dpkp/kafka-python
[카프카] Python으로 Kafka에 전송(Producer)하고 가져오기(consumer)
[Kafka Connect] Connector Rest API 정리
https://github.com/johanvandevenne/kafka-connect-mqtt
Kafka mqtt connector 사용법 - mqtt broker
maven 설치 필요
$ sudo apt install maven
$ git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
$ cd kafka-connect-mqtt
$ mvn clean install
위 open source로는 연결 실패
Apache Camel에서 지원하는 mqtt kafka source connector를 사용하여 mqtt message to kafka 성공
camel-mqtt-source-kafka-connector source configuration
$ sudo apt install maven
$ git clone https://github.com/apache/camel-kafka-connector
$ cd camel-kafka-connector/connectors/camel-mqtt-source-kafka-connector
$ mvn clean package
$ cd target
$ cp KAFKA_PLUGIN_PATH ./*.tar.gz
# kafka directory 이동
$ nano config/camel-mqtt-source.properties
name=CamelMqttSourceConnector
connector.class=org.apache.camel.kafkaconnector.mqttsource.Came>
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
#value.converter=org.apache.kafka.connect.storage.StringConvert>
value.converter=org.apache.kafka.connect.converters.ByteArrayCo>
value.converter.schemas.enable=false
topics=test
camel.kamelet.mqtt-source.topic=test
camel.kamelet.mqtt-source.brokerUrl=tcp://127.0.0.1:1883
converter.encoding=UTF-8
# kafka connect 실행
$ bin/connect-standalone.sh config/connect-standalone.properties config/camel-mqtt-source.properties
# 이후 kafka consumer 실행, mqtt broker로 message publish하여 테스트