[Android] DataStore는 데이터를 어떻게 저장할까

신민준·2025년 11월 9일
0

안드로이드

목록 보기
4/4

들어가며

Manifest Android Interview를 읽으며 안드로이드 파일 시스템에 대해 더욱 알아보기 위한 글이다.

구성 요소나 개념에 대한 내용은 책에 이미 잘 나와있으니 내가 알아볼 것은 코드에서 시작해서 실제 파일 시스템에 어떻게 접근하고 처리하는지를 알아볼 것이다.

대표적인 저장 방식인 DataStore를 다루고자 한다.

DataStore

dataStore.edit { prefs ->
      prefs[ACCESS_TOKEN] = cryptoManager.encrypt(accessToken)
}

흔히 위와 같은 형태로 DataStore를 활용한다.
edit은 다음과 같이 작성되어 있다.

public suspend fun DataStore<Preferences>.edit(
    transform: suspend (MutablePreferences) -> Unit
): Preferences {
    return this.updateData {
        // It's safe to return MutablePreferences since we freeze it in
        // PreferencesDataStore.updateData()
        it.toMutablePreferences().apply { transform(this) }
    }
}

그렇다면 이 updateData를 살펴보자

public suspend fun updateData(transform: suspend (t: T) -> T): T

Ctrl+B로 추적하면 실제 구현체까지는 나오지 않는다.
실제 구현체는 External Libraries에 있는 AAR 목록들에서 찾아볼 수 있는 androidx.datastore.core.DataStoreImpl에 있다.

override suspend fun updateData(transform: suspend (t: T) -> T): T {
        val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key]
        parentContextElement?.checkNotUpdating(this)
        val childContextElement = UpdatingDataContextElement(
            parent = parentContextElement,
            instance = this
        )
        return withContext(childContextElement) {
            val ack = CompletableDeferred<T>()
            val currentDownStreamFlowState = inMemoryCache.currentState
            val updateMsg =
                Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
            writeActor.offer(updateMsg)
            ack.await()
        }
    }

각 코드를 하나씩 살펴보자

val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key]
parentContextElement?.checkNotUpdating(this)
val childContextElement = UpdatingDataContextElement(
   parent = parentContextElement,
   instance = this
)

먼저 val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key]
여기서 UpdatingDataContextElement.Companion.Key은 아래와 같다.

internal class UpdatingDataContextElement(
    private val parent: UpdatingDataContextElement?,
    private val instance: DataStoreImpl<*>
) : CoroutineContext.Element {

    companion object {
        internal val NESTED_UPDATE_ERROR_MESSAGE = """
                Calling updateData inside updateData on the same DataStore instance is not supported
                since updates made in the parent updateData call will not be visible to the nested
                updateData call. See https://issuetracker.google.com/issues/241760537 for details.
            """.trimIndent()
        internal object Key : CoroutineContext.Key<UpdatingDataContextElement>
    }

즉 해당 DataStore가 호출된 코루틴에서 UpdatingDataContextElement를 받아온다.
parentContextElement?.checkNotUpdating(this)
UpdatingDataContextElement가 있는지 확인하고 있다면 지금 해당 코루틴에서 DataStore가 업데이트 중인지 checkNotUpdating를 통해 확인한다.

만약 동일 인스턴스에 대한 중첩 호출(재진입)이 감지되면, 이는 교착 상태(Deadlock)를 유발할 수 있으므로 즉시 예외를 발생시킨다.

val childContextElement = UpdatingDataContextElement(
   parent = parentContextElement,
   instance = this
)

이제 자식이 사용할 새로운 UpdatingDataContextElement를 만든다.

처음에는 이게 잘 이해가 안 갔는데
안드로이드 DataStore에서 예를 들어

// 바깥
dataStoreA.updateData {
	// 안쪽
    dataStoreA.updateData { 
       
        ... 
    }
}

이런식으로 중첩되게 호출되면 데드락이 발생할 수 있지만

// 바깥
dataStoreA.updateData {
	// 안쪽
    dataStoreB.updateData { 
       
        ... 
    }
}

와 같은 경우는 허용되어야 한다.
그렇기에 같은 DataStore에 대하여 parentContextElement?.checkNotUpdating(this)로 작성중인지를 확인하고 다른 DataStore에 대한 중첩호출은 허용되도록 val childContextElement = UpdatingDataContextElement( parent = parentContextElement, instance = this )로 부모-자식 관계를 형성해주는 것이다.

마지막으로

val ack = CompletableDeferred<T>()
val currentDownStreamFlowState = inMemoryCache.currentState
val updateMsg =
   Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
writeActor.offer(updateMsg)
ack.await()

이것은 경쟁 상태를 방지하고 모든 업데이트를 순차적으로 처리하기 위한 로직이다.

  1. val ack = CompletableDeferred<T>()
    • 비동기적으로 수행될 업데이트 작업의 최종 결과를 수신하기 위한 객체를 생성
  2. val currentDownStreamFlowState = inMemoryCache.currentState
    • 불필요한 디스크 읽기를 줄이기 위해 현재 메모리에 캐시된 데이터 상태를 가져옴
  3. val updateMsg = Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
    • 실제 업데이트를 수행할 Actor에게 전달할 메시지(작업 단위)를 생성
  4. writeActor.offer(updateMsg)
    • 생성된 업데이트 메시지를 실제 작업을 처리하는 actor의 메시지 큐에 전송
  5. ack.await()
    • 업데이트 작업을 완료하고 결과를 반환할 때까지 대기

writeActore는 다음과 같이 정의되어 있다.

private val writeActor = SimpleActor<Message.Update<T>>(
   scope = scope,
   onComplete = {
      // We expect it to always be non-null but we will leave the alternative as a no-op
      // just in case.
      it?.let {
         inMemoryCache.tryUpdate(Final(it))
      }
      // don't try to close storage connection if it was not created in the first place.
      if (storageConnectionDelegate.isInitialized()) {
         storageConnection.close()
      }
   },
   onUndeliveredElement = { msg, ex ->
      msg.ack.completeExceptionally(
         ex ?: CancellationException(
            "DataStore scope was cancelled before updateData could complete"
         )
      )
   }
) { msg ->
   handleUpdate(msg)
}

그중 실제로 메시지를 처리하는 handleUpdate를 살펴보자.

private suspend fun handleUpdate(update: Message.Update<T>) {
   update.ack.completeWith(
      runCatching {
         val result: T
         when (val currentState = inMemoryCache.currentState) {
            is Data -> {
               // We are already initialized, we just need to perform the update
               result = transformAndWrite(update.transform, update.callerContext)
            }

            is ReadException, is UnInitialized -> {
               if (currentState === update.lastState) {
                  // we need to try to read again
                  readAndInitOrPropagateAndThrowFailure()

                  // We've successfully read, now we need to perform the update
                  result = transformAndWrite(update.transform, update.callerContext)
               } else {
                  // Someone else beat us to read but also failed. We just need to
                  // signal the writer that is waiting on ack.
                  // This cast is safe because we can't be in the UnInitialized
                  // state if the state has changed.
                  throw (currentState as ReadException).readException
               }
            }

            is Final -> throw currentState.finalException // won't happen
         }
         result
      }
   )
}

그중 살펴볼 것은

is Data -> {
   result = transformAndWrite(update.transform, update.callerContext)
}

is ReadException, is UnInitialized -> {
   if (currentState === update.lastState) {
      readAndInitOrPropagateAndThrowFailure()
      result = transformAndWrite(update.transform, update.callerContext)
   } else {
      throw (currentState as ReadException).readException
   }
}

각각 DataStore가 이미 초기화되었고, 메모리에 정상적인 데이터를 갖고 있다는 Data와 이 막 시작되어 디스크에서 데이터를 한 번도 읽지 않은 상태인 UnInitialized이다.
둘 모두 result = transformAndWrite(update.transform, update.callerContext)로 최종적으로 update하는 건 같다.

readAndInitOrPropagateAndThrowFailure()는 그럼 뭘 할까

private suspend fun readAndInitOrPropagateAndThrowFailure() {
   val preReadVersion = coordinator.getVersion()
   try {
      readAndInit.runIfNeeded()
   } catch (throwable: Throwable) {
      inMemoryCache.tryUpdate(ReadException(throwable, preReadVersion))
      throw throwable
   }
}
internal abstract class RunOnce {
    private val runMutex = Mutex()
    private val didRun = CompletableDeferred<Unit>()
    protected abstract suspend fun doRun()

    suspend fun awaitComplete() = didRun.await()

    suspend fun runIfNeeded() {
        if (didRun.isCompleted) return
        runMutex.withLock {
            if (didRun.isCompleted) return
            doRun()
            didRun.complete(Unit)
        }
    }
}
private va readAndInit = InitDataStore(initTasksList)
private inner class InitDataStore(
        initTasksList: List<suspend (api: InitializerApi<T>) -> Unit>
    ) : RunOnce() {
    ...
    override suspend fun doRun() {
            val initData = if ((initTasks == null) || initTasks!!.isEmpty()) {
                // if there are no init tasks, we can directly read
                readDataOrHandleCorruption(hasWriteFileLock = false)
            } else {
                // if there are init tasks, we need to obtain a lock to ensure migrations
                // run as 1 chunk
                coordinator.lock {
                    val updateLock = Mutex()
                    var initializationComplete = false
                    var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value

                    val api = object : InitializerApi<T> {
                        override suspend fun updateData(transform: suspend (t: T) -> T): T {
                            return updateLock.withLock {
                                check(!initializationComplete) {
                                    "InitializerApi.updateData should not be called after " +
                                        "initialization is complete."
                                }

                                val newData = transform(currentData)
                                if (newData != currentData) {
                                    writeData(newData, updateCache = false)
                                    currentData = newData
                                }

                                currentData
                            }
                        }
                    }

                    initTasks?.forEach { it(api) }
                    // Init tasks have run successfully, we don't need them anymore.
                    initTasks = null
                    updateLock.withLock {
                        initializationComplete = true
                    }
                    // only to make compiler happy
                    Data(
                        value = currentData,
                        hashCode = currentData.hashCode(),
                        version = coordinator.getVersion()
                    )
                }
            }
            inMemoryCache.tryUpdate(initData)
        }
}
  • RunOnce는 오직 한번만 실행되는 것을 보장하는 도구다

doRun이 실행되는데 coordinator.lock은 파일 락(File Lock) 등을 사용하여, 이 앱의 모든 프로세스(여러 앱 인스턴스 포함)를 통틀어 오직 하나의 코루틴만이 이 람다 블록에 진입할 수 있도록 보장한다.

이중 디스크에서 데이터를 읽어오고 만약 파일이 손상되었으면(corrupted) 복구(handle)하는 함수인 readDataOrHandleCorruption를 살펴보자

private suspend fun readDataOrHandleCorruption(hasWriteFileLock: Boolean): Data<T> {
        try {
            return if (hasWriteFileLock) {
                val data = readDataFromFileOrDefault()
                Data(data, data.hashCode(), version = coordinator.getVersion())
            } else {
                val preLockVersion = coordinator.getVersion()
                coordinator.tryLock { locked ->
                    val data = readDataFromFileOrDefault()
                    val version = if (locked) coordinator.getVersion() else preLockVersion
                    Data(
                        data,
                        data.hashCode(),
                        version
                    )
                }
            }
        } catch (ex: CorruptionException) {
            var newData: T = corruptionHandler.handleCorruption(ex)
            var version: Int // initialized inside the try block

            try {
                doWithWriteFileLock(hasWriteFileLock) {
                    // Confirms the file is still corrupted before overriding
                    try {
                        newData = readDataFromFileOrDefault()
                        version = coordinator.getVersion()
                    } catch (ignoredEx: CorruptionException) {
                        version = writeData(newData, updateCache = true)
                    }
                }
            } catch (writeEx: Throwable) {
                // If we fail to write the handled data, add the new exception as a suppressed
                // exception.
                ex.addSuppressed(writeEx)
                throw ex
            }

            // If we reach this point, we've successfully replaced the data on disk with newData.
            return Data(newData, newData.hashCode(), version)
        }
    }

이중 readDataFromFileOrDefault()를 통해 드디어 storage에서 파일을 읽어오게 된다!

private suspend fun readDataFromFileOrDefault(): T {
	return storageConnection.readData()
}
private val storageConnectionDelegate = lazy {
	storage.createConnection()
}
internal val storageConnection by storageConnectionDelegate

먼저 createConnection()를 살펴보자.

interface Storage<T> {
    fun createConnection(): StorageConnection<T>
}
interface StorageConnection<T> : Closeable {
    suspend fun <R> readScope(
        block: suspend ReadScope<T>.(locked: Boolean) -> R
    ): R

    suspend fun writeScope(block: suspend WriteScope<T>.() -> Unit)

    val coordinator: InterProcessCoordinator
}

interface ReadScope<T> : Closeable {

    suspend fun readData(): T
}

interface WriteScope<T> : ReadScope<T> {

    suspend fun writeData(value: T)
}

이 인터페이스의 유일한 목적은 createConnection() 메소드를 제공하는 것이다.
StorageConnection<T>createConnection()이 반환하는 객체로, 실제로 파일(디스크)에 read()write() 작업을 수행하는 구체적인 로직을 담고 있고 실제 구현체는 다음과 같다.

class FileStorage<T>(
    private val serializer: Serializer<T>,
    private val coordinatorProducer: (File) -> InterProcessCoordinator = {
        createSingleProcessCoordinator(it)
    },
    private val produceFile: () -> File
) : Storage<T> {

    override fun createConnection(): StorageConnection<T> {
        val file = produceFile().canonicalFile

        synchronized(activeFilesLock) {
            val path = file.absolutePath
            check(!activeFiles.contains(path)) {
                "There are multiple DataStores active for the same file: $path. You should " +
                    "either maintain your DataStore as a singleton or confirm that there is " +
                    "no two DataStore's active on the same file (by confirming that the scope" +
                    " is cancelled)."
            }
            activeFiles.add(path)
        }

        return FileStorageConnection(file, serializer, coordinatorProducer(file)) {
            synchronized(activeFilesLock) {
                activeFiles.remove(file.absolutePath)
            }
        }
    }

    internal companion object {
        @GuardedBy("activeFilesLock")
        internal val activeFiles = mutableSetOf<String>()

        internal val activeFilesLock = Any()
    }
}

internal class FileStorageConnection<T>(
    private val file: File,
    private val serializer: Serializer<T>,
    override val coordinator: InterProcessCoordinator,
    private val onClose: () -> Unit
) : StorageConnection<T> {

    private val closed = AtomicBoolean(false)
    private val transactionMutex = Mutex()

    override suspend fun <R> readScope(
        block: suspend ReadScope<T>.(locked: Boolean) -> R
    ): R {
        checkNotClosed()

        val lock = transactionMutex.tryLock()
        try {
            return FileReadScope(file, serializer).use {
                block(it, lock)
            }
        } finally {
            if (lock) {
                transactionMutex.unlock()
            }
        }
    }

    override suspend fun writeScope(block: suspend WriteScope<T>.() -> Unit) {
        checkNotClosed()
        file.createParentDirectories()

        transactionMutex.withLock {
            val scratchFile = File(file.absolutePath + ".tmp")
            try {
                FileWriteScope(scratchFile, serializer).use {
                    block(it)
                }
                if (scratchFile.exists() && !scratchFile.atomicMoveTo(file)) {
                    throw IOException(
                        "Unable to rename $scratchFile to $file. " +
                        "This likely means that there are multiple instances of DataStore " +
                        "for this file. Ensure that you are only creating a single instance of " +
                        "datastore for this file."
                    )
                }
            } catch (ex: IOException) {
                if (scratchFile.exists()) {
                    scratchFile.delete() // Swallow failure to delete
                }
                throw ex
            }
        }
    }

    public override fun close() {
        closed.set(true)
        onClose()
    }

    private fun checkNotClosed() {
        check(!closed.get()) { "StorageConnection has already been disposed." }
    }

    private fun File.createParentDirectories() {
        val parent: File? = canonicalFile.parentFile

        parent?.let {
            it.mkdirs()
            if (!it.isDirectory) {
                throw IOException("Unable to create parent directories of $this")
            }
        }
    }
}

일단 StorageConnection을 생성하는 createConnection()부터 살펴보면 다음과 같이 되어있다.

override fun createConnection(): StorageConnection<T> {
   val file = produceFile().canonicalFile

   synchronized(activeFilesLock) {
      val path = file.absolutePath
      check(!activeFiles.contains(path)) {
         "There are multiple DataStores active for the same file: $path. You should " +
                 "either maintain your DataStore as a singleton or confirm that there is " +
                 "no two DataStore's active on the same file (by confirming that the scope" +
                 " is cancelled)."
      }
      activeFiles.add(path)
   }

   return FileStorageConnection(file, serializer, coordinatorProducer(file)) {
      synchronized(activeFilesLock) {
         activeFiles.remove(file.absolutePath)
      }
   }
}

여길 보면 produceFile().canonicalFile로 파일을 불러오는데 그렇다면 이 produceFile()이 뭔지를 알아야 한다. produceFileFileStorage의 파라미터로 () -> File이다. 이건

public actual object DataStoreFactory {
    @JvmOverloads // Generate constructors for default params for java users.
    public fun <T> create(
        serializer: Serializer<T>,
        corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
        migrations: List<DataMigration<T>> = listOf(),
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
        produceFile: () -> File
    ): DataStore<T> = create(
        storage = FileStorage(serializer = serializer, produceFile = produceFile),
        corruptionHandler = corruptionHandler,
        migrations = migrations,
        scope = scope
    )
    ...
}

이 팩토리에서 DataStore가 생성될 때 함께 생성되는데 여기서도 produceFile이 뭔지 알수 없다. 그렇다면 이 팩토리 함수가 호출되는 곳을 찾아가보자.

private val Context.tokenDataStore by preferencesDataStore(name = TOKEN_DATASTORE_NAME)

DataStore 생성을 위임할 때 사용되는 이 preferencesDataStore가 실제 DataStore를 생성하니 이것을 따라가자.

@Suppress("MissingJvmstatic")
public fun preferencesDataStore(
    name: String,
    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
    produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): ReadOnlyProperty<Context, DataStore<Preferences>> {
    return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
}
internal class PreferenceDataStoreSingletonDelegate internal constructor(
    private val name: String,
    private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
    private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
    private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {

    private val lock = Any()

    @GuardedBy("lock")
    @Volatile
    private var INSTANCE: DataStore<Preferences>? = null

    /**
     * Gets the instance of the DataStore.
     *
     * @param thisRef must be an instance of [Context]
     * @param property not used
     */
    override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
        return INSTANCE ?: synchronized(lock) {
            if (INSTANCE == null) {
                val applicationContext = thisRef.applicationContext

                INSTANCE = PreferenceDataStoreFactory.create(
                    corruptionHandler = corruptionHandler,
                    migrations = produceMigrations(applicationContext),
                    scope = scope
                ) {
                    applicationContext.preferencesDataStoreFile(name)
                }
            }
            INSTANCE!!
        }
    }
}

PreferenceDataStoreFactory가 실제 인스턴스를 생성하는 곳을 찾았다!
저 람다가 바로 produceFile이다.

public fun Context.preferencesDataStoreFile(name: String): File =
    this.dataStoreFile("$name.preferences_pb")
public fun Context.dataStoreFile(fileName: String): File =
    File(applicationContext.filesDir, "datastore/$fileName") 

다음과 같이 파일을 생성하는 것을 확인할 수 있다.

public File(File var1, String var2) {
    if (var2 == null) {
        throw new NullPointerException();
    } else {
        if (var1 != null) {
            if (var1.path.isEmpty()) {
                this.path = FS.resolve(FS.getDefaultParent(), FS.normalize(var2));
            } else {
                this.path = FS.resolve(var1.path, FS.normalize(var2));
            }
        } else {
            this.path = FS.normalize(var2);
        }

        this.prefixLength = FS.prefixLength(this.path);
    }
}

그렇다면 이 FS, 파일시스템은 어떻게 되어있을까

static {
    if (IoOverNio.IS_ENABLED_IN_GENERAL) {
        FS = new IoOverNioFileSystem(DefaultFileSystem.getFileSystem());
    } else {
        FS = DefaultFileSystem.getFileSystem();
    }
   ...
}

일단 위와 같이 FS를 선언한다.

class IoOverNioFileSystem extends FileSystem {
    private final FileSystem parent;

    IoOverNioFileSystem(FileSystem var1) {
        this.parent = var1;
    }
    ...
}
final class DefaultFileSystem {
    private DefaultFileSystem() {
    }

    public static FileSystem getFileSystem() {
        return new UnixFileSystem();
    }
}

여기서 UnixFileSystem과 IoOverNioFileSystem을 간단히 설명하자면

  • UnixFileSystem : 구형 레거시, Java 7 이전에 사용됨, Unix/Linux 시스템에서 파일 작업을 할 때 사용되던 실제 구현체로 직접 네이티브(C) 코드를 호출하여 운영체제와 통신
  • IoOverNioFileSystem : 신형 어댑터, Java 7 이후에 사용됨, 클래스의 요청을 처리하는 '어댑터(Adapter)'로 실제 DefaultFileSystem이 처리한다.

여기서 처음 알게된 건데 DefaultFileSystem는 위의 코드에서 확인할 수 있다시피 UnixFileSystem을 리턴한다. 결국 처리는 UnixFileSystem가 하고 IoOverNioFileSystem는 정말 어댑터의 역할만을 수행한다.

그렇다면 File 생성자의

this.path = FS.resolve(FS.getDefaultParent(), FS.normalize(var2));

에서 이 resolve는 아래와 같이 parent에 위임하고

class IoOverNioFileSystem extends FileSystem {
	...
	public String resolve(String var1, String var2) {
        return this.parent.resolve(var1, var2);
    }
    ...
}

이 parent는 바로 DefaultFileSystem.getFileSystem() 가 리턴하는 UnixFileSystem이다.
UnixFileSystem의 resolve는 아래와 같이 되어있다.

private static String trimSeparator(String var0) {
   int var1 = var0.length();
   return var1 > 1 && var0.charAt(var1 - 1) == '/' ? var0.substring(0, var1 - 1) : var0;
}

public String resolve(String var1, String var2) {
   if (var2.isEmpty()) {
      return var1;
   } else if (var2.charAt(0) == '/') {
      return var1.equals("/") ? var2 : trimSeparator(var1 + var2);
   } else {
      return var1.equals("/") ? trimSeparator(var1 + var2) : trimSeparator(var1 + '/' + var2);
   }
}

즉 실제 파일 경로를 생성한다.
결국

public fun Context.dataStoreFile(fileName: String): File =
    File(applicationContext.filesDir, "datastore/$fileName") 

applicationContext.filesDir, "datastore/$fileName"로 만든 파일 경로를 갖는 File 인스턴스를 반환한다.

조금 많이 돌아왔는데 결국은 이렇게 File 인스턴스를 만드는 함수가 바로 produceFile이고 이렇게 얻은 File 인스턴스로 절대경로 등의 값으로

override fun createConnection(): StorageConnection<T> {
   val file = produceFile().canonicalFile

   synchronized(activeFilesLock) {
      val path = file.absolutePath
      check(!activeFiles.contains(path)) {
         "There are multiple DataStores active for the same file: $path. You should " +
                 "either maintain your DataStore as a singleton or confirm that there is " +
                 "no two DataStore's active on the same file (by confirming that the scope" +
                 " is cancelled)."
      }
      activeFiles.add(path)
   }

   return FileStorageConnection(file, serializer, coordinatorProducer(file)) {
      synchronized(activeFilesLock) {
         activeFiles.remove(file.absolutePath)
      }
   }
}

FileStorageConnection를 반환하는 것이다.
그리고 이 FileStorageConnection

override suspend fun <R> readScope(
        block: suspend ReadScope<T>.(locked: Boolean) -> R
    ): R {
        checkNotClosed()

        val lock = transactionMutex.tryLock()
        try {
            return FileReadScope(file, serializer).use {
                block(it, lock)
            }
        } finally {
            if (lock) {
                transactionMutex.unlock()
            }
        }
    }
    ...
}

와 같고 이 readScope는 다음과 같다.

internal open class FileReadScope<T>(
   protected val file: File,
   protected val serializer: Serializer<T>
) : ReadScope<T> {

    private val closed = AtomicBoolean(false)

    override suspend fun readData(): T {
        checkNotClosed()
        return try {
            FileInputStream(file).use { stream ->
                serializer.readFrom(stream)
            }
        } catch (ex: FileNotFoundException) {
            if (file.exists()) {
                // Re-read to prevent throwing from a race condition where the file is created by
                // another process after the initial read attempt but before `file.exists()` is
                // called. Otherwise file exists but we can't read it; throw FileNotFoundException
                // because something is wrong.
                return FileInputStream(file).use { stream ->
                    serializer.readFrom(stream)
                }
            } else {
                serializer.defaultValue
            }
        }
    }
    ...
}

FileInputStream으로 파일을 읽어오는 것이다.
그럼 이제 다시 DataStoreImpl로 돌아가서 이렇게 파일에서 읽어온 데이터로

private suspend fun readDataOrHandleCorruption(hasWriteFileLock: Boolean): Data<T> {
   try {
       return if (hasWriteFileLock) {
           val data = readDataFromFileOrDefault()
           Data(data, data.hashCode(), version = coordinator.getVersion())
       }
       ...
   }
      ...
}

Data 인스턴스를 만들고 InitDataStoredoRun()에서

var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value

의 value인 Preferences로

val initData = if ((initTasks == null) || initTasks!!.isEmpty()) {
   readDataOrHandleCorruption(hasWriteFileLock = false)
} else {
   coordinator.lock {
      val updateLock = Mutex()
      var initializationComplete = false
      var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value

      val api = object : InitializerApi<T> {
         override suspend fun updateData(transform: suspend (t: T) -> T): T {
            return updateLock.withLock {
               check(!initializationComplete) {
                  "InitializerApi.updateData should not be called after " +
                          "initialization is complete."
               }

               val newData = transform(currentData)
               if (newData != currentData) {
                  writeData(newData, updateCache = false)
                  currentData = newData
               }

               currentData
            }
         }
      }

      initTasks?.forEach { it(api) }
      initTasks = null
      updateLock.withLock {
         initializationComplete = true
      }
      Data(
         value = currentData,
         hashCode = currentData.hashCode(),
         version = coordinator.getVersion()
      )
   }
}
inMemoryCache.tryUpdate(initData)

메모리 캐시를 이 Data의 value인 Preferences로 만든 Data 인스턴스인 initDatatryUpdate를 통해 업데이트한다.

androidx.datastore.core/DataStoreInMemoryCache
val currentState: State<T>
	get() = cachedValue.value

val flow: Flow<State<T>>
	get() = cachedValue
fun tryUpdate(
   newState: State<T>
): State<T> {
   val updated = cachedValue.updateAndGet { cached ->
      when (cached) {
         is ReadException<T>, UnInitialized -> {
            newState
         }
         is Data<T> -> {
            if (newState.version > cached.version) {
               newState
            } else {
               cached
            }
         }

         is Final<T> -> {
            cached
         }
      }
   }
   return updated
}

즉 state가 Data로 업데이트되었으니 handleUpdate()에서 초기화 작업이 완료된 것이다.

이제 마지막으로 transformAndWrite()를 통해 작성을 해준다.

private suspend fun transformAndWrite(
   transform: suspend (t: T) -> T,
   callerContext: CoroutineContext
): T = coordinator.lock {
   val curData = readDataOrHandleCorruption(hasWriteFileLock = true)
   val newData = withContext(callerContext) { transform(curData.value) }

   curData.checkHashCode()

   if (curData.value != newData) {
      writeData(newData, updateCache = true)
   }
   newData
}

internal suspend fun writeData(newData: T, updateCache: Boolean): Int {
   var newVersion = 0

   storageConnection.writeScope {
      newVersion = coordinator.incrementAndGetVersion()
      writeData(newData)
      if (updateCache) {
         inMemoryCache.tryUpdate(Data(newData, newData.hashCode(), newVersion))
      }
   }

   return newVersion
}

최종적으로 작성되는 곳은 storageConnection.writeScope { ... writeData(newData) ... }로 이것은 아래와 같다.

internal class FileWriteScope<T>(file: File, serializer: Serializer<T>) :
   FileReadScope<T>(file, serializer), WriteScope<T> {

   override suspend fun writeData(value: T) {
      checkNotClosed()
      val fos = FileOutputStream(file)
      fos.use { stream ->
         serializer.writeTo(value, UncloseableOutputStream(stream))
         stream.fd.sync()
      }
   }
}

DataStore에서 사용할 때 키-값의 형식으로 사용하는데 이 Preferences와 디스크에 저장될 .preferences_pb 파일(바이너리 데이터) 사이를 서로 번역/변환하는 것을 serializer가 수행한다.

PreferencesFileSerializer

internal object PreferencesFileSerializer : Serializer<Preferences> {
   internal const val fileExtension = "preferences_pb"

   override val defaultValue: Preferences
      get() {
         return emptyPreferences()
      }

   @Throws(IOException::class, CorruptionException::class)
   override suspend fun readFrom(input: InputStream): Preferences {
      val preferencesProto = PreferencesMapCompat.readFrom(input)

      val mutablePreferences = mutablePreferencesOf()

      preferencesProto.preferencesMap.forEach { (name, value) ->
         addProtoEntryToPreferences(name, value, mutablePreferences)
      }

      return mutablePreferences.toPreferences()
   }

   @Suppress("InvalidNullabilityOverride") // Remove after b/232460179 is fixed
   @Throws(IOException::class, CorruptionException::class)
   override suspend fun writeTo(t: Preferences, output: OutputStream) {
      val preferences = t.asMap()
      val protoBuilder = PreferenceMap.newBuilder()

      for ((key, value) in preferences) {
         protoBuilder.putPreferences(key.name, getValueProto(value))
      }

      protoBuilder.build().writeTo(output)
   }

   private fun getValueProto(value: Any): Value {
      return when (value) {
         is Boolean -> Value.newBuilder().setBoolean(value).build()
         is Float -> Value.newBuilder().setFloat(value).build()
         is Double -> Value.newBuilder().setDouble(value).build()
         is Int -> Value.newBuilder().setInteger(value).build()
         is Long -> Value.newBuilder().setLong(value).build()
         is String -> Value.newBuilder().setString(value).build()
         is Set<*> ->
            @Suppress("UNCHECKED_CAST")
            Value.newBuilder()
               .setStringSet(StringSet.newBuilder().addAllStrings(value as Set<String>))
               .build()
         is ByteArray -> Value.newBuilder().setBytes(ByteString.copyFrom(value)).build()
         else ->
            throw IllegalStateException(
               "PreferencesSerializer does not support type: ${value.javaClass.name}"
            )
      }
   }

   private fun addProtoEntryToPreferences(
      name: String,
      value: Value,
      mutablePreferences: MutablePreferences
   ) {
      return when (value.valueCase) {
         Value.ValueCase.BOOLEAN ->
            mutablePreferences[booleanPreferencesKey(name)] = value.boolean
         Value.ValueCase.FLOAT -> mutablePreferences[floatPreferencesKey(name)] = value.float
         Value.ValueCase.DOUBLE -> mutablePreferences[doublePreferencesKey(name)] = value.double
         Value.ValueCase.INTEGER -> mutablePreferences[intPreferencesKey(name)] = value.integer
         Value.ValueCase.LONG -> mutablePreferences[longPreferencesKey(name)] = value.long
         Value.ValueCase.STRING -> mutablePreferences[stringPreferencesKey(name)] = value.string
         Value.ValueCase.STRING_SET ->
            mutablePreferences[stringSetPreferencesKey(name)] =
               value.stringSet.stringsList.toSet()
         Value.ValueCase.BYTES ->
            mutablePreferences[byteArrayPreferencesKey(name)] = value.bytes.toByteArray()
         Value.ValueCase.VALUE_NOT_SET -> throw CorruptionException("Value not set.")
         null -> throw CorruptionException("Value case is null.")
      }
   }
}

각각을 간단하게 정리하면

  1. internal const val fileExtension = "preferences_pb"
  • SharedPreferences의 XML과 달리 DataStore는 Protocol Buffers (Protobuf)라는 효율적인 바이너리 포맷 사용
  • Protocol Buffers : 데이터의 구조를 미리 약속하고, 이 구조에 맞춰 데이터를 효율적인 바이너리 형태로 변환하는 기술
  1. readFrom()
  • InputStream (.preferences_pb 파일)에서 바이너리 데이터를 읽어 Protobuf 객체(Preferences)로 파싱하는 함수
  1. writeTo()
  • Preferences를 Map으로 변환한 후 Kotlin 타입을 Protobuf의 Value 객체로 Wrapping후 바이너리 데이터로 직렬화하여 OutputStream에 작성

그렇다면 이 writeTo를 했으니 실제로 파일에 작성되었을까?
그렇지 않다!

아직은 물리적 디스크에 저장되지 않고 메모리에 있는 버퍼(시스템 캐시)에 복사된다.
만약 OS가 이 버퍼를 디스크에 쓰기 전에 앱이 강제 종료되거나 전원이 꺼지면 데이터는 유실된다!

그것을 방지하는 것이 바로 마지막에 있는 stream.fd.sync()이다.

FileDescriptor

저 fd는 FileDescriptor이고 이 FileDescriptor는 아래와 같이 되어있다.

public final class FileDescriptor {
	...
    public native void sync() throws SyncFailedException;
}

sync()는 native 키워드가 붙어있는 함수로 실제 OS의 함수를 호출한다.

sync()는 지금 바로 메모리 버퍼에 있는 모든 데이터를 물리적 디스크에 동기화하고 OS에 명령하는 함수이다.
이 함수의 핵심은 실제 물리적 매체에 기록될 때까지 반환되지 않아 이 함수가 리턴되면, 데이터가 디스크에 안전하게 저장되었음을 100% 보장한다는 것이다.

즉, DataStore는 단순 작성이 아닌 실제 물리적 디스크에 저장하는 것까지 보장한다.

결론

지금까지 DataStore가 어떻게 파일을 읽고 쓰는지를 알아보았다.

사실 대부분의 DataStore의 코드는 race condition을 방지하고 무결성과 원자성을 보장하는데 집중되어 있고 실제 입출력은 흔히들 아는 FileOutputStreamFileInputStream로 수행한다.

아무생각없이 사용만했던 DataStore에 대해 조금은 이해한 것 같다.

profile
안드로이드 외길

0개의 댓글