Downloader improvements

This commit is contained in:
Koitharu
2023-12-01 14:07:35 +02:00
parent ff05f3f79d
commit a7a9ee9d59
11 changed files with 158 additions and 61 deletions

View File

@@ -0,0 +1,28 @@
package org.koitharu.kotatsu.download.ui.worker
import kotlinx.coroutines.delay
import org.koitharu.kotatsu.core.parser.MangaRepository
import org.koitharu.kotatsu.core.parser.RemoteMangaRepository
import org.koitharu.kotatsu.parsers.model.MangaSource
class DownloadSlowdownDispatcher(
private val mangaRepositoryFactory: MangaRepository.Factory,
private val defaultDelay: Long,
) {
private val timeMap = HashMap<MangaSource, Long>()
suspend fun delay(source: MangaSource) {
val repo = mangaRepositoryFactory.create(source) as? RemoteMangaRepository ?: return
if (!repo.isSlowdownEnabled()) {
return
}
val lastRequest = synchronized(timeMap) {
val res = timeMap[source] ?: 0L
timeMap[source] = System.currentTimeMillis()
res
}
if (lastRequest != 0L) {
delay(lastRequest + defaultDelay - System.currentTimeMillis())
}
}
}

View File

@@ -28,6 +28,10 @@ import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
import okhttp3.Request
@@ -71,6 +75,7 @@ import org.koitharu.kotatsu.parsers.util.runCatchingCancellable
import java.io.File
import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Inject
@HiltWorker
@@ -81,7 +86,6 @@ class DownloadWorker @AssistedInject constructor(
private val cache: PagesCache,
private val localMangaRepository: LocalMangaRepository,
private val mangaDataRepository: MangaDataRepository,
private val settings: AppSettings,
private val mangaRepositoryFactory: MangaRepository.Factory,
@LocalStorageChanges private val localStorageChanges: MutableSharedFlow<LocalManga?>,
notificationFactoryFactory: DownloadNotificationFactory.Factory,
@@ -89,16 +93,15 @@ class DownloadWorker @AssistedInject constructor(
private val notificationFactory = notificationFactoryFactory.create(params.id)
private val notificationManager = appContext.getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
private val slowdownDispatcher = DownloadSlowdownDispatcher(mangaRepositoryFactory, SLOWDOWN_DELAY)
@Volatile
private var lastPublishedState: DownloadState? = null
private val currentState: DownloadState
get() = checkNotNull(lastPublishedState)
private val pausingHandle = PausingHandle()
private val timeLeftEstimator = TimeLeftEstimator()
private val notificationThrottler = Throttler(400)
private val pausingReceiver = PausingReceiver(params.id, pausingHandle)
override suspend fun doWork(): Result {
setForeground(getForegroundInfo())
@@ -109,7 +112,9 @@ class DownloadWorker @AssistedInject constructor(
val chaptersIds = inputData.getLongArray(CHAPTERS_IDS)?.takeUnless { it.isEmpty() }
val downloadedIds = getDoneChapters(manga)
return try {
downloadMangaImpl(manga, chaptersIds, downloadedIds)
withContext(PausingHandle()) {
downloadMangaImpl(manga, chaptersIds, downloadedIds)
}
Result.success(currentState.toWorkData())
} catch (e: CancellationException) {
withContext(NonCancellable) {
@@ -153,6 +158,7 @@ class DownloadWorker @AssistedInject constructor(
) {
var manga = subject
val chaptersToSkip = excludedIds.toMutableSet()
val pausingReceiver = PausingReceiver(id, PausingHandle.current())
withMangaLock(manga) {
ContextCompat.registerReceiver(
applicationContext,
@@ -180,39 +186,47 @@ class DownloadWorker @AssistedInject constructor(
}
val chapters = getChapters(mangaDetails, includedIds)
for ((chapterIndex, chapter) in chapters.withIndex()) {
checkIsPaused()
if (chaptersToSkip.remove(chapter.id)) {
publishState(currentState.copy(downloadedChapters = currentState.downloadedChapters + 1))
continue
}
val pages = runFailsafe(pausingHandle) {
val pages = runFailsafe {
repo.getPages(chapter)
}
for ((pageIndex, page) in pages.withIndex()) {
runFailsafe(pausingHandle) {
val url = repo.getPageUrl(page)
val file = cache.get(url)
?: downloadFile(url, destination, tempFileName, repo.source)
output.addPage(
chapter = chapter,
file = file,
pageNumber = pageIndex,
ext = MimeTypeMap.getFileExtensionFromUrl(url),
)
val pageCounter = AtomicInteger(0)
channelFlow {
val semaphore = Semaphore(MAX_PAGES_PARALLELISM)
for ((pageIndex, page) in pages.withIndex()) {
checkIsPaused()
launch {
semaphore.withPermit {
runFailsafe {
val url = repo.getPageUrl(page)
val file = cache.get(url)
?: downloadFile(url, destination, tempFileName, repo.source)
output.addPage(
chapter = chapter,
file = file,
pageNumber = pageIndex,
ext = MimeTypeMap.getFileExtensionFromUrl(url),
)
}
send(pageIndex)
}
}
}
}.collect {
publishState(
currentState.copy(
totalChapters = chapters.size,
currentChapter = chapterIndex,
totalPages = pages.size,
currentPage = pageIndex,
currentPage = pageCounter.incrementAndGet(),
isIndeterminate = false,
eta = timeLeftEstimator.getEta(),
),
)
if (settings.isDownloadsSlowdownEnabled) {
delay(SLOWDOWN_DELAY)
}
}
if (output.flushChapter(chapter)) {
runCatchingCancellable {
@@ -244,14 +258,9 @@ class DownloadWorker @AssistedInject constructor(
}
private suspend fun <R> runFailsafe(
pausingHandle: PausingHandle,
block: suspend () -> R,
): R {
if (pausingHandle.isPaused) {
publishState(currentState.copy(isPaused = true, eta = -1L))
pausingHandle.awaitResumed()
publishState(currentState.copy(isPaused = false))
}
checkIsPaused()
var countDown = MAX_FAILSAFE_ATTEMPTS
failsafe@ while (true) {
try {
@@ -266,9 +275,13 @@ class DownloadWorker @AssistedInject constructor(
),
)
countDown = MAX_FAILSAFE_ATTEMPTS
val pausingHandle = PausingHandle.current()
pausingHandle.pause()
pausingHandle.awaitResumed()
publishState(currentState.copy(isPaused = false, error = null))
try {
pausingHandle.awaitResumed()
} finally {
publishState(currentState.copy(isPaused = false, error = null))
}
} else {
countDown--
val retryDelay = if (e is TooManyRequestExceptions) {
@@ -282,6 +295,18 @@ class DownloadWorker @AssistedInject constructor(
}
}
private suspend fun checkIsPaused() {
val pausingHandle = PausingHandle.current()
if (pausingHandle.isPaused) {
publishState(currentState.copy(isPaused = true, eta = -1L))
try {
pausingHandle.awaitResumed()
} finally {
publishState(currentState.copy(isPaused = false))
}
}
}
private suspend fun downloadFile(
url: String,
destination: File,
@@ -295,13 +320,19 @@ class DownloadWorker @AssistedInject constructor(
.cacheControl(CommonHeaders.CACHE_CONTROL_NO_STORE)
.get()
.build()
slowdownDispatcher.delay(source)
val call = okHttp.newCall(request)
val file = File(destination, tempFileName)
val response = call.clone().await()
checkNotNull(response.body).use { body ->
file.sink(append = false).buffer().use {
it.writeAllCancellable(body.source())
try {
val response = call.clone().await()
checkNotNull(response.body).use { body ->
file.sink(append = false).buffer().use {
it.writeAllCancellable(body.source())
}
}
} catch (e: CancellationException) {
file.delete()
throw e
}
return file
}
@@ -461,8 +492,9 @@ class DownloadWorker @AssistedInject constructor(
private companion object {
const val MAX_FAILSAFE_ATTEMPTS = 2
const val MAX_PAGES_PARALLELISM = 4
const val DOWNLOAD_ERROR_DELAY = 500L
const val SLOWDOWN_DELAY = 100L
const val SLOWDOWN_DELAY = 200L
const val MANGA_ID = "manga_id"
const val CHAPTERS_IDS = "chapters"
const val TAG = "download"

View File

@@ -1,11 +1,13 @@
package org.koitharu.kotatsu.download.ui.worker
import androidx.annotation.AnyThread
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
class PausingHandle {
class PausingHandle : AbstractCoroutineContextElement(PausingHandle) {
private val paused = MutableStateFlow(false)
@@ -15,7 +17,7 @@ class PausingHandle {
@AnyThread
suspend fun awaitResumed() {
paused.filter { !it }.first()
paused.first { !it }
}
@AnyThread
@@ -27,4 +29,17 @@ class PausingHandle {
fun resume() {
paused.value = false
}
suspend fun yield() {
if (paused.value) {
paused.first { !it }
}
}
companion object : CoroutineContext.Key<PausingHandle> {
suspend fun current() = checkNotNull(currentCoroutineContext()[this]) {
"PausingHandle not found in current context"
}
}
}