Improve wrokers parallelism

This commit is contained in:
Koitharu
2023-07-21 15:46:28 +03:00
parent 6dcb537a9a
commit 03b92c4898
2 changed files with 27 additions and 18 deletions

View File

@@ -36,6 +36,8 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import org.koitharu.kotatsu.R
import org.koitharu.kotatsu.core.model.distinctById
import org.koitharu.kotatsu.core.parser.MangaRepository
@@ -133,10 +135,13 @@ class SuggestionsWorker @AssistedInject constructor(
val tagsBlacklist = TagsBlacklist(appSettings.suggestionsTagsBlacklist, TAG_EQ_THRESHOLD)
val tags = seed.flatMap { it.tags.map { x -> x.title } }.takeMostFrequent(10)
val semaphore = Semaphore(MAX_PARALLELISM)
val producer = channelFlow {
for (it in sources.shuffled()) {
launch {
send(getList(it, tags, tagsBlacklist))
semaphore.withPermit {
send(getList(it, tags, tagsBlacklist))
}
}
}
}
@@ -376,6 +381,7 @@ class SuggestionsWorker @AssistedInject constructor(
const val MANGA_CHANNEL_ID = "suggestions"
const val WORKER_NOTIFICATION_ID = 36
const val MAX_RESULTS = 80
const val MAX_PARALLELISM = 3
const val MAX_SOURCE_RESULTS = 14
const val MAX_RAW_RESULTS = 200
const val TAG_EQ_THRESHOLD = 0.4f

View File

@@ -32,13 +32,14 @@ import coil.request.ImageRequest
import dagger.Reusable
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import org.koitharu.kotatsu.R
import org.koitharu.kotatsu.core.logs.FileLogger
@@ -119,23 +120,25 @@ class TrackWorker @AssistedInject constructor(
}
private suspend fun checkUpdatesAsync(tracks: List<TrackingItem>): List<MangaUpdates?> {
val dispatcher = Dispatchers.Default.limitedParallelism(MAX_PARALLELISM)
val semaphore = Semaphore(MAX_PARALLELISM)
return supervisorScope {
tracks.map { (track, channelId) ->
async(dispatcher) {
runCatchingCancellable {
tracker.fetchUpdates(track, commit = true)
}.onFailure {
logger.log("checkUpdatesAsync", it)
}.onSuccess { updates ->
if (updates.isValid && updates.isNotEmpty()) {
showNotification(
manga = updates.manga,
channelId = channelId,
newChapters = updates.newChapters,
)
}
}.getOrNull()
async {
semaphore.withPermit {
runCatchingCancellable {
tracker.fetchUpdates(track, commit = true)
}.onFailure {
logger.log("checkUpdatesAsync", it)
}.onSuccess { updates ->
if (updates.isValid && updates.isNotEmpty()) {
showNotification(
manga = updates.manga,
channelId = channelId,
newChapters = updates.newChapters,
)
}
}.getOrNull()
}
}
}.awaitAll()
}
@@ -299,7 +302,7 @@ class TrackWorker @AssistedInject constructor(
const val WORKER_NOTIFICATION_ID = 35
const val TAG = "tracking"
const val TAG_ONESHOT = "tracking_oneshot"
const val MAX_PARALLELISM = 4
const val MAX_PARALLELISM = 3
const val DATA_KEY_SUCCESS = "success"
const val DATA_KEY_FAILED = "failed"
}