From a7a9ee9d59aa57cb3991c52c231bef0afb10847d Mon Sep 17 00:00:00 2001 From: Koitharu Date: Fri, 1 Dec 2023 14:07:35 +0200 Subject: [PATCH] Downloader improvements --- .../core/parser/RemoteMangaRepository.kt | 4 + .../kotatsu/core/prefs/AppSettings.kt | 4 - .../kotatsu/core/prefs/SourceSettings.kt | 4 + .../ui/worker/DownloadSlowdownDispatcher.kt | 28 +++++ .../download/ui/worker/DownloadWorker.kt | 100 ++++++++++++------ .../download/ui/worker/PausingHandle.kt | 21 +++- .../local/data/output/LocalMangaDirOutput.kt | 19 ++-- .../local/data/output/LocalMangaZipOutput.kt | 15 ++- app/src/main/res/values/strings.xml | 1 + app/src/main/res/xml/pref_downloads.xml | 15 +-- app/src/main/res/xml/pref_source.xml | 8 ++ 11 files changed, 158 insertions(+), 61 deletions(-) create mode 100644 app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadSlowdownDispatcher.kt diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/core/parser/RemoteMangaRepository.kt b/app/src/main/kotlin/org/koitharu/kotatsu/core/parser/RemoteMangaRepository.kt index 273631348..fffe8ef06 100644 --- a/app/src/main/kotlin/org/koitharu/kotatsu/core/parser/RemoteMangaRepository.kt +++ b/app/src/main/kotlin/org/koitharu/kotatsu/core/parser/RemoteMangaRepository.kt @@ -151,6 +151,10 @@ class RemoteMangaRepository( return parser.configKeyDomain.presetValues.toList() } + fun isSlowdownEnabled(): Boolean { + return getConfig().isSlowdownEnabled + } + private fun getConfig() = parser.config as SourceSettings private suspend fun asyncSafe(block: suspend CoroutineScope.() -> T): SafeDeferred { diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/AppSettings.kt b/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/AppSettings.kt index 29d52c439..1fd0c9e2b 100644 --- a/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/AppSettings.kt +++ b/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/AppSettings.kt @@ -259,9 +259,6 @@ class AppSettings @Inject constructor(@ApplicationContext context: Context) { } } - val isDownloadsSlowdownEnabled: Boolean - get() = prefs.getBoolean(KEY_DOWNLOADS_SLOWDOWN, false) - val isDownloadsWiFiOnly: Boolean get() = prefs.getBoolean(KEY_DOWNLOADS_WIFI, false) @@ -495,7 +492,6 @@ class AppSettings @Inject constructor(@ApplicationContext context: Context) { const val KEY_SHIKIMORI = "shikimori" const val KEY_ANILIST = "anilist" const val KEY_MAL = "mal" - const val KEY_DOWNLOADS_SLOWDOWN = "downloads_slowdown" const val KEY_DOWNLOADS_WIFI = "downloads_wifi" const val KEY_ALL_FAVOURITES_VISIBLE = "all_favourites_visible" const val KEY_DOH = "doh" diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/SourceSettings.kt b/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/SourceSettings.kt index 0080c0b1d..af19f5ae1 100644 --- a/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/SourceSettings.kt +++ b/app/src/main/kotlin/org/koitharu/kotatsu/core/prefs/SourceSettings.kt @@ -11,6 +11,7 @@ import org.koitharu.kotatsu.parsers.model.MangaSource import org.koitharu.kotatsu.parsers.model.SortOrder private const val KEY_SORT_ORDER = "sort_order" +private const val KEY_SLOWDOWN = "slowdown" class SourceSettings(context: Context, source: MangaSource) : MangaSourceConfig { @@ -20,6 +21,9 @@ class SourceSettings(context: Context, source: MangaSource) : MangaSourceConfig get() = prefs.getEnumValue(KEY_SORT_ORDER, SortOrder::class.java) set(value) = prefs.edit { putEnumValue(KEY_SORT_ORDER, value) } + val isSlowdownEnabled: Boolean + get() = prefs.getBoolean(KEY_SLOWDOWN, false) + @Suppress("UNCHECKED_CAST") override fun get(key: ConfigKey): T { return when (key) { diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadSlowdownDispatcher.kt b/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadSlowdownDispatcher.kt new file mode 100644 index 000000000..e59722045 --- /dev/null +++ b/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadSlowdownDispatcher.kt @@ -0,0 +1,28 @@ +package org.koitharu.kotatsu.download.ui.worker + +import kotlinx.coroutines.delay +import org.koitharu.kotatsu.core.parser.MangaRepository +import org.koitharu.kotatsu.core.parser.RemoteMangaRepository +import org.koitharu.kotatsu.parsers.model.MangaSource + +class DownloadSlowdownDispatcher( + private val mangaRepositoryFactory: MangaRepository.Factory, + private val defaultDelay: Long, +) { + private val timeMap = HashMap() + + suspend fun delay(source: MangaSource) { + val repo = mangaRepositoryFactory.create(source) as? RemoteMangaRepository ?: return + if (!repo.isSlowdownEnabled()) { + return + } + val lastRequest = synchronized(timeMap) { + val res = timeMap[source] ?: 0L + timeMap[source] = System.currentTimeMillis() + res + } + if (lastRequest != 0L) { + delay(lastRequest + defaultDelay - System.currentTimeMillis()) + } + } +} diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt b/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt index e236af835..fbdc8cfa3 100644 --- a/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt +++ b/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/DownloadWorker.kt @@ -28,6 +28,10 @@ import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.withContext import okhttp3.OkHttpClient import okhttp3.Request @@ -71,6 +75,7 @@ import org.koitharu.kotatsu.parsers.util.runCatchingCancellable import java.io.File import java.util.UUID import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger import javax.inject.Inject @HiltWorker @@ -81,7 +86,6 @@ class DownloadWorker @AssistedInject constructor( private val cache: PagesCache, private val localMangaRepository: LocalMangaRepository, private val mangaDataRepository: MangaDataRepository, - private val settings: AppSettings, private val mangaRepositoryFactory: MangaRepository.Factory, @LocalStorageChanges private val localStorageChanges: MutableSharedFlow, notificationFactoryFactory: DownloadNotificationFactory.Factory, @@ -89,16 +93,15 @@ class DownloadWorker @AssistedInject constructor( private val notificationFactory = notificationFactoryFactory.create(params.id) private val notificationManager = appContext.getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager + private val slowdownDispatcher = DownloadSlowdownDispatcher(mangaRepositoryFactory, SLOWDOWN_DELAY) @Volatile private var lastPublishedState: DownloadState? = null private val currentState: DownloadState get() = checkNotNull(lastPublishedState) - private val pausingHandle = PausingHandle() private val timeLeftEstimator = TimeLeftEstimator() private val notificationThrottler = Throttler(400) - private val pausingReceiver = PausingReceiver(params.id, pausingHandle) override suspend fun doWork(): Result { setForeground(getForegroundInfo()) @@ -109,7 +112,9 @@ class DownloadWorker @AssistedInject constructor( val chaptersIds = inputData.getLongArray(CHAPTERS_IDS)?.takeUnless { it.isEmpty() } val downloadedIds = getDoneChapters(manga) return try { - downloadMangaImpl(manga, chaptersIds, downloadedIds) + withContext(PausingHandle()) { + downloadMangaImpl(manga, chaptersIds, downloadedIds) + } Result.success(currentState.toWorkData()) } catch (e: CancellationException) { withContext(NonCancellable) { @@ -153,6 +158,7 @@ class DownloadWorker @AssistedInject constructor( ) { var manga = subject val chaptersToSkip = excludedIds.toMutableSet() + val pausingReceiver = PausingReceiver(id, PausingHandle.current()) withMangaLock(manga) { ContextCompat.registerReceiver( applicationContext, @@ -180,39 +186,47 @@ class DownloadWorker @AssistedInject constructor( } val chapters = getChapters(mangaDetails, includedIds) for ((chapterIndex, chapter) in chapters.withIndex()) { + checkIsPaused() if (chaptersToSkip.remove(chapter.id)) { publishState(currentState.copy(downloadedChapters = currentState.downloadedChapters + 1)) continue } - val pages = runFailsafe(pausingHandle) { + val pages = runFailsafe { repo.getPages(chapter) } - for ((pageIndex, page) in pages.withIndex()) { - runFailsafe(pausingHandle) { - val url = repo.getPageUrl(page) - val file = cache.get(url) - ?: downloadFile(url, destination, tempFileName, repo.source) - output.addPage( - chapter = chapter, - file = file, - pageNumber = pageIndex, - ext = MimeTypeMap.getFileExtensionFromUrl(url), - ) + val pageCounter = AtomicInteger(0) + channelFlow { + val semaphore = Semaphore(MAX_PAGES_PARALLELISM) + for ((pageIndex, page) in pages.withIndex()) { + checkIsPaused() + launch { + semaphore.withPermit { + runFailsafe { + val url = repo.getPageUrl(page) + val file = cache.get(url) + ?: downloadFile(url, destination, tempFileName, repo.source) + output.addPage( + chapter = chapter, + file = file, + pageNumber = pageIndex, + ext = MimeTypeMap.getFileExtensionFromUrl(url), + ) + } + send(pageIndex) + } + } } + }.collect { publishState( currentState.copy( totalChapters = chapters.size, currentChapter = chapterIndex, totalPages = pages.size, - currentPage = pageIndex, + currentPage = pageCounter.incrementAndGet(), isIndeterminate = false, eta = timeLeftEstimator.getEta(), ), ) - - if (settings.isDownloadsSlowdownEnabled) { - delay(SLOWDOWN_DELAY) - } } if (output.flushChapter(chapter)) { runCatchingCancellable { @@ -244,14 +258,9 @@ class DownloadWorker @AssistedInject constructor( } private suspend fun runFailsafe( - pausingHandle: PausingHandle, block: suspend () -> R, ): R { - if (pausingHandle.isPaused) { - publishState(currentState.copy(isPaused = true, eta = -1L)) - pausingHandle.awaitResumed() - publishState(currentState.copy(isPaused = false)) - } + checkIsPaused() var countDown = MAX_FAILSAFE_ATTEMPTS failsafe@ while (true) { try { @@ -266,9 +275,13 @@ class DownloadWorker @AssistedInject constructor( ), ) countDown = MAX_FAILSAFE_ATTEMPTS + val pausingHandle = PausingHandle.current() pausingHandle.pause() - pausingHandle.awaitResumed() - publishState(currentState.copy(isPaused = false, error = null)) + try { + pausingHandle.awaitResumed() + } finally { + publishState(currentState.copy(isPaused = false, error = null)) + } } else { countDown-- val retryDelay = if (e is TooManyRequestExceptions) { @@ -282,6 +295,18 @@ class DownloadWorker @AssistedInject constructor( } } + private suspend fun checkIsPaused() { + val pausingHandle = PausingHandle.current() + if (pausingHandle.isPaused) { + publishState(currentState.copy(isPaused = true, eta = -1L)) + try { + pausingHandle.awaitResumed() + } finally { + publishState(currentState.copy(isPaused = false)) + } + } + } + private suspend fun downloadFile( url: String, destination: File, @@ -295,13 +320,19 @@ class DownloadWorker @AssistedInject constructor( .cacheControl(CommonHeaders.CACHE_CONTROL_NO_STORE) .get() .build() + slowdownDispatcher.delay(source) val call = okHttp.newCall(request) val file = File(destination, tempFileName) - val response = call.clone().await() - checkNotNull(response.body).use { body -> - file.sink(append = false).buffer().use { - it.writeAllCancellable(body.source()) + try { + val response = call.clone().await() + checkNotNull(response.body).use { body -> + file.sink(append = false).buffer().use { + it.writeAllCancellable(body.source()) + } } + } catch (e: CancellationException) { + file.delete() + throw e } return file } @@ -461,8 +492,9 @@ class DownloadWorker @AssistedInject constructor( private companion object { const val MAX_FAILSAFE_ATTEMPTS = 2 + const val MAX_PAGES_PARALLELISM = 4 const val DOWNLOAD_ERROR_DELAY = 500L - const val SLOWDOWN_DELAY = 100L + const val SLOWDOWN_DELAY = 200L const val MANGA_ID = "manga_id" const val CHAPTERS_IDS = "chapters" const val TAG = "download" diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/PausingHandle.kt b/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/PausingHandle.kt index 6a660c2ab..c73cd6bbe 100644 --- a/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/PausingHandle.kt +++ b/app/src/main/kotlin/org/koitharu/kotatsu/download/ui/worker/PausingHandle.kt @@ -1,11 +1,13 @@ package org.koitharu.kotatsu.download.ui.worker import androidx.annotation.AnyThread +import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first +import kotlin.coroutines.AbstractCoroutineContextElement +import kotlin.coroutines.CoroutineContext -class PausingHandle { +class PausingHandle : AbstractCoroutineContextElement(PausingHandle) { private val paused = MutableStateFlow(false) @@ -15,7 +17,7 @@ class PausingHandle { @AnyThread suspend fun awaitResumed() { - paused.filter { !it }.first() + paused.first { !it } } @AnyThread @@ -27,4 +29,17 @@ class PausingHandle { fun resume() { paused.value = false } + + suspend fun yield() { + if (paused.value) { + paused.first { !it } + } + } + + companion object : CoroutineContext.Key { + + suspend fun current() = checkNotNull(currentCoroutineContext()[this]) { + "PausingHandle not found in current context" + } + } } diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaDirOutput.kt b/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaDirOutput.kt index 339bcfcba..9dd76c100 100644 --- a/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaDirOutput.kt +++ b/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaDirOutput.kt @@ -2,6 +2,8 @@ package org.koitharu.kotatsu.local.data.output import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runInterruptible +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.koitharu.kotatsu.core.model.findById import org.koitharu.kotatsu.core.model.isLocal import org.koitharu.kotatsu.core.util.ext.deleteAwait @@ -20,6 +22,7 @@ class LocalMangaDirOutput( private val chaptersOutput = HashMap() private val index = MangaIndex(File(rootFile, ENTRY_NAME_INDEX).takeIfReadable()?.readText()) + private val mutex = Mutex() init { if (!manga.isLocal) { @@ -29,7 +32,7 @@ class LocalMangaDirOutput( override suspend fun mergeWithExisting() = Unit - override suspend fun addCover(file: File, ext: String) { + override suspend fun addCover(file: File, ext: String) = mutex.withLock { val name = buildString { append("cover") if (ext.isNotEmpty() && ext.length <= 4) { @@ -44,7 +47,7 @@ class LocalMangaDirOutput( flushIndex() } - override suspend fun addPage(chapter: MangaChapter, file: File, pageNumber: Int, ext: String) { + override suspend fun addPage(chapter: MangaChapter, file: File, pageNumber: Int, ext: String) = mutex.withLock { val output = chaptersOutput.getOrPut(chapter) { ZipOutput(File(rootFile, chapterFileName(chapter) + SUFFIX_TMP)) } @@ -61,14 +64,14 @@ class LocalMangaDirOutput( index.addChapter(chapter, chapterFileName(chapter)) } - override suspend fun flushChapter(chapter: MangaChapter): Boolean { - val output = chaptersOutput.remove(chapter) ?: return false + override suspend fun flushChapter(chapter: MangaChapter): Boolean = mutex.withLock { + val output = chaptersOutput.remove(chapter) ?: return@withLock false output.flushAndFinish() flushIndex() - return true + true } - override suspend fun finish() { + override suspend fun finish() = mutex.withLock { flushIndex() for (output in chaptersOutput.values) { output.flushAndFinish() @@ -76,7 +79,7 @@ class LocalMangaDirOutput( chaptersOutput.clear() } - override suspend fun cleanup() { + override suspend fun cleanup() = mutex.withLock { for (output in chaptersOutput.values) { output.file.deleteAwait() } @@ -88,7 +91,7 @@ class LocalMangaDirOutput( } } - suspend fun deleteChapter(chapterId: Long) { + suspend fun deleteChapter(chapterId: Long) = mutex.withLock { val chapter = checkNotNull(index.getMangaInfo()?.chapters) { "No chapters found" }.findById(chapterId) ?: error("Chapter not found") diff --git a/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaZipOutput.kt b/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaZipOutput.kt index 0596a7324..747296ceb 100644 --- a/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaZipOutput.kt +++ b/app/src/main/kotlin/org/koitharu/kotatsu/local/data/output/LocalMangaZipOutput.kt @@ -3,6 +3,8 @@ package org.koitharu.kotatsu.local.data.output import androidx.annotation.WorkerThread import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runInterruptible +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.koitharu.kotatsu.core.model.isLocal import org.koitharu.kotatsu.core.util.ext.deleteAwait import org.koitharu.kotatsu.core.util.ext.readText @@ -20,6 +22,7 @@ class LocalMangaZipOutput( private val output = ZipOutput(File(rootFile.path + ".tmp")) private val index = MangaIndex(null) + private val mutex = Mutex() init { if (!manga.isLocal) { @@ -27,7 +30,7 @@ class LocalMangaZipOutput( } } - override suspend fun mergeWithExisting() { + override suspend fun mergeWithExisting() = mutex.withLock { if (rootFile.exists()) { runInterruptible(Dispatchers.IO) { mergeWith(rootFile) @@ -35,7 +38,7 @@ class LocalMangaZipOutput( } } - override suspend fun addCover(file: File, ext: String) { + override suspend fun addCover(file: File, ext: String) = mutex.withLock { val name = buildString { append(FILENAME_PATTERN.format(0, 0, 0)) if (ext.isNotEmpty() && ext.length <= 4) { @@ -49,7 +52,7 @@ class LocalMangaZipOutput( index.setCoverEntry(name) } - override suspend fun addPage(chapter: MangaChapter, file: File, pageNumber: Int, ext: String) { + override suspend fun addPage(chapter: MangaChapter, file: File, pageNumber: Int, ext: String) = mutex.withLock { val name = buildString { append(FILENAME_PATTERN.format(chapter.branch.hashCode(), chapter.number, pageNumber)) if (ext.isNotEmpty() && ext.length <= 4) { @@ -65,7 +68,7 @@ class LocalMangaZipOutput( override suspend fun flushChapter(chapter: MangaChapter): Boolean = false - override suspend fun finish() { + override suspend fun finish() = mutex.withLock { runInterruptible(Dispatchers.IO) { output.put(ENTRY_NAME_INDEX, index.toString()) output.finish() @@ -73,10 +76,12 @@ class LocalMangaZipOutput( } rootFile.deleteAwait() output.file.renameTo(rootFile) + Unit } - override suspend fun cleanup() { + override suspend fun cleanup() = mutex.withLock { output.file.deleteAwait() + Unit } override fun close() { diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index bdefa5ace..16de403ef 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -533,4 +533,5 @@ Filtering by multiple genres is not supported by this manga source Filtering by multiple states is not supported by this manga source Search is not supported by this manga source + You can enable download slowdown for each manga source individually in the source settings if you are having problems with server-side blocking diff --git a/app/src/main/res/xml/pref_downloads.xml b/app/src/main/res/xml/pref_downloads.xml index c08522ac4..a9e792269 100644 --- a/app/src/main/res/xml/pref_downloads.xml +++ b/app/src/main/res/xml/pref_downloads.xml @@ -17,13 +17,14 @@ android:defaultValue="false" android:key="downloads_wifi" android:summary="@string/downloads_wifi_only_summary" - android:title="@string/downloads_wifi_only" + android:title="@string/downloads_wifi_only" /> + + - - diff --git a/app/src/main/res/xml/pref_source.xml b/app/src/main/res/xml/pref_source.xml index 5422bb214..21e5cca6c 100644 --- a/app/src/main/res/xml/pref_source.xml +++ b/app/src/main/res/xml/pref_source.xml @@ -17,4 +17,12 @@ android:summary="@string/clear_source_cookies_summary" android:title="@string/clear_cookies" /> + +