Manifest Android Interview를 읽으며 안드로이드 파일 시스템에 대해 더욱 알아보기 위한 글이다.
구성 요소나 개념에 대한 내용은 책에 이미 잘 나와있으니 내가 알아볼 것은 코드에서 시작해서 실제 파일 시스템에 어떻게 접근하고 처리하는지를 알아볼 것이다.
대표적인 저장 방식인 DataStore를 다루고자 한다.
dataStore.edit { prefs ->
prefs[ACCESS_TOKEN] = cryptoManager.encrypt(accessToken)
}
흔히 위와 같은 형태로 DataStore를 활용한다.
이 edit은 다음과 같이 작성되어 있다.
public suspend fun DataStore<Preferences>.edit(
transform: suspend (MutablePreferences) -> Unit
): Preferences {
return this.updateData {
// It's safe to return MutablePreferences since we freeze it in
// PreferencesDataStore.updateData()
it.toMutablePreferences().apply { transform(this) }
}
}
그렇다면 이 updateData를 살펴보자
public suspend fun updateData(transform: suspend (t: T) -> T): T
Ctrl+B로 추적하면 실제 구현체까지는 나오지 않는다.
실제 구현체는 External Libraries에 있는 AAR 목록들에서 찾아볼 수 있는 androidx.datastore.core.DataStoreImpl에 있다.
override suspend fun updateData(transform: suspend (t: T) -> T): T {
val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key]
parentContextElement?.checkNotUpdating(this)
val childContextElement = UpdatingDataContextElement(
parent = parentContextElement,
instance = this
)
return withContext(childContextElement) {
val ack = CompletableDeferred<T>()
val currentDownStreamFlowState = inMemoryCache.currentState
val updateMsg =
Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
writeActor.offer(updateMsg)
ack.await()
}
}
각 코드를 하나씩 살펴보자
val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key]
parentContextElement?.checkNotUpdating(this)
val childContextElement = UpdatingDataContextElement(
parent = parentContextElement,
instance = this
)
먼저 val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key]
여기서 UpdatingDataContextElement.Companion.Key은 아래와 같다.
internal class UpdatingDataContextElement(
private val parent: UpdatingDataContextElement?,
private val instance: DataStoreImpl<*>
) : CoroutineContext.Element {
companion object {
internal val NESTED_UPDATE_ERROR_MESSAGE = """
Calling updateData inside updateData on the same DataStore instance is not supported
since updates made in the parent updateData call will not be visible to the nested
updateData call. See https://issuetracker.google.com/issues/241760537 for details.
""".trimIndent()
internal object Key : CoroutineContext.Key<UpdatingDataContextElement>
}
즉 해당 DataStore가 호출된 코루틴에서 UpdatingDataContextElement를 받아온다.
parentContextElement?.checkNotUpdating(this)
UpdatingDataContextElement가 있는지 확인하고 있다면 지금 해당 코루틴에서 DataStore가 업데이트 중인지 checkNotUpdating를 통해 확인한다.
만약 동일 인스턴스에 대한 중첩 호출(재진입)이 감지되면, 이는 교착 상태(Deadlock)를 유발할 수 있으므로 즉시 예외를 발생시킨다.
val childContextElement = UpdatingDataContextElement(
parent = parentContextElement,
instance = this
)
이제 자식이 사용할 새로운 UpdatingDataContextElement를 만든다.
처음에는 이게 잘 이해가 안 갔는데
안드로이드 DataStore에서 예를 들어
// 바깥
dataStoreA.updateData {
// 안쪽
dataStoreA.updateData {
...
}
}
이런식으로 중첩되게 호출되면 데드락이 발생할 수 있지만
// 바깥
dataStoreA.updateData {
// 안쪽
dataStoreB.updateData {
...
}
}
와 같은 경우는 허용되어야 한다.
그렇기에 같은 DataStore에 대하여 parentContextElement?.checkNotUpdating(this)로 작성중인지를 확인하고 다른 DataStore에 대한 중첩호출은 허용되도록 val childContextElement = UpdatingDataContextElement( parent = parentContextElement, instance = this )로 부모-자식 관계를 형성해주는 것이다.
마지막으로
val ack = CompletableDeferred<T>()
val currentDownStreamFlowState = inMemoryCache.currentState
val updateMsg =
Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
writeActor.offer(updateMsg)
ack.await()
이것은 경쟁 상태를 방지하고 모든 업데이트를 순차적으로 처리하기 위한 로직이다.
val ack = CompletableDeferred<T>()val currentDownStreamFlowState = inMemoryCache.currentStateval updateMsg = Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)writeActor.offer(updateMsg)ack.await()writeActore는 다음과 같이 정의되어 있다.
private val writeActor = SimpleActor<Message.Update<T>>(
scope = scope,
onComplete = {
// We expect it to always be non-null but we will leave the alternative as a no-op
// just in case.
it?.let {
inMemoryCache.tryUpdate(Final(it))
}
// don't try to close storage connection if it was not created in the first place.
if (storageConnectionDelegate.isInitialized()) {
storageConnection.close()
}
},
onUndeliveredElement = { msg, ex ->
msg.ack.completeExceptionally(
ex ?: CancellationException(
"DataStore scope was cancelled before updateData could complete"
)
)
}
) { msg ->
handleUpdate(msg)
}
그중 실제로 메시지를 처리하는 handleUpdate를 살펴보자.
private suspend fun handleUpdate(update: Message.Update<T>) {
update.ack.completeWith(
runCatching {
val result: T
when (val currentState = inMemoryCache.currentState) {
is Data -> {
// We are already initialized, we just need to perform the update
result = transformAndWrite(update.transform, update.callerContext)
}
is ReadException, is UnInitialized -> {
if (currentState === update.lastState) {
// we need to try to read again
readAndInitOrPropagateAndThrowFailure()
// We've successfully read, now we need to perform the update
result = transformAndWrite(update.transform, update.callerContext)
} else {
// Someone else beat us to read but also failed. We just need to
// signal the writer that is waiting on ack.
// This cast is safe because we can't be in the UnInitialized
// state if the state has changed.
throw (currentState as ReadException).readException
}
}
is Final -> throw currentState.finalException // won't happen
}
result
}
)
}
그중 살펴볼 것은
is Data -> {
result = transformAndWrite(update.transform, update.callerContext)
}
is ReadException, is UnInitialized -> {
if (currentState === update.lastState) {
readAndInitOrPropagateAndThrowFailure()
result = transformAndWrite(update.transform, update.callerContext)
} else {
throw (currentState as ReadException).readException
}
}
각각 DataStore가 이미 초기화되었고, 메모리에 정상적인 데이터를 갖고 있다는 Data와 이 막 시작되어 디스크에서 데이터를 한 번도 읽지 않은 상태인 UnInitialized이다.
둘 모두 result = transformAndWrite(update.transform, update.callerContext)로 최종적으로 update하는 건 같다.
readAndInitOrPropagateAndThrowFailure()는 그럼 뭘 할까
private suspend fun readAndInitOrPropagateAndThrowFailure() {
val preReadVersion = coordinator.getVersion()
try {
readAndInit.runIfNeeded()
} catch (throwable: Throwable) {
inMemoryCache.tryUpdate(ReadException(throwable, preReadVersion))
throw throwable
}
}
internal abstract class RunOnce {
private val runMutex = Mutex()
private val didRun = CompletableDeferred<Unit>()
protected abstract suspend fun doRun()
suspend fun awaitComplete() = didRun.await()
suspend fun runIfNeeded() {
if (didRun.isCompleted) return
runMutex.withLock {
if (didRun.isCompleted) return
doRun()
didRun.complete(Unit)
}
}
}
private va readAndInit = InitDataStore(initTasksList)
private inner class InitDataStore(
initTasksList: List<suspend (api: InitializerApi<T>) -> Unit>
) : RunOnce() {
...
override suspend fun doRun() {
val initData = if ((initTasks == null) || initTasks!!.isEmpty()) {
// if there are no init tasks, we can directly read
readDataOrHandleCorruption(hasWriteFileLock = false)
} else {
// if there are init tasks, we need to obtain a lock to ensure migrations
// run as 1 chunk
coordinator.lock {
val updateLock = Mutex()
var initializationComplete = false
var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value
val api = object : InitializerApi<T> {
override suspend fun updateData(transform: suspend (t: T) -> T): T {
return updateLock.withLock {
check(!initializationComplete) {
"InitializerApi.updateData should not be called after " +
"initialization is complete."
}
val newData = transform(currentData)
if (newData != currentData) {
writeData(newData, updateCache = false)
currentData = newData
}
currentData
}
}
}
initTasks?.forEach { it(api) }
// Init tasks have run successfully, we don't need them anymore.
initTasks = null
updateLock.withLock {
initializationComplete = true
}
// only to make compiler happy
Data(
value = currentData,
hashCode = currentData.hashCode(),
version = coordinator.getVersion()
)
}
}
inMemoryCache.tryUpdate(initData)
}
}
RunOnce는 오직 한번만 실행되는 것을 보장하는 도구다 이 doRun이 실행되는데 coordinator.lock은 파일 락(File Lock) 등을 사용하여, 이 앱의 모든 프로세스(여러 앱 인스턴스 포함)를 통틀어 오직 하나의 코루틴만이 이 람다 블록에 진입할 수 있도록 보장한다.
이중 디스크에서 데이터를 읽어오고 만약 파일이 손상되었으면(corrupted) 복구(handle)하는 함수인 readDataOrHandleCorruption를 살펴보자
private suspend fun readDataOrHandleCorruption(hasWriteFileLock: Boolean): Data<T> {
try {
return if (hasWriteFileLock) {
val data = readDataFromFileOrDefault()
Data(data, data.hashCode(), version = coordinator.getVersion())
} else {
val preLockVersion = coordinator.getVersion()
coordinator.tryLock { locked ->
val data = readDataFromFileOrDefault()
val version = if (locked) coordinator.getVersion() else preLockVersion
Data(
data,
data.hashCode(),
version
)
}
}
} catch (ex: CorruptionException) {
var newData: T = corruptionHandler.handleCorruption(ex)
var version: Int // initialized inside the try block
try {
doWithWriteFileLock(hasWriteFileLock) {
// Confirms the file is still corrupted before overriding
try {
newData = readDataFromFileOrDefault()
version = coordinator.getVersion()
} catch (ignoredEx: CorruptionException) {
version = writeData(newData, updateCache = true)
}
}
} catch (writeEx: Throwable) {
// If we fail to write the handled data, add the new exception as a suppressed
// exception.
ex.addSuppressed(writeEx)
throw ex
}
// If we reach this point, we've successfully replaced the data on disk with newData.
return Data(newData, newData.hashCode(), version)
}
}
이중 readDataFromFileOrDefault()를 통해 드디어 storage에서 파일을 읽어오게 된다!
private suspend fun readDataFromFileOrDefault(): T {
return storageConnection.readData()
}
private val storageConnectionDelegate = lazy {
storage.createConnection()
}
internal val storageConnection by storageConnectionDelegate
먼저 createConnection()를 살펴보자.
interface Storage<T> {
fun createConnection(): StorageConnection<T>
}
interface StorageConnection<T> : Closeable {
suspend fun <R> readScope(
block: suspend ReadScope<T>.(locked: Boolean) -> R
): R
suspend fun writeScope(block: suspend WriteScope<T>.() -> Unit)
val coordinator: InterProcessCoordinator
}
interface ReadScope<T> : Closeable {
suspend fun readData(): T
}
interface WriteScope<T> : ReadScope<T> {
suspend fun writeData(value: T)
}
이 인터페이스의 유일한 목적은 createConnection() 메소드를 제공하는 것이다.
StorageConnection<T>는 createConnection()이 반환하는 객체로, 실제로 파일(디스크)에 read() 및 write() 작업을 수행하는 구체적인 로직을 담고 있고 실제 구현체는 다음과 같다.
class FileStorage<T>(
private val serializer: Serializer<T>,
private val coordinatorProducer: (File) -> InterProcessCoordinator = {
createSingleProcessCoordinator(it)
},
private val produceFile: () -> File
) : Storage<T> {
override fun createConnection(): StorageConnection<T> {
val file = produceFile().canonicalFile
synchronized(activeFilesLock) {
val path = file.absolutePath
check(!activeFiles.contains(path)) {
"There are multiple DataStores active for the same file: $path. You should " +
"either maintain your DataStore as a singleton or confirm that there is " +
"no two DataStore's active on the same file (by confirming that the scope" +
" is cancelled)."
}
activeFiles.add(path)
}
return FileStorageConnection(file, serializer, coordinatorProducer(file)) {
synchronized(activeFilesLock) {
activeFiles.remove(file.absolutePath)
}
}
}
internal companion object {
@GuardedBy("activeFilesLock")
internal val activeFiles = mutableSetOf<String>()
internal val activeFilesLock = Any()
}
}
internal class FileStorageConnection<T>(
private val file: File,
private val serializer: Serializer<T>,
override val coordinator: InterProcessCoordinator,
private val onClose: () -> Unit
) : StorageConnection<T> {
private val closed = AtomicBoolean(false)
private val transactionMutex = Mutex()
override suspend fun <R> readScope(
block: suspend ReadScope<T>.(locked: Boolean) -> R
): R {
checkNotClosed()
val lock = transactionMutex.tryLock()
try {
return FileReadScope(file, serializer).use {
block(it, lock)
}
} finally {
if (lock) {
transactionMutex.unlock()
}
}
}
override suspend fun writeScope(block: suspend WriteScope<T>.() -> Unit) {
checkNotClosed()
file.createParentDirectories()
transactionMutex.withLock {
val scratchFile = File(file.absolutePath + ".tmp")
try {
FileWriteScope(scratchFile, serializer).use {
block(it)
}
if (scratchFile.exists() && !scratchFile.atomicMoveTo(file)) {
throw IOException(
"Unable to rename $scratchFile to $file. " +
"This likely means that there are multiple instances of DataStore " +
"for this file. Ensure that you are only creating a single instance of " +
"datastore for this file."
)
}
} catch (ex: IOException) {
if (scratchFile.exists()) {
scratchFile.delete() // Swallow failure to delete
}
throw ex
}
}
}
public override fun close() {
closed.set(true)
onClose()
}
private fun checkNotClosed() {
check(!closed.get()) { "StorageConnection has already been disposed." }
}
private fun File.createParentDirectories() {
val parent: File? = canonicalFile.parentFile
parent?.let {
it.mkdirs()
if (!it.isDirectory) {
throw IOException("Unable to create parent directories of $this")
}
}
}
}
일단 StorageConnection을 생성하는 createConnection()부터 살펴보면 다음과 같이 되어있다.
override fun createConnection(): StorageConnection<T> {
val file = produceFile().canonicalFile
synchronized(activeFilesLock) {
val path = file.absolutePath
check(!activeFiles.contains(path)) {
"There are multiple DataStores active for the same file: $path. You should " +
"either maintain your DataStore as a singleton or confirm that there is " +
"no two DataStore's active on the same file (by confirming that the scope" +
" is cancelled)."
}
activeFiles.add(path)
}
return FileStorageConnection(file, serializer, coordinatorProducer(file)) {
synchronized(activeFilesLock) {
activeFiles.remove(file.absolutePath)
}
}
}
여길 보면 produceFile().canonicalFile로 파일을 불러오는데 그렇다면 이 produceFile()이 뭔지를 알아야 한다. produceFile은 FileStorage의 파라미터로 () -> File이다. 이건
public actual object DataStoreFactory {
@JvmOverloads // Generate constructors for default params for java users.
public fun <T> create(
serializer: Serializer<T>,
corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
migrations: List<DataMigration<T>> = listOf(),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
produceFile: () -> File
): DataStore<T> = create(
storage = FileStorage(serializer = serializer, produceFile = produceFile),
corruptionHandler = corruptionHandler,
migrations = migrations,
scope = scope
)
...
}
이 팩토리에서 DataStore가 생성될 때 함께 생성되는데 여기서도 produceFile이 뭔지 알수 없다. 그렇다면 이 팩토리 함수가 호출되는 곳을 찾아가보자.
private val Context.tokenDataStore by preferencesDataStore(name = TOKEN_DATASTORE_NAME)
DataStore 생성을 위임할 때 사용되는 이 preferencesDataStore가 실제 DataStore를 생성하니 이것을 따라가자.
@Suppress("MissingJvmstatic")
public fun preferencesDataStore(
name: String,
corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): ReadOnlyProperty<Context, DataStore<Preferences>> {
return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
}
internal class PreferenceDataStoreSingletonDelegate internal constructor(
private val name: String,
private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {
private val lock = Any()
@GuardedBy("lock")
@Volatile
private var INSTANCE: DataStore<Preferences>? = null
/**
* Gets the instance of the DataStore.
*
* @param thisRef must be an instance of [Context]
* @param property not used
*/
override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
return INSTANCE ?: synchronized(lock) {
if (INSTANCE == null) {
val applicationContext = thisRef.applicationContext
INSTANCE = PreferenceDataStoreFactory.create(
corruptionHandler = corruptionHandler,
migrations = produceMigrations(applicationContext),
scope = scope
) {
applicationContext.preferencesDataStoreFile(name)
}
}
INSTANCE!!
}
}
}
PreferenceDataStoreFactory가 실제 인스턴스를 생성하는 곳을 찾았다!
저 람다가 바로 produceFile이다.
public fun Context.preferencesDataStoreFile(name: String): File =
this.dataStoreFile("$name.preferences_pb")
public fun Context.dataStoreFile(fileName: String): File =
File(applicationContext.filesDir, "datastore/$fileName")
다음과 같이 파일을 생성하는 것을 확인할 수 있다.
public File(File var1, String var2) {
if (var2 == null) {
throw new NullPointerException();
} else {
if (var1 != null) {
if (var1.path.isEmpty()) {
this.path = FS.resolve(FS.getDefaultParent(), FS.normalize(var2));
} else {
this.path = FS.resolve(var1.path, FS.normalize(var2));
}
} else {
this.path = FS.normalize(var2);
}
this.prefixLength = FS.prefixLength(this.path);
}
}
그렇다면 이 FS, 파일시스템은 어떻게 되어있을까
static {
if (IoOverNio.IS_ENABLED_IN_GENERAL) {
FS = new IoOverNioFileSystem(DefaultFileSystem.getFileSystem());
} else {
FS = DefaultFileSystem.getFileSystem();
}
...
}
일단 위와 같이 FS를 선언한다.
class IoOverNioFileSystem extends FileSystem {
private final FileSystem parent;
IoOverNioFileSystem(FileSystem var1) {
this.parent = var1;
}
...
}
final class DefaultFileSystem {
private DefaultFileSystem() {
}
public static FileSystem getFileSystem() {
return new UnixFileSystem();
}
}
여기서 UnixFileSystem과 IoOverNioFileSystem을 간단히 설명하자면
DefaultFileSystem이 처리한다.여기서 처음 알게된 건데 DefaultFileSystem는 위의 코드에서 확인할 수 있다시피 UnixFileSystem을 리턴한다. 결국 처리는 UnixFileSystem가 하고 IoOverNioFileSystem는 정말 어댑터의 역할만을 수행한다.
그렇다면 File 생성자의
this.path = FS.resolve(FS.getDefaultParent(), FS.normalize(var2));
에서 이 resolve는 아래와 같이 parent에 위임하고
class IoOverNioFileSystem extends FileSystem {
...
public String resolve(String var1, String var2) {
return this.parent.resolve(var1, var2);
}
...
}
이 parent는 바로 DefaultFileSystem.getFileSystem() 가 리턴하는 UnixFileSystem이다.
UnixFileSystem의 resolve는 아래와 같이 되어있다.
private static String trimSeparator(String var0) {
int var1 = var0.length();
return var1 > 1 && var0.charAt(var1 - 1) == '/' ? var0.substring(0, var1 - 1) : var0;
}
public String resolve(String var1, String var2) {
if (var2.isEmpty()) {
return var1;
} else if (var2.charAt(0) == '/') {
return var1.equals("/") ? var2 : trimSeparator(var1 + var2);
} else {
return var1.equals("/") ? trimSeparator(var1 + var2) : trimSeparator(var1 + '/' + var2);
}
}
즉 실제 파일 경로를 생성한다.
결국
public fun Context.dataStoreFile(fileName: String): File =
File(applicationContext.filesDir, "datastore/$fileName")
는 applicationContext.filesDir, "datastore/$fileName"로 만든 파일 경로를 갖는 File 인스턴스를 반환한다.
조금 많이 돌아왔는데 결국은 이렇게 File 인스턴스를 만드는 함수가 바로 produceFile이고 이렇게 얻은 File 인스턴스로 절대경로 등의 값으로
override fun createConnection(): StorageConnection<T> {
val file = produceFile().canonicalFile
synchronized(activeFilesLock) {
val path = file.absolutePath
check(!activeFiles.contains(path)) {
"There are multiple DataStores active for the same file: $path. You should " +
"either maintain your DataStore as a singleton or confirm that there is " +
"no two DataStore's active on the same file (by confirming that the scope" +
" is cancelled)."
}
activeFiles.add(path)
}
return FileStorageConnection(file, serializer, coordinatorProducer(file)) {
synchronized(activeFilesLock) {
activeFiles.remove(file.absolutePath)
}
}
}
FileStorageConnection를 반환하는 것이다.
그리고 이 FileStorageConnection이
override suspend fun <R> readScope(
block: suspend ReadScope<T>.(locked: Boolean) -> R
): R {
checkNotClosed()
val lock = transactionMutex.tryLock()
try {
return FileReadScope(file, serializer).use {
block(it, lock)
}
} finally {
if (lock) {
transactionMutex.unlock()
}
}
}
...
}
와 같고 이 readScope는 다음과 같다.
internal open class FileReadScope<T>(
protected val file: File,
protected val serializer: Serializer<T>
) : ReadScope<T> {
private val closed = AtomicBoolean(false)
override suspend fun readData(): T {
checkNotClosed()
return try {
FileInputStream(file).use { stream ->
serializer.readFrom(stream)
}
} catch (ex: FileNotFoundException) {
if (file.exists()) {
// Re-read to prevent throwing from a race condition where the file is created by
// another process after the initial read attempt but before `file.exists()` is
// called. Otherwise file exists but we can't read it; throw FileNotFoundException
// because something is wrong.
return FileInputStream(file).use { stream ->
serializer.readFrom(stream)
}
} else {
serializer.defaultValue
}
}
}
...
}
즉 FileInputStream으로 파일을 읽어오는 것이다.
그럼 이제 다시 DataStoreImpl로 돌아가서 이렇게 파일에서 읽어온 데이터로
private suspend fun readDataOrHandleCorruption(hasWriteFileLock: Boolean): Data<T> {
try {
return if (hasWriteFileLock) {
val data = readDataFromFileOrDefault()
Data(data, data.hashCode(), version = coordinator.getVersion())
}
...
}
...
}
Data 인스턴스를 만들고 InitDataStore가 doRun()에서
var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value
의 value인 Preferences로
val initData = if ((initTasks == null) || initTasks!!.isEmpty()) {
readDataOrHandleCorruption(hasWriteFileLock = false)
} else {
coordinator.lock {
val updateLock = Mutex()
var initializationComplete = false
var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value
val api = object : InitializerApi<T> {
override suspend fun updateData(transform: suspend (t: T) -> T): T {
return updateLock.withLock {
check(!initializationComplete) {
"InitializerApi.updateData should not be called after " +
"initialization is complete."
}
val newData = transform(currentData)
if (newData != currentData) {
writeData(newData, updateCache = false)
currentData = newData
}
currentData
}
}
}
initTasks?.forEach { it(api) }
initTasks = null
updateLock.withLock {
initializationComplete = true
}
Data(
value = currentData,
hashCode = currentData.hashCode(),
version = coordinator.getVersion()
)
}
}
inMemoryCache.tryUpdate(initData)
메모리 캐시를 이 Data의 value인 Preferences로 만든 Data 인스턴스인 initData로 tryUpdate를 통해 업데이트한다.
androidx.datastore.core/DataStoreInMemoryCache
val currentState: State<T>
get() = cachedValue.value
val flow: Flow<State<T>>
get() = cachedValue
fun tryUpdate(
newState: State<T>
): State<T> {
val updated = cachedValue.updateAndGet { cached ->
when (cached) {
is ReadException<T>, UnInitialized -> {
newState
}
is Data<T> -> {
if (newState.version > cached.version) {
newState
} else {
cached
}
}
is Final<T> -> {
cached
}
}
}
return updated
}
즉 state가 Data로 업데이트되었으니 handleUpdate()에서 초기화 작업이 완료된 것이다.
이제 마지막으로 transformAndWrite()를 통해 작성을 해준다.
private suspend fun transformAndWrite(
transform: suspend (t: T) -> T,
callerContext: CoroutineContext
): T = coordinator.lock {
val curData = readDataOrHandleCorruption(hasWriteFileLock = true)
val newData = withContext(callerContext) { transform(curData.value) }
curData.checkHashCode()
if (curData.value != newData) {
writeData(newData, updateCache = true)
}
newData
}
internal suspend fun writeData(newData: T, updateCache: Boolean): Int {
var newVersion = 0
storageConnection.writeScope {
newVersion = coordinator.incrementAndGetVersion()
writeData(newData)
if (updateCache) {
inMemoryCache.tryUpdate(Data(newData, newData.hashCode(), newVersion))
}
}
return newVersion
}
최종적으로 작성되는 곳은 storageConnection.writeScope { ... writeData(newData) ... }로 이것은 아래와 같다.
internal class FileWriteScope<T>(file: File, serializer: Serializer<T>) :
FileReadScope<T>(file, serializer), WriteScope<T> {
override suspend fun writeData(value: T) {
checkNotClosed()
val fos = FileOutputStream(file)
fos.use { stream ->
serializer.writeTo(value, UncloseableOutputStream(stream))
stream.fd.sync()
}
}
}
DataStore에서 사용할 때 키-값의 형식으로 사용하는데 이 Preferences와 디스크에 저장될 .preferences_pb 파일(바이너리 데이터) 사이를 서로 번역/변환하는 것을 serializer가 수행한다.
internal object PreferencesFileSerializer : Serializer<Preferences> {
internal const val fileExtension = "preferences_pb"
override val defaultValue: Preferences
get() {
return emptyPreferences()
}
@Throws(IOException::class, CorruptionException::class)
override suspend fun readFrom(input: InputStream): Preferences {
val preferencesProto = PreferencesMapCompat.readFrom(input)
val mutablePreferences = mutablePreferencesOf()
preferencesProto.preferencesMap.forEach { (name, value) ->
addProtoEntryToPreferences(name, value, mutablePreferences)
}
return mutablePreferences.toPreferences()
}
@Suppress("InvalidNullabilityOverride") // Remove after b/232460179 is fixed
@Throws(IOException::class, CorruptionException::class)
override suspend fun writeTo(t: Preferences, output: OutputStream) {
val preferences = t.asMap()
val protoBuilder = PreferenceMap.newBuilder()
for ((key, value) in preferences) {
protoBuilder.putPreferences(key.name, getValueProto(value))
}
protoBuilder.build().writeTo(output)
}
private fun getValueProto(value: Any): Value {
return when (value) {
is Boolean -> Value.newBuilder().setBoolean(value).build()
is Float -> Value.newBuilder().setFloat(value).build()
is Double -> Value.newBuilder().setDouble(value).build()
is Int -> Value.newBuilder().setInteger(value).build()
is Long -> Value.newBuilder().setLong(value).build()
is String -> Value.newBuilder().setString(value).build()
is Set<*> ->
@Suppress("UNCHECKED_CAST")
Value.newBuilder()
.setStringSet(StringSet.newBuilder().addAllStrings(value as Set<String>))
.build()
is ByteArray -> Value.newBuilder().setBytes(ByteString.copyFrom(value)).build()
else ->
throw IllegalStateException(
"PreferencesSerializer does not support type: ${value.javaClass.name}"
)
}
}
private fun addProtoEntryToPreferences(
name: String,
value: Value,
mutablePreferences: MutablePreferences
) {
return when (value.valueCase) {
Value.ValueCase.BOOLEAN ->
mutablePreferences[booleanPreferencesKey(name)] = value.boolean
Value.ValueCase.FLOAT -> mutablePreferences[floatPreferencesKey(name)] = value.float
Value.ValueCase.DOUBLE -> mutablePreferences[doublePreferencesKey(name)] = value.double
Value.ValueCase.INTEGER -> mutablePreferences[intPreferencesKey(name)] = value.integer
Value.ValueCase.LONG -> mutablePreferences[longPreferencesKey(name)] = value.long
Value.ValueCase.STRING -> mutablePreferences[stringPreferencesKey(name)] = value.string
Value.ValueCase.STRING_SET ->
mutablePreferences[stringSetPreferencesKey(name)] =
value.stringSet.stringsList.toSet()
Value.ValueCase.BYTES ->
mutablePreferences[byteArrayPreferencesKey(name)] = value.bytes.toByteArray()
Value.ValueCase.VALUE_NOT_SET -> throw CorruptionException("Value not set.")
null -> throw CorruptionException("Value case is null.")
}
}
}
각각을 간단하게 정리하면
internal const val fileExtension = "preferences_pb"readFrom()writeTo()그렇다면 이 writeTo를 했으니 실제로 파일에 작성되었을까?
그렇지 않다!
아직은 물리적 디스크에 저장되지 않고 메모리에 있는 버퍼(시스템 캐시)에 복사된다.
만약 OS가 이 버퍼를 디스크에 쓰기 전에 앱이 강제 종료되거나 전원이 꺼지면 데이터는 유실된다!
그것을 방지하는 것이 바로 마지막에 있는 stream.fd.sync()이다.
저 fd는 FileDescriptor이고 이 FileDescriptor는 아래와 같이 되어있다.
public final class FileDescriptor {
...
public native void sync() throws SyncFailedException;
}
이 sync()는 native 키워드가 붙어있는 함수로 실제 OS의 함수를 호출한다.
sync()는 지금 바로 메모리 버퍼에 있는 모든 데이터를 물리적 디스크에 동기화하고 OS에 명령하는 함수이다.
이 함수의 핵심은 실제 물리적 매체에 기록될 때까지 반환되지 않아 이 함수가 리턴되면, 데이터가 디스크에 안전하게 저장되었음을 100% 보장한다는 것이다.
즉, DataStore는 단순 작성이 아닌 실제 물리적 디스크에 저장하는 것까지 보장한다.
지금까지 DataStore가 어떻게 파일을 읽고 쓰는지를 알아보았다.
사실 대부분의 DataStore의 코드는 race condition을 방지하고 무결성과 원자성을 보장하는데 집중되어 있고 실제 입출력은 흔히들 아는 FileOutputStream과 FileInputStream로 수행한다.
아무생각없이 사용만했던 DataStore에 대해 조금은 이해한 것 같다.