10-5 카프카 연결해보기

kst5137·2022년 3월 11일
0

1. 플럼과 카프카 연결



worker2가 수집

flume : 보내는 애
받는 애

파이썬프로듀서가 메세지 보내면 vm브로커 vmflume 받는애
flume 보내는애 보내면 컨슈머가 받을 수 있게

1. 플룸이 메세지를 카프카로 보내기

[플룸]	->	[카프카 브로커]	->	[컨슈머]
프로듀서

1) /etc/hosts 설정
[브로커 IP] broker

2) 플룸 설정
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type = exec
agent.sources.s1.command = tail -F /var/log/syslog
agent.sources.s1.channels = c1

agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.topic = test
agent.sinks.k1.kafka.bootstrap.servers = 192.168.100.201:9092
agent.sinks.k1.kafka.flumeBatchSize = 100
agent.sinks.k1.kafka.producer.acks = 0
agent.sinks.k1.channel = c1

agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.type= memory

플럼 실행하는법
/opt/flume/bin/flume-ng agent -n agent --conf /opt/flume/conf -f /opt/flume/conf/flume-conf.properties

플럼에서 로그 남기는법
logger -p local1.notice logging hahahahahahaha

2. 플룸이 메세지를 카프카로부터 받기

프로듀서	->	[카프카 브로커]	->	[플룸]
					컨슈머

1) /etc/hosts 설정
[브로커 IP] broker

agent.sources = k1
agent.channels = c1
agent.sinks = s1

agent.sources.k1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.k1.kafka.topics = test
agent.sources.k1.kafka.bootstrap.servers = 192.168.100.201:9092
agent.sources.k1.channels = c1

agent.sinks.s1.type= FILE_ROLL
agent.sinks.s1.channel = c1
agent.sinks.s1.sink.directory = /opt/flume/data
agent.sinks.s1.sink.rollInterval = 60

agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.type= memory

다하고 확인하고 싶으면 /opt/flume/data

2.파일비트와 카프카 연결

  1. 파일비트가 메세지를 카프카로 보내기
    [파일비트] -> [카프카 브로커] -> [컨슈머]
    프로듀서

1) /etc/hosts 설정
[브로커 IP] broker

2) 파일비트 설정
/etc/filebeat/filebeat.yml

다른 output 설정은 주석처리하고
밑에 내용 추가

output.kafka:
  hosts: ["192.168.100.201:9092"]
  topic: "test"

3) 실행
systemctl restart filebeat

3. 로그스태시와 카프카연결

  1. 로그스태시가 카프카에게 메시지를 전송
    [로그스태시] -> [카프카 브로커] -> [컨슈머]
    프로듀서

1) /etc/hosts 설정
[브로커 IP] broker

2) 파일비트 설정
/etc/logstash/conf.d/logstash.conf

input {
  file {
    path => "csv 파일의 절대 경로"
    start_position => "beginning"  
    sincedb_path => "/dev/null"  
  }
}

filter {
  csv {
    separator => ","
    columns => ["Country","1980","1981","1982","1983","1984","1985","1986","1987","1988","1989","1990","1991","1992","1993","1994","1995","1996","1997","1998","1999","2000","2001","2002","2003","2004","2005","2006","2007","2008","2009","2010"]
  }
  mutate {convert => ["1980", "float"]}
  mutate {convert => ["1981", "float"]}
  mutate {convert => ["1982", "float"]}
  mutate {convert => ["1983", "float"]}
  mutate {convert => ["1984", "float"]}
  mutate {convert => ["1985", "float"]}
  mutate {convert => ["1986", "float"]}
  mutate {convert => ["1987", "float"]}
  mutate {convert => ["1988", "float"]}
  mutate {convert => ["1989", "float"]}
  mutate {convert => ["1990", "float"]}
  mutate {convert => ["1991", "float"]}
  mutate {convert => ["1992", "float"]}
  mutate {convert => ["1993", "float"]}
  mutate {convert => ["1994", "float"]}
  mutate {convert => ["1995", "float"]}
  mutate {convert => ["1996", "float"]}
  mutate {convert => ["1997", "float"]}
  mutate {convert => ["1998", "float"]}
  mutate {convert => ["1999", "float"]}
  mutate {convert => ["2000", "float"]}
  mutate {convert => ["2001", "float"]}
  mutate {convert => ["2002", "float"]}
  mutate {convert => ["2003", "float"]}
  mutate {convert => ["2004", "float"]}
  mutate {convert => ["2005", "float"]}
  mutate {convert => ["2006", "float"]}
  mutate {convert => ["2007", "float"]}
  mutate {convert => ["2008", "float"]}
  mutate {convert => ["2009", "float"]}
  mutate {convert => ["2010", "float"]}
}

output {
  kafka {
    bootstrap_servers => "192.168.100.201:9092"
    topic_id => "test"
  }
}

3) 실행
systemctl restart filebeat

2. 로그스태시가 메세지를 카프카로부터 받기

[프로듀서]	->	[카프카 브로커]	->	[로그스태시]
					컨슈머

1) /etc/hosts 설정
[브로커 IP] broker

2) 파일비트 설정
/etc/logstash/conf.d/logstash.conf
input {
kafka {
bootstrap_servers => "192.168.100.201:9092"
topics => ["test"]
}
}

output {
  file {
    path => "/etc/logstash/data/kafka.log"
  }
}
profile
공부중인 학생

0개의 댓글