
이번 시리즈에서는 Java Redis Client중 Jedis와 Lettuce를 비교해 볼 생각이다.
저번 포스팅에서 이론적인 부분은 어느정도 비교를 했으니, 이번에는 코드를 살펴보며 이론적인 부분이 어떻게 구현되어 있는지 살펴보겠다.
가장 먼저 싱글 스레드 기준으로, 이 사용자 요청이 들어오면 가장 먼저 실행되는 Jedis.java 파일을 살펴보겠다.
애플리케이션 코드에서 get 요청이 들어오면, 다음 메소드가 실행된다.
@Override
public String get(final String key) {
checkIsInMultiOrPipeline(); // 1
return connection.executeCommand(commandObjects.get(key)); // 2
}
1번 주석은 Pipeline 모드나 multi 모드를 판별하는 역할을 하며, 싱글스레드 기준으로 살펴보겠으므로 넘어가겠다.
2번 주석은 실제 명령어가 실행되는 부분이다. 한번 들어가서 무슨 행동을 하는지 살펴보겠다.
public <T> T executeCommand(final CommandObject<T> commandObject) {
final CommandArguments args = commandObject.getArguments(); // 1
sendCommand(args); // 2
if (!args.isBlocking()) {
return commandObject.getBuilder().build(getOne()); // 3
} else {
try {
setTimeoutInfinite();
return commandObject.getBuilder().build(getOne()); // 3
} finally {
rollbackTimeout();
}
}
}
Connection 클래스의 executeCommand() 메소드이며, 크게 3부분으로 나뉜다.
final CommandArguments args = commandObject.getArguments();
파라미터로 받은 commandObject 안에서 커맨드 종류, 인자들, 이 커맨드가 blocking인지 여부, 응답을 어떻게 만들지를 꺼내는 단계이다.
sendCommand(args)
여기서는 Redis 서버에 커맨드를 실행시키기 위한 요청을 전송하며, 이는 쓰기 작업이 이루어진다. 내부에서 connect()로 현재 커넥션이 없다면 Redis와의 Connection후 커맨드를 전송한다.
commandObject.getBuilder().build(getOne())
여기서는 isBlocking에 따라 절차가 달라지는데, 이는 커맨드의 종류 때문이다. 만약 GET, SET처럼 서버가 즉시 응답할 수 있는 메소드라면 바로 응답을 받을 수 있지만 BLPOP, BRPOP, XREAD BLOCK처럼 오래 기다려야 응답을 받을 수 있는 커맨드의 경우는 setTimeoutInfinite()으로 Timeout을 크게 늘려 응답을 받을 때 까지 Blocking하고 결과를 반환한다.
그럼 이제 비교적 간단하게 한 명령을 실행해 보았는데, 이제는 JedisPool 설정을 통해 멀티스레드 환경에서 Jedis가 어떻게 동작하는지 정리해보겠다.
@Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(host, port);
config.setDatabase(0);
JedisPoolConfig pool = new JedisPoolConfig();
pool.setMaxTotal(3);
pool.setMaxIdle(3);
pool.setMinIdle(0);
pool.setMaxWait(Duration.ofSeconds(2));
JedisClientConfiguration clientCfg = JedisClientConfiguration.builder()
.usePooling().poolConfig(pool)
.build();
return new JedisConnectionFactory(config, clientCfg);
}
보통 RedisConfig설정을 할때 이런식으로 할텐데, pool설정에서 개수와 대기시간같은 설정을 할 수 있다.
이제 사용자 Application Code에서
try (Jedis jedis = pool.getResource()) {
return jedis.get("k");
}
다음과 같은 방식으로 Pool을 하나 빌려와 사용한다.
getResource는 다음과 같은 구성으로 되어있다.
@Override
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
여기서는 또 부모 클래스의 getResource로 jedis 객체를 생성하고(물론 이미 존재하면 재사용한다.) 이 setDataSource로 jedis가 Pool 소속이라는걸 알려 close()를 리턴하게 한다.
그리고 상위 클래스인 Pool의 getResource는 다음과 같은 구조를 가지고 있다.
public T getResource() {
try {
return super.borrowObject();
} catch (JedisException je) {
throw je;
} catch (Exception e) {
throw new JedisException("Could not get a resource from the pool", e);
}
}
여기서 또 부모 클래스의 borrowObject()를 호출하는데 중요한 부분만 정리해보자면,
public T borrowObject(Duration borrowMaxWaitDuration) throws Exception {
this.assertOpen();
Instant startInstant = Instant.now();
boolean negativeDuration = borrowMaxWaitDuration.isNegative();
Duration remainingWaitDuration = borrowMaxWaitDuration;
AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() && this.getNumIdle() < 2 && this.getNumActive() > this.getMaxTotal() - 3) {
this.removeAbandoned(ac);
}
PooledObject<T> p = null;
boolean blockWhenExhausted = this.getBlockWhenExhausted();
while(p == null) {
remainingWaitDuration = remainingWaitDuration.minus(this.durationSince(startInstant));
boolean create = false;
p = (PooledObject)this.idleObjects.pollFirst();
if (p == null) {
p = this.create(remainingWaitDuration);
if (!PooledObject.isNull(p)) {
create = true;
}
}
if (blockWhenExhausted) {
if (PooledObject.isNull(p)) {
p = negativeDuration ? (PooledObject)this.idleObjects.takeFirst() : (PooledObject)this.idleObjects.pollFirst(remainingWaitDuration);
}
if (PooledObject.isNull(p)) {
throw new NoSuchElementException(this.appendStats("Timeout waiting for idle object, borrowMaxWaitDuration=" + remainingWaitDuration));
}
} else if (PooledObject.isNull(p)) {
throw new NoSuchElementException(this.appendStats("Pool exhausted"));
}
if (!p.allocate()) {
p = null;
}
if (!PooledObject.isNull(p)) {
try {
this.factory.activateObject(p);
} catch (Exception e) {
try {
this.destroy(p, DestroyMode.NORMAL);
} catch (Exception var14) {
}
p = null;
if (create) {
NoSuchElementException nsee = new NoSuchElementException(this.appendStats("Unable to activate object"));
nsee.initCause(e);
throw nsee;
}
}
if (!PooledObject.isNull(p) && this.getTestOnBorrow()) {
boolean validate = false;
Throwable validationThrowable = null;
try {
validate = this.factory.validateObject(p);
} catch (Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
if (!validate) {
try {
this.destroy(p, DestroyMode.NORMAL);
this.destroyedByBorrowValidationCount.incrementAndGet();
} catch (Exception var12) {
}
p = null;
if (create) {
NoSuchElementException nsee = new NoSuchElementException(this.appendStats("Unable to validate object"));
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
this.updateStatsBorrow(p, this.durationSince(startInstant));
return (T)p.getObject();
}
먼저, idle한 jedis 객체가 있는지 확인하고, 없으면 새로 만들기를 시도한다.
p = idleObjects.pollFirst();
if (p == null) {
p = create(remainingWaitDuration);
}
하지만 사용중인 jedis객체가 아까 설정한 pool 최대 개수에 도달했으면 대기한다. 이 경우는 시간 제한을 걸어놓고 대기한다.
if (!p.allocate()) { p = null; }
만약 꺼낸 Jedis 객체가 이미 누군가에게 할당이 됐으면, p를 null로 설정해 밑의 while문에서 다른 Jedis 객체를 재할당받는다.
while(p == null) {
remainingWaitDuration = remainingWaitDuration.minus(this.durationSince(startInstant));
boolean create = false;
p = (PooledObject)this.idleObjects.pollFirst();
if (p == null) {
p = this.create(remainingWaitDuration);
if (!PooledObject.isNull(p)) {
create = true;
}
}
if (blockWhenExhausted) {
if (PooledObject.isNull(p)) {
p = negativeDuration ? (PooledObject)this.idleObjects.takeFirst() : (PooledObject)this.idleObjects.pollFirst(remainingWaitDuration);
}
if (PooledObject.isNull(p)) {
throw new NoSuchElementException(this.appendStats("Timeout waiting for idle object, borrowMaxWaitDuration=" + remainingWaitDuration));
}
} else if (PooledObject.isNull(p)) {
throw new NoSuchElementException(this.appendStats("Pool exhausted"));
}
if (!p.allocate()) {
p = null;
}
if (!PooledObject.isNull(p)) {
try {
this.factory.activateObject(p);
} catch (Exception e) {
try {
this.destroy(p, DestroyMode.NORMAL);
} catch (Exception var14) {
}
p = null;
if (create) {
NoSuchElementException nsee = new NoSuchElementException(this.appendStats("Unable to activate object"));
nsee.initCause(e);
throw nsee;
}
}
if (!PooledObject.isNull(p) && this.getTestOnBorrow()) {
boolean validate = false;
Throwable validationThrowable = null;
try {
validate = this.factory.validateObject(p);
} catch (Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
if (!validate) {
try {
this.destroy(p, DestroyMode.NORMAL);
this.destroyedByBorrowValidationCount.incrementAndGet();
} catch (Exception var12) {
}
p = null;
if (create) {
NoSuchElementException nsee = new NoSuchElementException(this.appendStats("Unable to validate object"));
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
이렇게 동시성 문제도 예방하고 있는것을 볼 수 있다.
이런 구조를 가지고 있어, 한 스레드당 Jedis 객체 하나를 사용하고, 결과적으로는 스레드 하나당 Connection 하나를 사용하는 꼴이 된다. Jedis의 Connection은 한 소켓에 대해 sendCommand()로 write , getOne()로 read작업을 수행한다.
하지만 만약 다른 스레드가 Connection()을 공유 한다면, 동시에 write()작업을 수행하게 될 수 있고, TCP연결은 메세지를 한 덩어리로 보장해주지 않기 때문에, 요청이 섞여서 꼬여버릴수가 있다.
또한 요청과 응답이 묶이지 않는다는 문제도 있어 이러한 구조를 택해야한다.
이런 이유로 Jedis/Connection은 멀티스레드 공유에 대해 thread-safe를 보장하지 않으며, 풀에서 빌려 스레드당 단독 사용하도록 하는 구조를 택한다.
다음 글에서는 Lettuce가 이를 어떻게 해결했는지 알아보도록 하겠다.