쿠팡과 같은 쇼핑몰의 주 데이터 저장소를 HBase 로 한다고 했을 때, 어떤 데이터 모델과 key 설계를 해야할지 고민해보자.
쿠팡의 댓글 기능을 설계해본다.
댓글기능을 중심으로 요구사항을 작성해 보았다.

Order : 주문을 해야 프로덕트의 코멘트가 남으니까 주문정보가 있어야 함.
Product : content 는 이미지도 올라갈 수 있고 여러 가지 올라갈 수 있으니까 byteArray로 해놓았음. 그리고 생성된시간, 업데이트 된 시간 기록할 수 있도록 해놓았음Comment : 코멘트 자체에는 id를 두지 않았음. product id와 order id를 조합하면 유니크한 키가 됨. 요즘은 수정됨. 이라고 뜨니까 isEdited 등 class Comment {
String orderld;
String productid;
String userld;
String content;
long timestamp;
boolean isEdited;
}
class Order {
String id;
String userId;
}
class User {
String id;
String name;
String nickname;
}
class Product {
String id;
String sellerId;
byte[] content;
long createdAt;
long updatedAt;
}
class Seller {
String id;
String name;
}
Comment --> Order : dependent
Order --> User : dependent
Comment --> User : dependent
Comment --> Product : dependent
Product --> Seller : dependent
ididididorderId:productIduserId:reverseTimestampOfComment10개 조회해서 10개의 코멘트 row key얻은 다음에 그 rowkey로 코멘트들을 다시 조회하는 식이 될 것
productId:c:reverseTimestampOfComment:reverseLengthOfContentInCommentRowKey에 가장 먼저 productId가 들어 가야 pid별로 모이고, 전체가 spot하게 분산이 될 것임. 잘 팔리는 상품은 리뷰를 동시에 여러 사람이 남길 수도 있으니 timestamp가 같은 서로 다른 리뷰들이 있을 수 있음. reverseLengthOfContentInComment를 콘텐츠 length를 길게 한 애들이 같은 시간에서는 더 앞에 올 수 있도록 위치시킴.
productId:r:number결국 프로덕트에 남겨진 코멘트들 중에 좋은 코멘트들 10개씩 남긴다고 했으니 index table용도로 남기고, 이거를 어떤 걸 위치시킬지 계산하는 건 다른 시스템을 만들 거고, Overwrite 한다고 생각을 했다. 그래서 pid가 앞에 옴.
product-comment 와 RowKey의 길이 수가 다르다.
Value: RowKey of comment로 한 이유
우리는 Index table 방식을 채택했다. 코멘트 데이터는 양이 매우 많기 때문에, 이를 value로 직접 저장하면 스토리지를 많이 차지하는 단점이 있다. 또한 같은 데이터를 여러 곳에 저장할 경우 RDB처럼 트랜잭션으로 연결할 수 없으므로, 원본이 변경될 때마다 모든 저장소를 갱신해야 한다. 하지만 HBase는 이러한 다중 업데이트를 보장하기 어렵고, 이를 직접 구현하려면 시스템 복잡도가 높아져 버그 발생 가능성이 크다.
따라서 우리는 comment의 RowKey만 index table에 저장하기로 했다. RowKey는 일반적으로 변하지 않으므로, index table은 한 번 저장된 후 immutable하게 유지된다.
물론 이 방식은 index table 조회 후 원본 코멘트를 다시 조회해야 하므로 응답 속도가 다소 느려진다. 그러나 채팅 서비스처럼 밀리초 단위의 빠른 응답이 요구되는 것이 아니라, 쇼핑몰 코멘트 조회처럼 100~200ms 정도의 지연이 허용 가능한 서비스라면 충분히 감수할 수 있다고 판단했다. 결국, 속도보다 데이터 일관성과 관리 용이성을 우선시하여 이런 설계를 적용했다.
아래 순서로 진행한다.
settings.gradle
rootProject.name = 'de-hbase-coupang'
build.gradle
plugins {
id 'java'
id 'application'
id 'com.github.johnrengelman.shadow' version '7.1.2'
}
group 'de.hbase.coupang'
version '1.0'
repositories { mavenCentral() }
application {
mainClassName = 'de.hbase.coupang.CoupangApp'
}
dependencies {
implementation "com.linecorp.armeria:armeria:1.22.1"
runtimeOnly 'ch.qos.logback:logback-classic:1.3.6'
compileOnly 'org.projectlombok:lombok:1.18.26'
annotationProcessor 'org.projectlombok:lombok:1.18.26'
implementation 'org.apache.hbase:hbase-client:2.5.3-hadoop3'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.0'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'com.google.guava:guava:33.0.0-jre'
}
test { useJUnitPlatform() }
de.hbase.common.HBaseData
package de.hbase.common;
public interface HBaseData {
byte[] getRowKey();
}
de.hbase.common.KeyBytes
package de.hbase.common;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.primitives.Bytes as GBytes;
public final class KeyBytes {
private KeyBytes(){}
// 모든 id는 정확히 10글자라는 가정
public static byte[] id10(String id) {
if (id == null || id.length() != 10) {
throw new IllegalArgumentException("id must be exactly 10 chars");
}
return Bytes.toBytes(id);
}
public static byte[] revTs(long ts) {
long rev = Long.MAX_VALUE - ts;
return Bytes.toBytes(rev);
}
public static byte[] revLen(int len) {
int rev = Integer.MAX_VALUE - len;
return Bytes.toBytes(rev);
}
public static byte[] cat(byte[]... parts) {
byte[] out = new byte[0];
for (byte[] p : parts) out = com.google.common.primitives.Bytes.concat(out, p);
return out;
}
}
de.hbase.coupang.data.User
package de.hbase.coupang.data;
import de.hbase.common.HBaseData;
import static de.hbase.common.KeyBytes.*;
import org.apache.hadoop.hbase.util.Bytes;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class User implements HBaseData {
private String id;
private String name;
private String nickname;
@Override public byte[] getRowKey() { return id10(id); }
}
de.hbase.coupang.data.Seller
package de.hbase.coupang.data;
import de.hbase.common.HBaseData;
import static de.hbase.common.KeyBytes.*;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class Seller implements HBaseData {
private String id;
private String name;
@Override public byte[] getRowKey() { return id10(id); }
}
de.hbase.coupang.data.Product
package de.hbase.coupang.data;
import de.hbase.common.HBaseData;
import static de.hbase.common.KeyBytes.*;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class Product implements HBaseData {
private String id;
private String sellerId;
private byte[] content; // 이미지/리치콘텐츠
private long createdAt;
private long updatedAt;
@Override public byte[] getRowKey() { return id10(id); }
}
de.hbase.coupang.data.Order
package de.hbase.coupang.data;
import de.hbase.common.HBaseData;
import static de.hbase.common.KeyBytes.*;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class Order implements HBaseData {
private String id;
private String userId;
@Override public byte[] getRowKey() { return id10(id); }
}
de.hbase.coupang.data.Comment
package de.hbase.coupang.data;
import de.hbase.common.HBaseData;
import static de.hbase.common.KeyBytes.*;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class Comment implements HBaseData {
private String orderId; // 10 bytes
private String productId; // 10 bytes
private String userId; // 10 bytes
private String content;
private long timestamp;
private boolean isEdited;
// RowKey: orderId(10) + productId(10)
@Override public byte[] getRowKey() {
return cat(id10(orderId), id10(productId));
}
}
de.hbase.coupang.storage.AbstractJsonStorage<T extends HBaseData>
package de.hbase.coupang.storage;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.hbase.common.HBaseData;
import lombok.Getter;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
public abstract class AbstractJsonStorage<T extends HBaseData> {
@Getter protected final String tableName;
protected final Table table;
@Getter protected final byte[] cf = Bytes.toBytes("cf");
@Getter protected final byte[] qualifier = Bytes.toBytes("q");
protected final ObjectMapper objectMapper = new ObjectMapper();
public AbstractJsonStorage(String tableName, Connection connection) throws IOException {
this.tableName = tableName;
this.table = connection.getTable(TableName.valueOf(tableName));
}
public abstract Class<T> clazz();
public T get(byte[] rowKey) throws IOException {
Result r = table.get(new Get(rowKey));
byte[] raw = r.getValue(cf, qualifier);
if (raw == null) return null;
return objectMapper.readValue(raw, clazz());
}
public void put(T value) throws IOException {
Put p = new Put(value.getRowKey());
p.addColumn(cf, qualifier, objectMapper.writeValueAsBytes(value));
table.put(p);
}
public List<T> getBatch(List<byte[]> rowKeys) throws IOException {
List<Get> gets = rowKeys.stream().map(Get::new).collect(Collectors.toList());
Result[] results = table.get(gets);
List<T> out = new ArrayList<>();
for (Result r : results) {
if (r == null) continue;
byte[] raw = r.getValue(cf, qualifier);
if (raw != null) out.add(objectMapper.readValue(raw, clazz()));
}
return out;
}
}
de.hbase.coupang.index.IndexRow
package de.hbase.coupang.index;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data @AllArgsConstructor
public class IndexRow {
private byte[] rowKey;
private byte[] value; // comment row key
}
de.hbase.coupang.storage.index.AbstractIndexStorage
package de.hbase.coupang.storage.index;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.hbase.coupang.index.IndexRow;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public abstract class AbstractIndexStorage {
protected final String tableName;
protected final Table table;
protected final byte[] cf = Bytes.toBytes("cf");
protected final byte[] q = Bytes.toBytes("q");
protected final ObjectMapper om = new ObjectMapper();
public AbstractIndexStorage(String tableName, Connection connection) throws IOException {
this.tableName = tableName;
this.table = connection.getTable(TableName.valueOf(tableName));
}
public void put(byte[] rowKey, byte[] commentRowKey) throws IOException {
Put p = new Put(rowKey);
p.addColumn(cf, q, commentRowKey); // 값에는 comment의 rowkey만 저장
table.put(p);
}
public List<IndexRow> scanWithPrefix(byte[] prefix, int limit) throws IOException {
Filter f = new PrefixFilter(prefix);
return scan(f, limit);
}
public List<IndexRow> scanFrom(byte[] startRowKeyInclusive, int limit, boolean reverse) throws IOException {
CompareOperator op = reverse ? CompareOperator.LESS_OR_EQUAL : CompareOperator.GREATER_OR_EQUAL;
Filter f = new RowFilter(op, new BinaryComparator(startRowKeyInclusive));
return scan(f, limit);
}
private List<IndexRow> scan(Filter filter, int limit) throws IOException {
Scan s = new Scan();
s.addColumn(cf, q);
s.setFilter(filter);
s.setLimit(limit);
ResultScanner rs = table.getScanner(s);
List<IndexRow> out = new ArrayList<>();
for (Result r : rs) {
byte[] val = r.getValue(cf, q);
if (val == null) continue;
out.add(new IndexRow(r.getRow(), val));
}
return out;
}
}
// de.hbase.coupang.storage.UserStorage
package de.hbase.coupang.storage;
import de.hbase.coupang.data.User;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
public class UserStorage extends AbstractJsonStorage<User> {
public UserStorage(Connection c) throws IOException { super("user", c); }
@Override public Class<User> clazz() { return User.class; }
}
// de.hbase.coupang.storage.SellerStorage
package de.hbase.coupang.storage;
import de.hbase.coupang.data.Seller;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
public class SellerStorage extends AbstractJsonStorage<Seller> {
public SellerStorage(Connection c) throws IOException { super("seller", c); }
@Override public Class<Seller> clazz() { return Seller.class; }
}
// de.hbase.coupang.storage.ProductStorage
package de.hbase.coupang.storage;
import de.hbase.coupang.data.Product;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
public class ProductStorage extends AbstractJsonStorage<Product> {
public ProductStorage(Connection c) throws IOException { super("product", c); }
@Override public Class<Product> clazz() { return Product.class; }
}
// de.hbase.coupang.storage.OrderStorage
package de.hbase.coupang.storage;
import de.hbase.coupang.data.Order;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
public class OrderStorage extends AbstractJsonStorage<Order> {
public OrderStorage(Connection c) throws IOException { super("order", c); }
@Override public Class<Order> clazz() { return Order.class; }
}
// de.hbase.coupang.storage.CommentStorage
package de.hbase.coupang.storage;
import de.hbase.coupang.data.Comment;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
public class CommentStorage extends AbstractJsonStorage<Comment> {
public CommentStorage(Connection c) throws IOException { super("comment", c); }
@Override public Class<Comment> clazz() { return Comment.class; }
}
// de.hbase.coupang.storage.index.UserCommentIndexStorage
package de.hbase.coupang.storage.index;
import de.hbase.coupang.index.IndexRow;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
import java.util.List;
import static de.hbase.common.KeyBytes.*;
public class UserCommentIndexStorage extends AbstractIndexStorage {
public UserCommentIndexStorage(Connection c) throws IOException { super("user-comment", c); }
public static byte[] key(String userId, long ts) {
return cat(id10(userId), revTs(ts));
}
public List<IndexRow> latest(String userId, int limitPlus1) throws IOException {
return scanWithPrefix(id10(userId), limitPlus1);
}
}
// de.hbase.coupang.storage.index.ProductCommentIndexStorage
package de.hbase.coupang.storage.index;
import de.hbase.coupang.index.IndexRow;
import org.apache.hadoop.hbase.client.Connection;
import java.io.IOException;
import java.util.List;
import static de.hbase.common.KeyBytes.*;
public class ProductCommentIndexStorage extends AbstractIndexStorage {
private static final byte[] C = new byte[]{'c'};
public ProductCommentIndexStorage(Connection c) throws IOException { super("product-comment", c); }
public static byte[] key(String productId, long ts, int contentBytesLen) {
return cat(id10(productId), C, revTs(ts), revLen(contentBytesLen));
}
public static byte[] prefixProduct(String productId) {
return cat(id10(productId), C);
}
public List<IndexRow> latest(String productId, int limitPlus1) throws IOException {
return scanWithPrefix(prefixProduct(productId), limitPlus1);
}
}
// de.hbase.coupang.storage.index.ProductCommentRankStorage
package de.hbase.coupang.storage.index;
import de.hbase.coupang.index.IndexRow;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
import static de.hbase.common.KeyBytes.*;
public class ProductCommentRankStorage extends AbstractIndexStorage {
private static final byte[] R = new byte[]{'r'};
public ProductCommentRankStorage(Connection c) throws IOException { super("product-comment-rank", c); }
public static byte[] key(String productId, short number) {
return cat(id10(productId), R, Bytes.toBytes(number));
}
public static byte[] prefixProduct(String productId) {
return cat(id10(productId), R);
}
public List<IndexRow> topN(String productId, int n) throws IOException {
return scanWithPrefix(prefixProduct(productId), n);
}
}
// de.hbase.coupang.api.CreateProductRequest
package de.hbase.coupang.api;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class CreateProductRequest {
private String id; // 10 chars
private String sellerId; // 10 chars
private byte[] content;
}
// de.hbase.coupang.api.CreateOrderRequest
package de.hbase.coupang.api;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class CreateOrderRequest {
private String id; // 10
private String userId; // 10
}
// de.hbase.coupang.api.CreateCommentRequest
package de.hbase.coupang.api;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class CreateCommentRequest {
private String orderId; // 10
private String productId; // 10
private String userId; // 10 (검증용: order.userId와 동일해야)
private String content;
}
// de.hbase.coupang.api.GetByIndexStartRequest
package de.hbase.coupang.api;
import lombok.*;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class GetByIndexStartRequest {
private byte[] startIndexKey; // Base64 in JSON
private int limit;
}
// de.hbase.coupang.api.GetCommentsResponse
package de.hbase.coupang.api;
import de.hbase.coupang.data.Comment;
import lombok.*;
import java.util.List;
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class GetCommentsResponse {
private List<Comment> comments;
private byte[] nextIndexKey; // null이면 다음 페이지 없음
}
// de.hbase.coupang.api.RankedCommentsRequest
package de.hbase.coupang.api;
import lombok.*;
import java.util.List;
/**
* 랭킹 Upsert용: 1..N 순서대로 commentRowKey(Base64) 리스트를 보냄
*/
@Data @NoArgsConstructor @AllArgsConstructor @Builder
public class RankedCommentsRequest {
private String productId;
private List<byte[]> commentRowKeys; // JSON에서 Base64로 입력
}
de.hbase.coupang.api.CoupangApi
package de.hbase.coupang.api;
import com.linecorp.armeria.common.*;
import com.linecorp.armeria.server.annotation.*;
import de.hbase.coupang.data.*;
import de.hbase.coupang.index.IndexRow;
import de.hbase.coupang.storage.*;
import de.hbase.coupang.storage.index.*;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import static de.hbase.coupang.storage.index.ProductCommentIndexStorage.key as pcKey;
import static de.hbase.coupang.storage.index.UserCommentIndexStorage.key as ucKey;
import static de.hbase.coupang.storage.index.ProductCommentIndexStorage.prefixProduct;
import static de.hbase.coupang.storage.index.ProductCommentRankStorage.key as rankKey;
@RequiredArgsConstructor
public class CoupangApi {
private final UserStorage userStorage;
private final SellerStorage sellerStorage;
private final ProductStorage productStorage;
private final OrderStorage orderStorage;
private final CommentStorage commentStorage;
private final UserCommentIndexStorage userCommentIdx;
private final ProductCommentIndexStorage productCommentIdx;
private final ProductCommentRankStorage productCommentRank;
@Post("/user")
public HttpResponse createRandomUser() throws IOException {
User u = User.builder()
.id(RandomStringUtils.randomAlphabetic(10))
.name(RandomStringUtils.randomAlphabetic(5))
.nickname(RandomStringUtils.randomAlphabetic(5))
.build();
userStorage.put(u);
return HttpResponse.ofJson(HttpStatus.CREATED, u);
}
@Post("/seller")
public HttpResponse createRandomSeller() throws IOException {
Seller s = Seller.builder()
.id(RandomStringUtils.randomAlphabetic(10))
.name(RandomStringUtils.randomAlphabetic(6))
.build();
sellerStorage.put(s);
return HttpResponse.ofJson(HttpStatus.CREATED, s);
}
@Post("/product")
public HttpResponse createProduct(CreateProductRequest req) throws IOException {
Product p = Product.builder()
.id(req.getId())
.sellerId(req.getSellerId())
.content(req.getContent())
.createdAt(System.currentTimeMillis())
.updatedAt(System.currentTimeMillis())
.build();
productStorage.put(p);
return HttpResponse.ofJson(HttpStatus.CREATED, p);
}
@Post("/order")
public HttpResponse createOrder(CreateOrderRequest req) throws IOException {
Order o = Order.builder().id(req.getId()).userId(req.getUserId()).build();
orderStorage.put(o);
return HttpResponse.ofJson(HttpStatus.CREATED, o);
}
@Post("/comment")
public HttpResponse createComment(CreateCommentRequest req) throws IOException {
// (옵션) 주문-유저 정합성 간단 검증
Order ord = orderStorage.get(Bytes.toBytes(req.getOrderId()));
if (ord == null || !ord.getUserId().equals(req.getUserId())) {
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.PLAIN_TEXT_UTF_8,
"order.userId mismatch or order not found");
}
long now = System.currentTimeMillis();
Comment c = Comment.builder()
.orderId(req.getOrderId())
.productId(req.getProductId())
.userId(req.getUserId())
.content(req.getContent())
.timestamp(now)
.isEdited(false)
.build();
commentStorage.put(c);
// 인덱스 upsert
byte[] commentRowKey = c.getRowKey();
int contentLenBytes = req.getContent() == null ? 0
: req.getContent().getBytes(StandardCharsets.UTF_8).length;
userCommentIdx.put(UserCommentIndexStorage.key(req.getUserId(), now), commentRowKey);
productCommentIdx.put(ProductCommentIndexStorage.key(req.getProductId(), now, contentLenBytes), commentRowKey);
return HttpResponse.ofJson(HttpStatus.CREATED, c);
}
// 상품 최신 댓글 (첫 페이지)
@Get("/product/{productId}/comments/latest")
public HttpResponse getProductCommentsLatest(@Param("productId") String productId,
@Param("limit") int limit) throws IOException {
List<IndexRow> rows = productCommentIdx.latest(productId, limit + 1);
return HttpResponse.ofJson(HttpStatus.OK, toCommentsResponse(rows, limit));
}
// 상품 댓글 - 다음 페이지 (IndexRowKey 기준)
@Post("/product/comments/get")
public HttpResponse getProductCommentsNext(GetByIndexStartRequest req) throws IOException {
List<IndexRow> rows = productCommentIdx.scanFrom(req.getStartIndexKey(), req.getLimit() + 1, false);
return HttpResponse.ofJson(HttpStatus.OK, toCommentsResponse(rows, req.getLimit()));
}
// 유저 타임라인 최신 (내가 남긴 댓글)
@Get("/user/{userId}/comments/latest")
public HttpResponse getUserCommentsLatest(@Param("userId") String userId,
@Param("limit") int limit) throws IOException {
List<IndexRow> rows = userCommentIdx.latest(userId, limit + 1);
return HttpResponse.ofJson(HttpStatus.OK, toCommentsResponse(rows, limit));
}
// 유저 타임라인 - 다음 페이지
@Post("/user/comments/get")
public HttpResponse getUserCommentsNext(GetByIndexStartRequest req) throws IOException {
List<IndexRow> rows = userCommentIdx.scanFrom(req.getStartIndexKey(), req.getLimit() + 1, false);
return HttpResponse.ofJson(HttpStatus.OK, toCommentsResponse(rows, req.getLimit()));
}
// 상품 랭킹 댓글 조회 (상위 N)
@Get("/product/{productId}/comments/ranked")
public HttpResponse getRanked(@Param("productId") String productId,
@Param("limit") int limit) throws IOException {
List<IndexRow> rows = productCommentRank.topN(productId, limit);
List<byte[]> commentRowKeys = rows.stream().map(IndexRow::getValue).collect(Collectors.toList());
List<Comment> comments = commentStorage.getBatch(commentRowKeys);
return HttpResponse.ofJson(HttpStatus.OK, GetCommentsResponse.builder()
.comments(comments)
.nextIndexKey(null) // 랭킹은 페이지 의미 X
.build());
}
// 랭킹 Upsert (실습용)
@Post("/product/comments/ranked/upsert")
public HttpResponse upsertRank(RankedCommentsRequest req) throws IOException {
short i = 1;
for (byte[] ck : req.getCommentRowKeys()) {
productCommentRank.put(ProductCommentRankStorage.key(req.getProductId(), i), ck);
i++;
}
return HttpResponse.of(HttpStatus.OK);
}
// 공통: limit+1 스캔 → nextIndexKey 할당
private GetCommentsResponse toCommentsResponse(List<IndexRow> rows, int limit) throws IOException {
byte[] next = null;
if (rows.size() > limit) {
next = rows.get(rows.size() - 1).getRowKey();
rows = rows.subList(0, limit);
}
List<byte[]> commentRowKeys = rows.stream().map(IndexRow::getValue).collect(Collectors.toList());
List<Comment> comments = commentStorage.getBatch(commentRowKeys);
return GetCommentsResponse.builder().comments(comments).nextIndexKey(next).build();
}
}
de.hbase.coupang.CoupangApp
package de.hbase.coupang;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.docs.DocService;
import de.hbase.coupang.api.CoupangApi;
import de.hbase.coupang.storage.*;
import de.hbase.coupang.storage.index.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
@Slf4j
public class CoupangApp {
public static void main(String[] args) throws Exception {
if (args.length != 1 || StringUtils.isBlank(args[0])) {
log.error("pass zookeeper quorum of HBase as first argument");
System.exit(1);
}
String zk = args[0];
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.master", zk + ":16000");
conf.set("hbase.zookeeper.quorum", zk);
conf.set("hbase.zookeeper.property.clientPort", "2181");
HBaseAdmin.available(conf);
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
log.info("table 'user' available: {}", admin.isTableAvailable(TableName.valueOf("user")));
log.info("table 'seller' available: {}", admin.isTableAvailable(TableName.valueOf("seller")));
log.info("table 'product' available: {}", admin.isTableAvailable(TableName.valueOf("product")));
log.info("table 'order' available: {}", admin.isTableAvailable(TableName.valueOf("order")));
log.info("table 'comment' available: {}", admin.isTableAvailable(TableName.valueOf("comment")));
log.info("table 'user-comment' available: {}", admin.isTableAvailable(TableName.valueOf("user-comment")));
log.info("table 'product-comment' available: {}", admin.isTableAvailable(TableName.valueOf("product-comment")));
log.info("table 'product-comment-rank' available: {}", admin.isTableAvailable(TableName.valueOf("product-comment-rank")));
UserStorage userS = new UserStorage(conn);
SellerStorage sellerS = new SellerStorage(conn);
ProductStorage productS = new ProductStorage(conn);
OrderStorage orderS = new OrderStorage(conn);
CommentStorage commentS = new CommentStorage(conn);
UserCommentIndexStorage ucIdx = new UserCommentIndexStorage(conn);
ProductCommentIndexStorage pcIdx = new ProductCommentIndexStorage(conn);
ProductCommentRankStorage prIdx = new ProductCommentRankStorage(conn);
ServerBuilder sb = Server.builder();
sb.http(8080);
Server server = sb
.annotatedService(new CoupangApi(userS, sellerS, productS, orderS, commentS, ucIdx, pcIdx, prIdx))
.serviceUnder("/docs", new DocService())
.build();
server.start().join();
}
}
create 'user', 'cf'
create 'seller', 'cf'
create 'product', 'cf'
create 'order', 'cf'
create 'comment', 'cf'
create 'user-comment', 'cf'
create 'product-comment', 'cf'
create 'product-comment-rank', 'cf'
list
describe 'comment'
describe 'product-comment'
describe 'product-comment-rank'
./gradlew build shadowJar
scp -i $KEY build/libs/de-hbase-coupang-1.0-all.jar ubuntu@$HBASE_NODE:~/
java -jar de-hbase-coupang-1.0-all.jar localhost
# http://$HOST:8080/docs 로 API 문서 확인
Comment RowKey = orderId(10) + productId(10) → 불변/유일성
user-comment (index) RowKey = userId(10) + reverseTs(8) → 유저 타임라인 최신순
product-comment (index) RowKey = productId(10) + 'c'(1) + reverseTs(8) + reverseLen(4) → 상품 댓글 최신순 + 동일 ts tie-breaker
product-comment-rank (index) RowKey = productId(10) + 'r'(1) + number(2) → 상위 N 랭킹(Upsert)
Index Value = 항상 원본 comment의 RowKey만 저장 (스토리지 절감 + 일관성 유지)
페이지네이션은 limit+1 조회 후 마지막 row의 rowKey를 nextIndexKey로 반환 → 다음 페이지 시작점