• Category

    Q&A
  • Subfield

    Data Engineering

  • Status

    Unresolved

Kafka consumer not working (p.134)

Posted 23.01.09 19:00, edited 23.01.09 19:28, 540 views

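Hello, I am stuck on the exercise below while taking the course, so I am asking for help.

In step 5 of the collection pilot run, when I try the Kafka consumer exercise, a Resetting offset message appears and I cannot see any data.

  1. I reran the simulator and moved the smart car status information file again, but it did not help.

  2. When I check car-batch-log it looks like the above; is that a problem?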

[Error message]

[root@server02 ~]# kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0

23/01/09 18:47:52 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean

23/01/09 18:47:52 INFO consumer.ConsumerConfig: ConsumerConfig values:

auto.commit.interval.ms = 5000

auto.offset.reset = latest

bootstrap.servers = [server02.hadoop.com:9092]

check.crcs = true

client.dns.lookup = default

client.id =

connections.max.idle.ms = 540000

default.api.timeout.ms = 60000

enable.auto.commit = false

exclude.internal.topics = true

fetch.max.bytes = 52428800

fetch.max.wait.ms = 500

fetch.min.bytes = 1

group.id = console-consumer-96835

heartbeat.interval.ms = 3000

interceptor.classes = []

internal.leave.group.on.close = true

isolation.level = read_uncommitted

key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

max.partition.fetch.bytes = 1048576

max.poll.interval.ms = 300000

max.poll.records = 500

metadata.max.age.ms = 300000

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

metrics.sample.window.ms = 30000

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

receive.buffer.bytes = 65536

reconnect.backoff.max.ms = 1000

reconnect.backoff.ms = 50

request.timeout.ms = 30000

retry.backoff.ms = 100

sasl.client.callback.handler.class = null

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.login.callback.handler.class = null

sasl.login.class = null

sasl.login.refresh.buffer.seconds = 300

sasl.login.refresh.min.period.seconds = 60

sasl.login.refresh.window.factor = 0.8

sasl.login.refresh.window.jitter = 0.05

sasl.mechanism = GSSAPI

security.protocol = PLAINTEXT

send.buffer.bytes = 131072

session.timeout.ms = 10000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.endpoint.identification.algorithm = null

ssl.key.password = null

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = null

ssl.keystore.password = null

ssl.keystore.type = JKS

ssl.protocol = TLS

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = null

ssl.truststore.password = null

ssl.truststore.type = JKS

value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

23/01/09 18:47:53 INFO utils.AppInfoParser: Kafka version: 2.2.1-cdh6.3.2

23/01/09 18:47:53 INFO utils.AppInfoParser: Kafka commitId: unknown

23/01/09 18:47:53 INFO consumer.KafkaConsumer: [Consumer clientId=consumer-1, groupId=console-consumer-96835] Subscribed to partition(s): SmartCar-Topic-0

23/01/09 18:47:53 INFO clients.Metadata: Cluster ID: AWKsMltnSBSo2f2sVF2P-g

23/01/09 18:47:53 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-96835] Resetting offset for partition SmartCar-Topic-0 to offset 14.

Thank you.

1 Answer

Hello, "ldy242"!

Resetting offset is a message that can appear depending on the state of the Kafka broker.
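For context, the console consumer's `--offset` option defaults to `latest`, so a consumer started with `--partition` and no `--offset` begins at the end of the partition and prints only records that arrive afterwards. The INFO line reports the position it chose (14 in the question). The following is a minimal shell sketch, assuming the host, topic, and partition from the question. `reset_offsets` and `consume_from_earliest` are hypothetical helper names, not part of Kafka.

```shell
#!/bin/sh
# Print "<topic-partition> <offset>" for every "Resetting offset" line on stdin.
reset_offsets() {
    sed -n 's/.*Resetting offset for partition \(.*\) to offset \([0-9][0-9]*\)\..*/\1 \2/p'
}

# Hypothetical helper: rerun the console consumer from the earliest retained
# record instead of the end of the partition. Host, topic, and partition are
# copied from the question; adjust them for your cluster.
consume_from_earliest() {
    kafka-console-consumer \
        --bootstrap-server server02.hadoop.com:9092 \
        --topic SmartCar-Topic \
        --partition 0 \
        --offset earliest
}
```

Feeding the Fetcher line from the question to `reset_offsets` yields `SmartCar-Topic-0 14`. If `consume_from_earliest` prints older records while the default invocation prints nothing, the consumer itself is fine and no new data is reaching the topic.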

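Before that, we should first confirm that the log simulator actually ran correctly.

At this stage of the exercise, p.133 has you start two Java log simulators (one batch, one real-time).

The batch log and the real-time log that these simulators generate end up in the following locations.

  1. The batch log must be moved with the mv command so that the file is in the directory below.

/home/pilot-pjt/working/car-batch-log/

  2. The real-time log file is created in the path below as soon as the Java simulator command runs.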

/home/pilot-pjt/working/driver-realtime-log

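The pilot step you asked about takes the data generated under path 2 above, has Flume collect it and send it to the Kafka server, and then checks it with the Kafka consumer command.

Judging from the log you posted, no data has reached the Kafka server at all, so the Kafka consumer has nothing to receive.

In that case, please check the following three things upstream of Kafka.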

  1. Did the real-time log simulator command run correctly?
    $ java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20160101 3 &

  2. Is the real-time log being created in the path below?
    /home/pilot-pjt/working/driver-realtime-log

  3. Is Flume running normally? Are there any errors in the Flume log?
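The three checks above can be sketched as a small shell script. This is only an illustration, assuming the paths quoted in this thread and that the simulator shows up in the process list under its main class name `DriverLogMain`. The function names are hypothetical.

```shell
#!/bin/sh
# Paths copied from this thread; adjust them if your layout differs.
REALTIME_DIR=/home/pilot-pjt/working/driver-realtime-log
FLUME_LOG=/var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

# Count the entries in a directory (prints 0 if the directory is missing).
count_files() {
    ls -A "$1" 2>/dev/null | wc -l | tr -d ' '
}

# Count ERROR lines in a log file (prints 0 if the file is unreadable).
count_error_lines() {
    [ -r "$1" ] || { echo 0; return 0; }
    grep -c ' ERROR ' "$1" || true
}

# Run the three checks from the answer and print one line per check.
check_pipeline() {
    if pgrep -f DriverLogMain >/dev/null 2>&1; then
        echo "simulator: running"
    else
        echo "simulator: not running"
    fi
    echo "realtime log files: $(count_files "$REALTIME_DIR")"
    echo "flume ERROR lines: $(count_error_lines "$FLUME_LOG")"
}
```

Calling `check_pipeline` on server02 prints the three results. A stopped simulator or zero real-time log files would point to a problem before Flume, while ERROR lines in the Flume log would point to the agent itself.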

Please check these and let me know. ^^

- Bigdi (빅디)

ldy242

Asker

2023.01.10

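Thank you for the answer. I went through the items you listed again, and the Kafka consumer is now working well.

However, in the collection function check on p.135,

I have a problem with Flume sending the standard-output log, so I am asking again.

[Error message]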

[root@server02 car-batch-log]# ls -ltr

total 0

[root@server02 car-batch-log]# tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

2023-01-10 10:08:24,949 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: SmartCarInfo_SpoolSource started

2023-01-10 10:08:25,029 INFO org.eclipse.jetty.util.log: Logging initialized @1996ms

2023-01-10 10:08:25,244 INFO org.eclipse.jetty.server.Server: jetty-9.3.25.v20180904, build timestamp: 2018-09-05T06:11:46+09:00, git hash: 3ce520221d0240229c862b122d2b06c12a625732

2023-01-10 10:08:25,460 INFO org.eclipse.jetty.server.AbstractConnector: Started ServerConnector@2d5a0fec{HTTP/1.1,[http/1.1]}{0.0.0.0:41414}

2023-01-10 10:08:25,460 INFO org.eclipse.jetty.server.Server: Started @2427ms

2023-01-10 10:08:25,509 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version: 2.2.1-cdh6.3.2

2023-01-10 10:08:25,509 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId: null

2023-01-10 10:08:25,513 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: DriverCarInfo_KafkaSink: Successfully registered new MBean.

2023-01-10 10:08:25,513 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: DriverCarInfo_KafkaSink started

2023-01-10 10:08:25,560 INFO org.apache.kafka.clients.Metadata: Cluster ID: AWKsMltnSBSo2f2sVF2P-g

 

I can confirm that data is flowing into Kafka in real time, but

[p.135]

tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

this command produces an error.

  • I set the permissions with chmod exactly as instructed.

 

[Contents shown by vi /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log]

[root@server02 ~]# vi /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

2023-01-09 15:57:13,119 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting

2023-01-09 15:57:13,128 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/var/run/cloudera-scm-agent/process/32-flume-AGENT/flume.conf

2023-01-09 15:57:13,132 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1

2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1

2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1

2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:channel1

2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1

2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:channel1

2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: sink1 Agent: tier1

2023-01-09 15:57:13,133 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1

2023-01-09 15:57:13,134 INFO org.apache.flume.conf.FlumeConfiguration: Processing:source1

2023-01-09 15:57:13,134 WARN org.apache.flume.conf.FlumeConfiguration: Agent configuration for 'tier1' has no configfilters.

2023-01-09 15:57:13,155 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [tier1]

2023-01-09 15:57:13,155 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels

2023-01-09 15:57:13,165 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel channel1 type memory

2023-01-09 15:57:13,173 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel channel1

2023-01-09 15:57:13,173 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source source1, type netcat

2023-01-09 15:57:13,193 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: sink1, type: logger

2023-01-09 15:57:13,194 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel channel1 connected to [source1, sink1]

2023-01-09 15:57:13,195 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:source1,state:IDLE} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@23a8f888 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }

2023-01-09 15:57:13,199 INFO org.apache.flume.node.Application: Starting Channel channel1

2023-01-09 15:57:13,201 INFO org.apache.flume.node.Application: Waiting for channel: channel1 to start. Sleeping for 500 ms

2023-01-09 15:57:13,256 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.

2023-01-09 15:57:13,257 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started

2023-01-09 15:57:13,701 INFO org.apache.flume.node.Application: Starting Sink sink1

2023-01-09 15:57:13,703 INFO org.apache.flume.node.Application: Starting Source source1

2023-01-09 15:57:13,704 INFO org.apache.flume.source.NetcatSource: Source starting

2023-01-09 15:57:13,723 INFO org.eclipse.jetty.util.log: Logging initialized @1856ms

2023-01-09 15:57:13,725 INFO org.apache.flume.source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]

2023-01-09 15:57:13,824 INFO org.eclipse.jetty.server.Server: jetty-9.3.25.v20180904, build timestamp: 2018-09-05T06:11:46+09:00, git hash: 3ce520221d0240229c862b122d2b06c12a625732

2023-01-09 15:57:13,868 INFO org.eclipse.jetty.server.AbstractConnector: Started ServerConnector@5f31a941{HTTP/1.1,[http/1.1]}{0.0.0.0:41414}

2023-01-09 15:57:13,871 INFO org.eclipse.jetty.server.Server: Started @2004ms

2023-01-09 16:27:03,600 INFO org.apache.flume.node.Application: Shutting down configuration: { sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:source1,state:START} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@23a8f888 counterGroup:{ name:null counters:{runner.backoffs.consecutive=225, runner.backoffs=225} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }

2023-01-09 16:27:03,605 INFO org.apache.flume.node.Application: Stopping Source source1

2023-01-09 16:27:03,605 INFO org.apache.flume.lifecycle.LifecycleSupervisor: Stopping component: EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:source1,state:START} }

2023-01-09 16:27:03,605 INFO org.apache.flume.source.NetcatSource: Source stopping

2023-01-09 16:27:03,606 INFO org.apache.flume.node.Application: Stopping Sink sink1

2023-01-09 16:27:03,606 INFO org.apache.flume.lifecycle.LifecycleSupervisor: Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@23a8f888 counterGroup:{ name:null counters:{runner.backoffs.consecutive=225, runner.backoffs=225} } }

2023-01-09 16:27:03,606 INFO org.apache.flume.node.Application: Stopping Channel channel1

 


After I restarted the cluster from CM, it suddenly started working.

Great, well done. ^^
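After a restart like this, one way to confirm which configuration the Flume agent picked up is to list the components its log reports as started. This is a minimal sketch, assuming the log path from the thread and the "Component type: ..., name: ... started" line format visible in the posted excerpts. `started_components` is a hypothetical helper name.

```shell
#!/bin/sh
# Log path copied from this thread; adjust it if your agent logs elsewhere.
FLUME_LOG=/var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

# Print "<TYPE> <name>" for every component the Flume log reports as started.
# Takes an optional log path; defaults to FLUME_LOG.
started_components() {
    sed -n 's/.*Component type: \([A-Z]*\), name: \(.*\) started$/\1 \2/p' "${1:-$FLUME_LOG}"
}
```

Run against the 2023-01-10 excerpt posted above, this would print `SOURCE SmartCarInfo_SpoolSource` and `SINK DriverCarInfo_KafkaSink`, which shows that the pilot's source and sink are the ones running.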