작성
·
382
·
수정됨
0
안녕하세요,
connect와 ksql 을 함께 수강중입니다.
시간이 촉박해서 나름 필요한 부분으로 생각되는 부분만 수강하고 있는데요,
csv를 spoolDirCsv로 읽었습니다.
강의용 자료 spooldir_source.json 을 사용했구요
kafka-console-consumer --bootstrap-server localhost:9092 --topic spooldir-test-topic --from-beginning --property print.key=true 하면 잘 나옵니다.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic spooldir-test-topic --from-beginning --property print.key=true 하면 안나오구요.
avro --> elastic search 에 저장하려고 하는데요,
spooldir_source.json 에서 다음 4줄을 추가해봤습니다.
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081"
그랬더니,
kafka-console-consumer 도 안나오고 kafka-avro-console-consumer 도 안나옵니다.
ksql에서 확인해보면, connector와 topic은 생성되어 있습니다.
spoolDirCsv에서 만들어진 topic을 어떻게 avro topic으로 전환할 수 있는지요?
궁극적으로는 elastic search index로 생성하려고 합니다.
만약 설명이 복잡해진다면, 강의목록이라도 알려주실 수 있으시면, 그 부분 수강해서 이해해 보도록 하겠습니다.
감사합니다.
답변 1
0
안녕하세요, 인프런 AI 인턴입니다.
spoolDirCsv로 읽은 csv 데이터를 avro로 전환하려면 Kafka Connect의 AvroConverter를 사용해야 합니다.
먼저 Kafka Connect의 AvroConverter를 사용하기 위해서는 schema.registry.url을 설정해 주어야 합니다. 이 설정은 schema registry가 위치한 URL을 가리키게 되어 있습니다. schema registry는 스키마 패키지를 관리해 주는 서비스로서, schema.json 파일에서 코드로 자동 매핑하는 역할도 수행합니다.
따라서, “key.converter.schema.registry.url”과 “value.converter.schema.registry.url”을 설정한 후에 Kafka Connect로 spoolDirCsv의 데이터를 읽으면 avro 스키마를 이용해 Kafka topic으로 데이터를 쓸 수 있게 됩니다.
다음으로, Elasticsearch에 avro 데이터를 쓰기 위해서는 Kafka Connect의 다른 Connectors 중 하나인 “kafka-connect-elasticsearch”를 사용해야 합니다.
이 Connector는 Kafka topic에서 데이터를 읽어 Elasticsearch index에 데이터를 작성하는 것을 처리하는 기능을 제공합니다. 자세한 설명과 사용 예제는 Kafka documentation을 참고하시면 도움이 될 것입니다.
혹시나 복잡하게 느껴진다면, Connectors를 구동하기 위해서는 별도의 구성 파일 설정이 필요합니다. 본 문제와 관련된 자세한 설정 방법은 Kafka documentation을 참고하시면 도움이 될 것입니다.
그러나, 데이터의 처리 과정은 복잡한 처리를 필요로 하기 때문에 복잡한 작업입니다. 레퍼런스를 참고하며 생성하시길 권장드립니다.
감사합니다.
안녕하세요.
제가 인터넷 보면서 이리저리 해봤는데 아직은 스킬이 모자라서, 잘 안됩니다.
제가 필요한 사항은, CSV로 읽어서 AVRO로 보낸는 것입니다.
AVRO에서 Elastic search로 가는건 알고 있습니다.
config file을 이렇게 했는데, 안되고 있습니다.
{
"name": "csv_spooldir_demand",
"config": {
"tasks.max": "1",
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"input.path": "/home/user/spool_jb_dir",
"input.file.pattern": "^.*\\.csv",
"error.path": "/home/user/spool_jb_dir/error",
"finished.path": "/home/user/spool_jb_dir/finished",
"empty.poll.wait.ms": 9000,
"halt.on.error": "false",
"key.converter.schema.registry.url": "http://192.168.56.101:8081",
"value.converter.schema.registry.url": "http://192.168.56.101:8081",
"topic": "spooldir-demand-topic",
"csv.first.row.as.header": "true",
"schema.generation.enabled": "true"
}
}
강의용 템플릿으로 주신 config에서 아래 2줄만 추가했습니다.
"key.converter.schema.registry.url": "http://192.168.56.101:8081",
"value.converter.schema.registry.url": "http://192.168.56.101:8081",
혹시 답변 가능하시다면 부탁드리고요,아니어도 괜찮습니다.
강의 자체가 워낙 훌륭하여 많이 배우고 있습니다.
감사합니다.