From 8883e739714637c6f5557c2650d0008b6da8a297 Mon Sep 17 00:00:00 2001 From: Koitharu Date: Sun, 7 May 2023 10:38:50 +0300 Subject: [PATCH] Migrate to Okio somewhere --- .../download/ui/worker/DownloadWorker.kt | 10 +++-- .../koitharu/kotatsu/local/data/PagesCache.kt | 12 +++--- .../data/importer/SingleMangaImporter.kt | 29 +++++++------- .../koitharu/kotatsu/local/ui/ImportWorker.kt | 2 +- .../kotatsu/reader/domain/PageLoader.kt | 7 ++-- .../kotatsu/reader/ui/PageSaveHelper.kt | 11 ++++-- .../kotatsu/utils/CancellableSource.kt | 18 +++++++++ .../java/org/koitharu/kotatsu/utils/ext/IO.kt | 38 ++++++------------- 8 files changed, 72 insertions(+), 55 deletions(-) create mode 100644 app/src/main/java/org/koitharu/kotatsu/utils/CancellableSource.kt diff --git a/app/src/main/java/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt b/app/src/main/java/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt index c33e7400d..2923e524e 100644 --- a/app/src/main/java/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt +++ b/app/src/main/java/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt @@ -32,6 +32,8 @@ import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.internal.closeQuietly import okio.IOException +import okio.buffer +import okio.sink import org.koitharu.kotatsu.R import org.koitharu.kotatsu.base.domain.MangaDataRepository import org.koitharu.kotatsu.core.network.CommonHeaders @@ -50,12 +52,12 @@ import org.koitharu.kotatsu.parsers.model.MangaSource import org.koitharu.kotatsu.parsers.util.await import org.koitharu.kotatsu.parsers.util.mapToSet import org.koitharu.kotatsu.utils.WorkManagerHelper -import org.koitharu.kotatsu.utils.ext.copyToSuspending import org.koitharu.kotatsu.utils.ext.deleteAwait import org.koitharu.kotatsu.utils.ext.getDisplayMessage import org.koitharu.kotatsu.utils.ext.ifNullOrEmpty import org.koitharu.kotatsu.utils.ext.printStackTraceDebug import org.koitharu.kotatsu.utils.ext.runCatchingCancellable +import org.koitharu.kotatsu.utils.ext.writeAllCancellable import org.koitharu.kotatsu.utils.progress.TimeLeftEstimator import java.io.File import java.util.UUID @@ -261,8 +263,10 @@ class DownloadWorker @AssistedInject constructor( val call = okHttp.newCall(request) val file = File(destination, tempFileName) val response = call.clone().await() - file.outputStream().use { out -> - checkNotNull(response.body).byteStream().copyToSuspending(out) + checkNotNull(response.body).use { body -> + file.sink(append = false).buffer().use { + it.writeAllCancellable(body.source()) + } } return file } diff --git a/app/src/main/java/org/koitharu/kotatsu/local/data/PagesCache.kt b/app/src/main/java/org/koitharu/kotatsu/local/data/PagesCache.kt index 1de0e9516..255bfdda6 100644 --- a/app/src/main/java/org/koitharu/kotatsu/local/data/PagesCache.kt +++ b/app/src/main/java/org/koitharu/kotatsu/local/data/PagesCache.kt @@ -6,17 +6,19 @@ import dagger.hilt.android.qualifiers.ApplicationContext import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runInterruptible import kotlinx.coroutines.withContext +import okio.Source +import okio.buffer +import okio.sink import org.koitharu.kotatsu.parsers.util.SuspendLazy import org.koitharu.kotatsu.utils.FileSize -import org.koitharu.kotatsu.utils.ext.copyToSuspending import org.koitharu.kotatsu.utils.ext.longHashCode import org.koitharu.kotatsu.utils.ext.printStackTraceDebug import org.koitharu.kotatsu.utils.ext.runCatchingCancellable import org.koitharu.kotatsu.utils.ext.subdir import org.koitharu.kotatsu.utils.ext.takeIfReadable import org.koitharu.kotatsu.utils.ext.takeIfWriteable +import org.koitharu.kotatsu.utils.ext.writeAllCancellable import java.io.File -import java.io.InputStream import javax.inject.Inject import javax.inject.Singleton @@ -49,11 +51,11 @@ class PagesCache @Inject constructor(@ApplicationContext context: Context) { } } - suspend fun put(url: String, inputStream: InputStream): File = withContext(Dispatchers.IO) { + suspend fun put(url: String, source: Source): File = withContext(Dispatchers.IO) { val file = File(cacheDir.get().parentFile, url.longHashCode().toString()) try { - file.outputStream().use { out -> - inputStream.copyToSuspending(out) + file.sink(append = false).buffer().use { + it.writeAllCancellable(source) } lruCache.get().put(url, file) } finally { diff --git a/app/src/main/java/org/koitharu/kotatsu/local/data/importer/SingleMangaImporter.kt b/app/src/main/java/org/koitharu/kotatsu/local/data/importer/SingleMangaImporter.kt index 7b170eb19..70307defa 100644 --- a/app/src/main/java/org/koitharu/kotatsu/local/data/importer/SingleMangaImporter.kt +++ b/app/src/main/java/org/koitharu/kotatsu/local/data/importer/SingleMangaImporter.kt @@ -7,16 +7,19 @@ import dagger.Reusable import dagger.hilt.android.qualifiers.ApplicationContext import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.runInterruptible +import kotlinx.coroutines.withContext +import okio.buffer +import okio.sink +import okio.source import org.koitharu.kotatsu.core.exceptions.UnsupportedFileException import org.koitharu.kotatsu.local.data.CbzFilter import org.koitharu.kotatsu.local.data.LocalManga import org.koitharu.kotatsu.local.data.LocalStorageChanges import org.koitharu.kotatsu.local.data.LocalStorageManager import org.koitharu.kotatsu.local.data.input.LocalMangaInput -import org.koitharu.kotatsu.utils.ext.copyToSuspending import org.koitharu.kotatsu.utils.ext.resolveName +import org.koitharu.kotatsu.utils.ext.writeAllCancellable import java.io.File import java.io.IOException import javax.inject.Inject @@ -30,17 +33,17 @@ class SingleMangaImporter @Inject constructor( private val contentResolver = context.contentResolver - suspend fun import(uri: Uri, progressState: MutableStateFlow?): LocalManga { + suspend fun import(uri: Uri): LocalManga { val result = if (isDirectory(uri)) { - importDirectory(uri, progressState) + importDirectory(uri) } else { - importFile(uri, progressState) + importFile(uri) } localStorageChanges.emit(result) return result } - private suspend fun importFile(uri: Uri, progressState: MutableStateFlow?): LocalManga { + private suspend fun importFile(uri: Uri): LocalManga = withContext(Dispatchers.IO) { val contentResolver = storageManager.contentResolver val name = contentResolver.resolveName(uri) ?: throw IOException("Cannot fetch name from uri: $uri") if (!CbzFilter.isFileSupported(name)) { @@ -50,14 +53,14 @@ class SingleMangaImporter @Inject constructor( runInterruptible { contentResolver.openInputStream(uri) }?.use { source -> - dest.outputStream().use { output -> - source.copyToSuspending(output, progressState = progressState) + dest.sink().buffer().use { output -> + output.writeAllCancellable(source.source()) } } ?: throw IOException("Cannot open input stream: $uri") - return LocalMangaInput.of(dest).getManga() + LocalMangaInput.of(dest).getManga() } - private suspend fun importDirectory(uri: Uri, progressState: MutableStateFlow?): LocalManga { + private suspend fun importDirectory(uri: Uri): LocalManga { val root = requireNotNull(DocumentFile.fromTreeUri(context, uri)) { "Provided uri $uri is not a tree" } @@ -80,9 +83,9 @@ class SingleMangaImporter @Inject constructor( docFile.copyTo(subDir) } } else { - inputStream().use { input -> - File(destDir, requireName()).outputStream().use { output -> - input.copyToSuspending(output) + inputStream().source().use { input -> + File(destDir, requireName()).sink().buffer().use { output -> + output.writeAllCancellable(input) } } } diff --git a/app/src/main/java/org/koitharu/kotatsu/local/ui/ImportWorker.kt b/app/src/main/java/org/koitharu/kotatsu/local/ui/ImportWorker.kt index 1d008d6b0..eee8038df 100644 --- a/app/src/main/java/org/koitharu/kotatsu/local/ui/ImportWorker.kt +++ b/app/src/main/java/org/koitharu/kotatsu/local/ui/ImportWorker.kt @@ -48,7 +48,7 @@ class ImportWorker @AssistedInject constructor( val uri = inputData.getString(DATA_URI)?.toUriOrNull() ?: return Result.failure() setForeground(getForegroundInfo()) val result = runCatchingCancellable { - importer.import(uri, null).manga + importer.import(uri).manga } val notification = buildNotification(result) notificationManager.notify(uri.hashCode(), notification) diff --git a/app/src/main/java/org/koitharu/kotatsu/reader/domain/PageLoader.kt b/app/src/main/java/org/koitharu/kotatsu/reader/domain/PageLoader.kt index c1403fd0d..35546a83a 100644 --- a/app/src/main/java/org/koitharu/kotatsu/reader/domain/PageLoader.kt +++ b/app/src/main/java/org/koitharu/kotatsu/reader/domain/PageLoader.kt @@ -20,6 +20,7 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import okhttp3.OkHttpClient import okhttp3.Request +import okio.source import org.koitharu.kotatsu.core.network.CommonHeaders import org.koitharu.kotatsu.core.parser.MangaRepository import org.koitharu.kotatsu.core.parser.RemoteMangaRepository @@ -186,7 +187,7 @@ class PageLoader @Inject constructor( val entry = zip.getEntry(uri.fragment) zip.getInputStream(entry) }.use { - cache.put(pageUrl, it) + cache.put(pageUrl, it.source()) } } } else { @@ -204,8 +205,8 @@ class PageLoader @Inject constructor( val body = checkNotNull(response.body) { "Null response" } - body.withProgress(progress).byteStream().use { - cache.put(pageUrl, it) + body.withProgress(progress).use { + cache.put(pageUrl, it.source()) } } } diff --git a/app/src/main/java/org/koitharu/kotatsu/reader/ui/PageSaveHelper.kt b/app/src/main/java/org/koitharu/kotatsu/reader/ui/PageSaveHelper.kt index 4fdeeab85..9f89886e9 100644 --- a/app/src/main/java/org/koitharu/kotatsu/reader/ui/PageSaveHelper.kt +++ b/app/src/main/java/org/koitharu/kotatsu/reader/ui/PageSaveHelper.kt @@ -12,11 +12,14 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import okhttp3.HttpUrl.Companion.toHttpUrl import okio.IOException +import okio.buffer +import okio.sink +import okio.source import org.koitharu.kotatsu.base.domain.MangaDataRepository import org.koitharu.kotatsu.parsers.model.MangaPage import org.koitharu.kotatsu.parsers.util.toFileNameSafe import org.koitharu.kotatsu.reader.domain.PageLoader -import org.koitharu.kotatsu.utils.ext.copyToSuspending +import org.koitharu.kotatsu.utils.ext.writeAllCancellable import java.io.File import javax.inject.Inject import kotlin.coroutines.Continuation @@ -49,10 +52,10 @@ class PageSaveHelper @Inject constructor( } } runInterruptible(Dispatchers.IO) { - contentResolver.openOutputStream(destination) + contentResolver.openOutputStream(destination)?.sink()?.buffer() }?.use { output -> - pageFile.inputStream().use { input -> - input.copyToSuspending(output) + pageFile.source().use { input -> + output.writeAllCancellable(input) } } ?: throw IOException("Output stream is null") return destination diff --git a/app/src/main/java/org/koitharu/kotatsu/utils/CancellableSource.kt b/app/src/main/java/org/koitharu/kotatsu/utils/CancellableSource.kt new file mode 100644 index 000000000..9830d86b6 --- /dev/null +++ b/app/src/main/java/org/koitharu/kotatsu/utils/CancellableSource.kt @@ -0,0 +1,18 @@ +package org.koitharu.kotatsu.utils + +import kotlinx.coroutines.Job +import kotlinx.coroutines.ensureActive +import okio.Buffer +import okio.ForwardingSource +import okio.Source + +class CancellableSource( + private val job: Job?, + delegate: Source, +) : ForwardingSource(delegate) { + + override fun read(sink: Buffer, byteCount: Long): Long { + job?.ensureActive() + return super.read(sink, byteCount) + } +} diff --git a/app/src/main/java/org/koitharu/kotatsu/utils/ext/IO.kt b/app/src/main/java/org/koitharu/kotatsu/utils/ext/IO.kt index 1b05eddc3..82a3780ce 100644 --- a/app/src/main/java/org/koitharu/kotatsu/utils/ext/IO.kt +++ b/app/src/main/java/org/koitharu/kotatsu/utils/ext/IO.kt @@ -3,37 +3,23 @@ package org.koitharu.kotatsu.utils.ext import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.withContext import okhttp3.ResponseBody +import okio.BufferedSink +import okio.Source +import org.koitharu.kotatsu.utils.CancellableSource import org.koitharu.kotatsu.utils.progress.ProgressResponseBody -import java.io.InputStream -import java.io.OutputStream - -suspend fun InputStream.copyToSuspending( - out: OutputStream, - bufferSize: Int = DEFAULT_BUFFER_SIZE, - progressState: MutableStateFlow? = null, -): Long = withContext(Dispatchers.IO) { - val job = currentCoroutineContext()[Job] - val total = available() - var bytesCopied: Long = 0 - val buffer = ByteArray(bufferSize) - var bytes = read(buffer) - while (bytes >= 0) { - out.write(buffer, 0, bytes) - bytesCopied += bytes - job?.ensureActive() - bytes = read(buffer) - job?.ensureActive() - if (progressState != null && total > 0) { - progressState.value = bytesCopied / total.toFloat() - } - } - bytesCopied -} fun ResponseBody.withProgress(progressState: MutableStateFlow): ResponseBody { return ProgressResponseBody(this, progressState) } + +suspend fun Source.cancellable(): Source { + val job = currentCoroutineContext()[Job] + return CancellableSource(job, this) +} + +suspend fun BufferedSink.writeAllCancellable(source: Source) = withContext(Dispatchers.IO) { + writeAll(source.cancellable()) +}