worker2 does the collecting.
Flume can sit on either side of Kafka: as the sender (producer) or as the receiver (consumer).
When the Python producer sends a message, it goes through the broker VM and the Flume VM receives it.
When Flume is the sender, it publishes to the broker so that a consumer can read the messages.
[Flume] -> [Kafka broker] -> [Consumer]
Producer
1) Configure /etc/hosts so the hostname broker resolves:
[broker IP] broker
2) Flume configuration (/opt/flume/conf/flume-conf.properties)
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type = exec
agent.sources.s1.command = tail -F /var/log/syslog
agent.sources.s1.channels = c1
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.topic = test
agent.sinks.k1.kafka.bootstrap.servers = 192.168.100.201:9092
agent.sinks.k1.kafka.flumeBatchSize = 100
agent.sinks.k1.kafka.producer.acks = 0
agent.sinks.k1.channel = c1
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.type = memory
How to run Flume (the -n value must match the property prefix in the config, here agent):
/opt/flume/bin/flume-ng agent -n agent --conf /opt/flume/conf -f /opt/flume/conf/flume-conf.properties
How to write a test log line (logger appends to the syslog that the exec source tails):
logger -p local1.notice logging hahahahahahaha
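To check the pipeline end to end, consume the topic from any machine that can reach the broker. A minimal sketch, assuming the kafka-python package (an assumption; the notes only mention a Python producer):

from kafka import KafkaConsumer

# Print everything the Flume KafkaSink has published to the topic.
consumer = KafkaConsumer('test',
                         bootstrap_servers='192.168.100.201:9092',
                         auto_offset_reset='earliest')
for record in consumer:
    print(record.value.decode('utf-8', errors='replace'))

Each logger line from above should show up as one Kafka message.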
[Producer] -> [Kafka broker] -> [Flume]
Consumer
1) Configure /etc/hosts so the hostname broker resolves:
[broker IP] broker
2) Flume configuration (/opt/flume/conf/flume-conf.properties)
agent.sources = k1
agent.channels = c1
agent.sinks = s1
agent.sources.k1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.k1.kafka.topics = test
agent.sources.k1.kafka.bootstrap.servers = 192.168.100.201:9092
agent.sources.k1.channels = c1
agent.sinks.s1.type = FILE_ROLL
agent.sinks.s1.channel = c1
agent.sinks.s1.sink.directory = /opt/flume/data
agent.sinks.s1.sink.rollInterval = 60
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.type = memory
When everything is running, check /opt/flume/data to verify the rolled output files.
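To drive the pipeline, publish a few messages to the topic. A minimal sketch, again assuming kafka-python:

from kafka import KafkaProducer

# Publish test messages for the Flume KafkaSource to pick up.
producer = KafkaProducer(bootstrap_servers='192.168.100.201:9092')
for i in range(10):
    producer.send('test', f'hello {i}'.encode('utf-8'))
producer.flush()  # block until every message is actually handed to the broker

The messages should appear in a rolled file under /opt/flume/data within the 60-second rollInterval.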
[Filebeat] -> [Kafka broker] -> [Consumer]
Producer
1) Configure /etc/hosts so the hostname broker resolves:
[broker IP] broker
2) Filebeat configuration
/etc/filebeat/filebeat.yml
Comment out any other output.* section (Filebeat allows only one output at a time), then add:
output.kafka:
  hosts: ["192.168.100.201:9092"]
  topic: "test"
3) Run
systemctl restart filebeat
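Filebeat wraps each log line in a JSON event, so the Kafka messages are JSON documents with the original line under the message key. A quick way to inspect them, again assuming kafka-python:

from kafka import KafkaConsumer
import json

# Decode the JSON envelope Filebeat puts around each log line.
consumer = KafkaConsumer('test',
                         bootstrap_servers='192.168.100.201:9092',
                         auto_offset_reset='earliest')
for record in consumer:
    event = json.loads(record.value)
    print(event.get('message'))  # the original log line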
[Logstash] -> [Kafka broker] -> [Consumer]
Producer
1) Configure /etc/hosts so the hostname broker resolves:
[broker IP] broker
2) Logstash configuration
/etc/logstash/conf.d/logstash.conf
input {
  file {
    path => "absolute path of the CSV file"
    start_position => "beginning"   # read from the top of the file
    sincedb_path => "/dev/null"     # don't persist the read offset, so the file is re-read on every restart
  }
}
filter {
  csv {
    separator => ","
    columns => ["Country","1980","1981","1982","1983","1984","1985","1986","1987","1988","1989","1990","1991","1992","1993","1994","1995","1996","1997","1998","1999","2000","2001","2002","2003","2004","2005","2006","2007","2008","2009","2010"]
  }
  # convert every year column from string to float in a single mutate
  mutate {
    convert => {
      "1980" => "float" "1981" => "float" "1982" => "float" "1983" => "float"
      "1984" => "float" "1985" => "float" "1986" => "float" "1987" => "float"
      "1988" => "float" "1989" => "float" "1990" => "float" "1991" => "float"
      "1992" => "float" "1993" => "float" "1994" => "float" "1995" => "float"
      "1996" => "float" "1997" => "float" "1998" => "float" "1999" => "float"
      "2000" => "float" "2001" => "float" "2002" => "float" "2003" => "float"
      "2004" => "float" "2005" => "float" "2006" => "float" "2007" => "float"
      "2008" => "float" "2009" => "float" "2010" => "float"
    }
  }
}
output {
  kafka {
    bootstrap_servers => "192.168.100.201:9092"
    topic_id => "test"
  }
}
3) Run
systemctl restart logstash
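The file input needs a CSV whose rows match the columns list in the filter. A minimal sketch that generates one data row (the path /tmp/population.csv is made up for illustration; point the file input's path at wherever you write it; no header row is needed because the csv filter already names the columns):

import csv

# One sample row: a country name followed by 31 year values (1980-2010).
row = ['Korea'] + [f'{v:.1f}' for v in range(31)]
with open('/tmp/population.csv', 'w', newline='') as f:
    csv.writer(f).writerow(row)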
[Producer] -> [Kafka broker] -> [Logstash]
Consumer
1) Configure /etc/hosts so the hostname broker resolves:
[broker IP] broker
2) Logstash configuration
/etc/logstash/conf.d/logstash.conf
input {
  kafka {
    bootstrap_servers => "192.168.100.201:9092"
    topics => ["test"]
  }
}
output {
  file {
    path => "/etc/logstash/data/kafka.log"
  }
}
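3) Run
systemctl restart logstash
The file output writes one JSON document per line (json_lines is its default codec), so once messages arrive you can watch them with tail -f /etc/logstash/data/kafka.log.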