오늘은 Hikari CP에 대해 알아보겠습니다. (이 글은 3.4.1 버전 기준으로 작성되었습니다.)
Hikari CP를 사용하며 커넥션을 어떻게 주고 받는지 궁금하여 알아봤습니다.
CP는 DB와 연결을 미리 생성하여 풀에 보관하고, 필요할 때마다 이 연결을 주고 받는 기술입니다.
DB와 연결은 비용이 크기 때문에, 매번 새로운 연결을 초기화 하는 것은 비효율적 입니다. CP를 사용해 DB 연결의 생성 및 해제를 최소화하고, 효율적으로 자원을 관리할 수 있습니다.
CP는 다음과 같은 동작을 합니다
Hikari CP는 JDBC 연결 풀링을 위한 높은 성능을 제공하는 경량화된 CP 라이브러리입니다.
스프링 부트 2.0부터 default CP가 되었습니다.
먼저 전체적인 동작을 그려봤습니다.
전체적인 그림을 보기엔 좋으나, 코드와 자세한 내용은 더 밑에 정리해두었으니 참고하시면 이해하는데 도움이 될거라고 생각합니다.
일반적인 CP와 동작하는 방법은 같습니다. 다만 FastPool이 존재합니다.
그렇다면 Pool 내부는 어떻게 동작하고 있을까요?
// 커넥션 생성
try (Connection connection = dataSource.getConnection()) {
// 데이터베이스 작업 수행
...
} catch (SQLException e) {
e.printStackTrace();
} finally {
// 커넥션 반납
dataSource.close();
}
일반적으로 트랜잭션은 다음과 같이 실행됩니다.
여기서 커넥션을 받아오는 getConnection()과, 반납하는 close()를 CP가 처리하게됩니다.
getConnection을 얻는 방법은 다음과 같습니다.
1, 2, 3번을 클래스별로 뜯어보겠습니다.
Hikari CP의 데이터 소스입니다.
getConnection
@Override
public Connection getConnection() throws SQLException
{
// 1
if (isClosed()) {
throw new SQLException("HikariDataSource " + this + " has been closed.");
}
// 2
if (fastPathPool != null) {
return fastPathPool.getConnection();
}
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
validate();
try {
pool = result = new HikariPool(this);
this.seal();
}
catch (PoolInitializationException pie) {
if (pie.getCause() instanceof SQLException) {
throw (SQLException) pie.getCause();
}
else {
throw pie;
}
}
}
}
}
return result.getConnection();
}
커넥션 풀로부터 커넥션을 얻기위한 메서드입니다.
*ㅤfastPathPool은 pool과 같은 HikariPool 객체지만, final 멤버여서 생성자에서 초기화되고, 변경이 없습니다. 이는 동기화로 인한 오버헤드를 없에주어 더 빠릅니다.
Hikari CP에 대한 기본 풀링 동작을 제공하는 기본 풀 입니다.
constructor
public HikariPool(final HikariConfig config)
{
super(config);
this.connectionBag = new ConcurrentBag<>(this);
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
checkFailFast();
// 기타 설정 초기화
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
}
getConnection
public Connection getConnection(final long hardTimeout) throws SQLException
{
// 1
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
// 2
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}
final long now = currentTime();
// 3
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
// 4
metricsTracker.recordBorrowStats(poolEntry, startTime);
// 5
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
//6
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
suspendResumeLock.release();
}
}
커넥션이 담기는 가방(리스트) 역할을 합니다.
borrow
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// 1
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// 2
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
// 3
listener.addBagItem(waiting);
// 4
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
이전에 사용했던 커넥션중 재사용 가능한 것을 찾아 반환합니다.
*sharedList에서 사용 가능한 커넥션을 찾아 반환합니다.
a. 대기중인 스레드 수를 1 증가시킵니다.
b. 해당 커넥션이 사용중이 아닌경우, 사용중으로 상태를 변경합니다.
c. 대기중인 다른 스레드에 의해 빼앗긴 커넥션이 있다면 **listener에게 커넥션을 요청합니다.
sharedList에 사용가능한 커넥션이 없다면 listener에게 커넥션을 요청합니다.
***handoffQueue에서 timeout 시간만큼 커넥션을 찾습니다. 주어진 시간동안 커넥션을 반환하지 못하면 null을 반환합니다.
ㅤ
*ㅤsharedList는 커넥션을 공유하는 리스트로, 현재 사용중이지 않은 커넥션이 저장되어 있습니다.
** listener는 커넥션 풀 이벤트를 처리하는 인터페이스를 나타냅니다. listener.addBagItem은 대기중인 스레드를 전달하여 커넥션 요청을 하도록 만듭니다.
*** handoffQueue는 대기중인 스레드가 기다리고 있는 대기열 입니다.
ㅤ
ㅤ
커넥션을 close 하는 방법은 다음과 같습니다.
1, 2, 3번을 클래스별로 뜯어보겠습니다.
ㅤ
Connection 객체의 Proxy입니다.
ㅤ
close
public final void close() throws SQLException {
// 1
closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) {
// 2
leakTask.cancel();
try {
// 3
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
lastAccess = currentTime();
}
if (dirtyBits != 0) {
poolEntry.resetConnectionState(this, dirtyBits);
lastAccess = currentTime();
}
}
catch (SQLException e) {
if (!poolEntry.isMarkedEvicted()) {
throw checkException(e);
}
}
finally {
// 4
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle(lastAccess);
}
}
}
ㅤ
ConcurrentBag에 담겨져 있는 커넥션 인스턴스 입니다.
void recycle(final long lastAccessed) {
// 1
if (connection != null) {
this.lastAccessed = lastAccessed;
hikariPool.recycle(this);
}
}
ㅤ
HikariCP에 대한 기본 풀링 동작을 제공하는 기본 풀 입니다.
void recycle(final PoolEntry poolEntry) {
// 1
metricsTracker.recordConnectionUsage(poolEntry);
// 2
connectionBag.requite(poolEntry);
}
ㅤ
커넥션이 담기는 가방(리스트) 역할을 합니다.
public void requite(final T bagEntry)
{
// 1
bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {
// 2
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
yield();
}
}
// 3
final List<Object> threadLocalList = threadList.get();
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
ㅤ
ㅤ
ㅤ
ㅤ
ㅤ
ㅤ