안녕하세요.
강의중, kafka의 connect를 위한 dto (kafkaOrderDto) 를 만드는 부분이 있는데 강사님께서는 kafkaOrderDto, schema, payload, field 를 4개 클래스화 하여 사용하셨습니다.
또한 이에 따라 producer와 controller에 해당 dto를 세팅키 위한 코드들이 들어 있습니다.
그런데 저는 이부분이 귀찮아서(..) 클래스를 1개로 통합하여 사용하는 방법으로 짜보았습니다.
저보다 고수님들은 많으실테지만 편의를 위해 통합 Dto를 공유하여 봅니다.
1. KafkaConnectOrderDtoV1 (버전1)
package com.hwan2272.msaecomms.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@Data
public class KafkaConnectOrderDtoV1 {
private Schema schema;
private Payload payload;
public KafkaConnectOrderDtoV1() {
this.setSchemaInit();
}
public KafkaConnectOrderDtoV1(OrderDto orderDto) {
this.setSchemaInit();
this.setPayloadFromOrderDto(orderDto);
}
public void setSchemaInit() {
Schema sch = new Schema("struct",
Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
),
false,
"orders");
this.setSchema(sch);
}
public void setPayloadFromOrderDto(OrderDto orderDto) {
Payload pay = new Payload(
orderDto.getOrderId(),
orderDto.getUserId(),
orderDto.getProductId(),
orderDto.getQty(),
orderDto.getUnitPrice(),
orderDto.getTotalPrice(),
orderDto.getCreatedAt()
);
this.setPayload(pay);
}
@Data
@AllArgsConstructor
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
@Data
@AllArgsConstructor
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private Integer qty;
private Integer unit_price;
private Integer total_price;
private Date created_at;
}
}
schema, payload, field 를 모두 inner class 화 시키고, 메소드를 만들어 내부에서 셋팅이 되도록 했습니다.
여기에서, 개선점 몇가지가 보여 수정했습니다.
1) schema와 payload를 보면 orderDto와 필드명을 비교했을때 필드명이 단순히 CamelCase -> UnderScore 형태이다.
=> 파라미터로 Dto를 던져 필드명만 파싱하면 될것
2) 1을 적용할때 payload를 굳이 inner class화 안해도 될것 같다.
=> HashMap으로 변환
3) schema와 payload를 세팅하는 Dto의 변환 로직을 비슷하게 하면 될것 같다.
=> for 문으로 Dto를 돌아 필요한 세팅을 지정
이외 이런저런 생각들을 기반으로, V2도 만들게 됐습니다.
2. KafkaConnectOrderDtoV2 (버전2)
package com.hwan2272.msaecomms.dto;
import com.google.common.base.CaseFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.*;
@Data
public class KafkaConnectOrderDtoV2 {
private Schema schema;
//private Payload payload;
private Map<String, Object> payload;
/*public KafkaConnectOrderDtoIncrease() {
this.setSchemaInit();
}*/
public KafkaConnectOrderDtoV2(OrderDto orderDto) throws Exception {
this.setSchemaInit(orderDto);
this.setPayloadFromOrderDto(orderDto);
}
public void setSchemaInit(OrderDto orderDto) {
List<Field> fieldList = new ArrayList<>();
for(java.lang.reflect.Field f : orderDto.getClass().getDeclaredFields()) {
if(!f.getName().equals("createdAt")) {
String underscoreField = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, f.getName());
String simpleType = f.getType().getSimpleName();
Field field = new Field(
(simpleType.toLowerCase().contains("string") ? "string"
: simpleType.toLowerCase().contains("integer") ? "int32"
: simpleType.toLowerCase().contains("date") ? "int64" : ""),
true,
underscoreField
);
fieldList.add(field);
}
}
Schema sch = new Schema(
"struct",
fieldList,
false,
"orders");
this.setSchema(sch);
}
public void setPayloadFromOrderDto(OrderDto orderDto) {
Map<String, Object> payMap = new LinkedHashMap<>();
for(java.lang.reflect.Field f : orderDto.getClass().getDeclaredFields()) {
f.setAccessible(true);
String underscoreField = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, f.getName());
Object data = null;
try {
data = f.get(orderDto);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
payMap.put(underscoreField, data);
}
this.setPayload(payMap);
}
@Data
@AllArgsConstructor
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
/*@Data
@AllArgsConstructor
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private Integer qty;
private Integer unit_price;
private Integer total_price;
private Date created_at;
}*/
}
구글에서 제공하는 CaseFormat이 주력이 되겠습니다.
com.google.common.base.CaseFormat
String underscoreField = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, f.getName());
파라미터 orderDto의 필드는 java.lang.reflect.Field으로 가져옵니다.
해당 Dto를 사용하게 되면 orderDto만 파라미터로 넣어줄시 자동으로 Dto의 필드명들을 파싱하여 schema와 payload를 만드는 효과를 갖게 됩니다.
다만 V2는 setPayload에서 try catch등의 exception 핸들링이 필요합니다.
한가지 고민은 createdAt 같은 date 형 데이터들은 kafka에서 int64로 받아들이며 실제 토픽에도 숫자형으로 들어가지만
MariaDB에서 처리가 안되는 점입니다.
구글링해보니 이를 처리키위한 kafka connect의 config설정이나 기타 방법이 있긴한것 같습니다만,
여기에서는 createdAt이라는 칼럼의 특성상,
DB insert에만 의존하도록 하여야 할것으로 생각되어 Dto의 필드 파싱시에는 제외하였습니다.
V1, V2 적용버전 postman에서 테스트후 MariaDB적재 모두 정상 확인했습니다.
3. Producer에서의 사용
@Service
@Slf4j
public class KafkaConnectProducer {
@Autowired
KafkaTemplate kafkaTemplate;
public KafkaConnectOrderDtoV2 send(String topic, OrderDto orderDto) {
//KafkaConnectOrderDtoV1 kafkaConnectOrderDto = new KafkaConnectOrderDtoV1(orderDto);
//kafkaConnectOrderDto.setPayloadFromOrderDto(orderDto);
KafkaConnectOrderDtoV2 kafkaConnectOrderDto = null;
try {
kafkaConnectOrderDto = new KafkaConnectOrderDtoV2(orderDto);
} catch (Exception e) {
e.printStackTrace();
}
(저는 KafkaTemplate을 Autowired 로 사용합니다.)
결과적으로 핵심 코드는 모두 V1, V2 안에 들어 있고,
controller나 producer에서는 기본 선언 정도만 하는것으로 하면 되도록 되었습니다.
코드량을 조금이라도 줄여 보고자 시작한 작업이었습니다.
참고하셔서 여러분만의 DTO를 만들어 보셔도 좋고, (JsonObject를 만들어 형태만 맞춰서 던지면 무방할듯 보입니다.)
수강생 여러분께 조금이라도 도움이 되었으면 합니다.
감사합니다.