flume.root.logger=DEBUG
After changing root.logger to DEBUG, the following errors started to appear.
18 Mar 2020 08:58:16,193 DEBUG KafkaSource - Event #: 0
18 Mar 2020 08:58:16,193 ERROR KafkaSource - KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)
18 Mar 2020 08:58:16,193 ERROR KafkaSource - KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)
18 Mar 2020 08:58:16,196 DEBUG KafkaSource - Waited: 1002
18 Mar 2020 08:58:16,196 DEBUG KafkaSource - Event #: 0
18 Mar 2020 08:58:16,196 ERROR KafkaSource - KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)
18 Mar 2020 08:58:16,196 DEBUG KafkaSource - Waited: 1002
18 Mar 2020 08:58:16,196 DEBUG KafkaSource - Event #: 0
18 Mar 2020 08:58:16,196 ERROR KafkaSource - KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)
Tracing the error, I found a piece of code in KafkaSource.class that runs only when the log level is DEBUG:
if (!iterStatus) {
    // The counter is incremented only when debug logging is enabled
    if (log.isDebugEnabled()) {
        this.counter.incrementKafkaEmptyCount();
        log.debug("Returning with backoff. No more data to read");
    }
    return Status.BACKOFF;
}
incrementKafkaEmptyCount, in turn, calls
return increment(COUNTER_KAFKA_EMPTY);
and increment is implemented as
protected long increment(String counter) {
    return counterMap.get(counter).incrementAndGet();
}
Here counterMap.get(counter) returns null, so calling incrementAndGet() on it throws a NullPointerException.
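To make the failure mode concrete, here is a minimal, self-contained sketch of the same pattern. It is not Flume's code and not the FLUME-2578 patch: the class CounterSketch, the methods poll and incrementSafe, and the counter names "kafka.commit.time" and "kafka.empty" are all hypothetical, and it assumes the counter was simply never registered in the map. It shows why the exception appears only at DEBUG, and one null-safe way to write such a lookup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a counter group; not Flume's actual class.
public class CounterSketch {
    private final Map<String, AtomicLong> counterMap = new ConcurrentHashMap<>();

    // Only the names passed in are registered (assumption: the "empty" counter is missing).
    public CounterSketch(String... registered) {
        for (String name : registered) {
            counterMap.put(name, new AtomicLong(0L));
        }
    }

    // Same shape as the increment above: get() returns null for an
    // unregistered name, so incrementAndGet() throws NullPointerException.
    public long increment(String counter) {
        return counterMap.get(counter).incrementAndGet();
    }

    // Null-safe variant: creates the counter on first use.
    public long incrementSafe(String counter) {
        return counterMap.computeIfAbsent(counter, k -> new AtomicLong(0L)).incrementAndGet();
    }

    // Models the debug-only branch: the counter is touched only when
    // debugEnabled is true (a stand-in for log.isDebugEnabled()).
    public String poll(boolean debugEnabled) {
        if (debugEnabled) {
            increment("kafka.empty");
        }
        return "BACKOFF";
    }

    public static void main(String[] args) {
        CounterSketch counters = new CounterSketch("kafka.commit.time");
        System.out.println(counters.poll(false)); // debug off: counter never touched
        try {
            counters.poll(true);                  // debug on: unregistered counter
        } catch (NullPointerException e) {
            System.out.println("NullPointerException at debug level");
        }
        System.out.println(counters.incrementSafe("kafka.empty"));
    }
}
```

With debugEnabled set to false the missing counter is never looked up, which matches the observation that the error only shows up once the logger is switched to DEBUG.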
This is reportedly a bug that has already been fixed (https://issues.apache.org/jira/browse/FLUME-2578), but we seem to hit it because we are using an old version of kafka-source.
<dependency>
    <groupId>org.apache.flume.flume-ng-sources</groupId>
    <artifactId>flume-kafka-source</artifactId>
    <version>1.6.0</version>
</dependency>
...
log4j.logger.org.apache.flume.source = INFO
As a workaround for now, I set that logger to INFO so that the debug-only branch is never executed.
Thanks, this was a good read.
I hope you get the chance to upgrade your Kafka version.