Migrate to Okio somewhere
This commit is contained in:
@@ -32,6 +32,8 @@ import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.internal.closeQuietly
|
||||
import okio.IOException
|
||||
import okio.buffer
|
||||
import okio.sink
|
||||
import org.koitharu.kotatsu.R
|
||||
import org.koitharu.kotatsu.base.domain.MangaDataRepository
|
||||
import org.koitharu.kotatsu.core.network.CommonHeaders
|
||||
@@ -50,12 +52,12 @@ import org.koitharu.kotatsu.parsers.model.MangaSource
|
||||
import org.koitharu.kotatsu.parsers.util.await
|
||||
import org.koitharu.kotatsu.parsers.util.mapToSet
|
||||
import org.koitharu.kotatsu.utils.WorkManagerHelper
|
||||
import org.koitharu.kotatsu.utils.ext.copyToSuspending
|
||||
import org.koitharu.kotatsu.utils.ext.deleteAwait
|
||||
import org.koitharu.kotatsu.utils.ext.getDisplayMessage
|
||||
import org.koitharu.kotatsu.utils.ext.ifNullOrEmpty
|
||||
import org.koitharu.kotatsu.utils.ext.printStackTraceDebug
|
||||
import org.koitharu.kotatsu.utils.ext.runCatchingCancellable
|
||||
import org.koitharu.kotatsu.utils.ext.writeAllCancellable
|
||||
import org.koitharu.kotatsu.utils.progress.TimeLeftEstimator
|
||||
import java.io.File
|
||||
import java.util.UUID
|
||||
@@ -261,8 +263,10 @@ class DownloadWorker @AssistedInject constructor(
|
||||
val call = okHttp.newCall(request)
|
||||
val file = File(destination, tempFileName)
|
||||
val response = call.clone().await()
|
||||
file.outputStream().use { out ->
|
||||
checkNotNull(response.body).byteStream().copyToSuspending(out)
|
||||
checkNotNull(response.body).use { body ->
|
||||
file.sink(append = false).buffer().use {
|
||||
it.writeAllCancellable(body.source())
|
||||
}
|
||||
}
|
||||
return file
|
||||
}
|
||||
|
||||
@@ -6,17 +6,19 @@ import dagger.hilt.android.qualifiers.ApplicationContext
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.runInterruptible
|
||||
import kotlinx.coroutines.withContext
|
||||
import okio.Source
|
||||
import okio.buffer
|
||||
import okio.sink
|
||||
import org.koitharu.kotatsu.parsers.util.SuspendLazy
|
||||
import org.koitharu.kotatsu.utils.FileSize
|
||||
import org.koitharu.kotatsu.utils.ext.copyToSuspending
|
||||
import org.koitharu.kotatsu.utils.ext.longHashCode
|
||||
import org.koitharu.kotatsu.utils.ext.printStackTraceDebug
|
||||
import org.koitharu.kotatsu.utils.ext.runCatchingCancellable
|
||||
import org.koitharu.kotatsu.utils.ext.subdir
|
||||
import org.koitharu.kotatsu.utils.ext.takeIfReadable
|
||||
import org.koitharu.kotatsu.utils.ext.takeIfWriteable
|
||||
import org.koitharu.kotatsu.utils.ext.writeAllCancellable
|
||||
import java.io.File
|
||||
import java.io.InputStream
|
||||
import javax.inject.Inject
|
||||
import javax.inject.Singleton
|
||||
|
||||
@@ -49,11 +51,11 @@ class PagesCache @Inject constructor(@ApplicationContext context: Context) {
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun put(url: String, inputStream: InputStream): File = withContext(Dispatchers.IO) {
|
||||
suspend fun put(url: String, source: Source): File = withContext(Dispatchers.IO) {
|
||||
val file = File(cacheDir.get().parentFile, url.longHashCode().toString())
|
||||
try {
|
||||
file.outputStream().use { out ->
|
||||
inputStream.copyToSuspending(out)
|
||||
file.sink(append = false).buffer().use {
|
||||
it.writeAllCancellable(source)
|
||||
}
|
||||
lruCache.get().put(url, file)
|
||||
} finally {
|
||||
|
||||
@@ -7,16 +7,19 @@ import dagger.Reusable
|
||||
import dagger.hilt.android.qualifiers.ApplicationContext
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.runInterruptible
|
||||
import kotlinx.coroutines.withContext
|
||||
import okio.buffer
|
||||
import okio.sink
|
||||
import okio.source
|
||||
import org.koitharu.kotatsu.core.exceptions.UnsupportedFileException
|
||||
import org.koitharu.kotatsu.local.data.CbzFilter
|
||||
import org.koitharu.kotatsu.local.data.LocalManga
|
||||
import org.koitharu.kotatsu.local.data.LocalStorageChanges
|
||||
import org.koitharu.kotatsu.local.data.LocalStorageManager
|
||||
import org.koitharu.kotatsu.local.data.input.LocalMangaInput
|
||||
import org.koitharu.kotatsu.utils.ext.copyToSuspending
|
||||
import org.koitharu.kotatsu.utils.ext.resolveName
|
||||
import org.koitharu.kotatsu.utils.ext.writeAllCancellable
|
||||
import java.io.File
|
||||
import java.io.IOException
|
||||
import javax.inject.Inject
|
||||
@@ -30,17 +33,17 @@ class SingleMangaImporter @Inject constructor(
|
||||
|
||||
private val contentResolver = context.contentResolver
|
||||
|
||||
suspend fun import(uri: Uri, progressState: MutableStateFlow<Float>?): LocalManga {
|
||||
suspend fun import(uri: Uri): LocalManga {
|
||||
val result = if (isDirectory(uri)) {
|
||||
importDirectory(uri, progressState)
|
||||
importDirectory(uri)
|
||||
} else {
|
||||
importFile(uri, progressState)
|
||||
importFile(uri)
|
||||
}
|
||||
localStorageChanges.emit(result)
|
||||
return result
|
||||
}
|
||||
|
||||
private suspend fun importFile(uri: Uri, progressState: MutableStateFlow<Float>?): LocalManga {
|
||||
private suspend fun importFile(uri: Uri): LocalManga = withContext(Dispatchers.IO) {
|
||||
val contentResolver = storageManager.contentResolver
|
||||
val name = contentResolver.resolveName(uri) ?: throw IOException("Cannot fetch name from uri: $uri")
|
||||
if (!CbzFilter.isFileSupported(name)) {
|
||||
@@ -50,14 +53,14 @@ class SingleMangaImporter @Inject constructor(
|
||||
runInterruptible {
|
||||
contentResolver.openInputStream(uri)
|
||||
}?.use { source ->
|
||||
dest.outputStream().use { output ->
|
||||
source.copyToSuspending(output, progressState = progressState)
|
||||
dest.sink().buffer().use { output ->
|
||||
output.writeAllCancellable(source.source())
|
||||
}
|
||||
} ?: throw IOException("Cannot open input stream: $uri")
|
||||
return LocalMangaInput.of(dest).getManga()
|
||||
LocalMangaInput.of(dest).getManga()
|
||||
}
|
||||
|
||||
private suspend fun importDirectory(uri: Uri, progressState: MutableStateFlow<Float>?): LocalManga {
|
||||
private suspend fun importDirectory(uri: Uri): LocalManga {
|
||||
val root = requireNotNull(DocumentFile.fromTreeUri(context, uri)) {
|
||||
"Provided uri $uri is not a tree"
|
||||
}
|
||||
@@ -80,9 +83,9 @@ class SingleMangaImporter @Inject constructor(
|
||||
docFile.copyTo(subDir)
|
||||
}
|
||||
} else {
|
||||
inputStream().use { input ->
|
||||
File(destDir, requireName()).outputStream().use { output ->
|
||||
input.copyToSuspending(output)
|
||||
inputStream().source().use { input ->
|
||||
File(destDir, requireName()).sink().buffer().use { output ->
|
||||
output.writeAllCancellable(input)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ class ImportWorker @AssistedInject constructor(
|
||||
val uri = inputData.getString(DATA_URI)?.toUriOrNull() ?: return Result.failure()
|
||||
setForeground(getForegroundInfo())
|
||||
val result = runCatchingCancellable {
|
||||
importer.import(uri, null).manga
|
||||
importer.import(uri).manga
|
||||
}
|
||||
val notification = buildNotification(result)
|
||||
notificationManager.notify(uri.hashCode(), notification)
|
||||
|
||||
@@ -20,6 +20,7 @@ import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okio.source
|
||||
import org.koitharu.kotatsu.core.network.CommonHeaders
|
||||
import org.koitharu.kotatsu.core.parser.MangaRepository
|
||||
import org.koitharu.kotatsu.core.parser.RemoteMangaRepository
|
||||
@@ -186,7 +187,7 @@ class PageLoader @Inject constructor(
|
||||
val entry = zip.getEntry(uri.fragment)
|
||||
zip.getInputStream(entry)
|
||||
}.use {
|
||||
cache.put(pageUrl, it)
|
||||
cache.put(pageUrl, it.source())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -204,8 +205,8 @@ class PageLoader @Inject constructor(
|
||||
val body = checkNotNull(response.body) {
|
||||
"Null response"
|
||||
}
|
||||
body.withProgress(progress).byteStream().use {
|
||||
cache.put(pageUrl, it)
|
||||
body.withProgress(progress).use {
|
||||
cache.put(pageUrl, it.source())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,14 @@ import kotlinx.coroutines.suspendCancellableCoroutine
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.HttpUrl.Companion.toHttpUrl
|
||||
import okio.IOException
|
||||
import okio.buffer
|
||||
import okio.sink
|
||||
import okio.source
|
||||
import org.koitharu.kotatsu.base.domain.MangaDataRepository
|
||||
import org.koitharu.kotatsu.parsers.model.MangaPage
|
||||
import org.koitharu.kotatsu.parsers.util.toFileNameSafe
|
||||
import org.koitharu.kotatsu.reader.domain.PageLoader
|
||||
import org.koitharu.kotatsu.utils.ext.copyToSuspending
|
||||
import org.koitharu.kotatsu.utils.ext.writeAllCancellable
|
||||
import java.io.File
|
||||
import javax.inject.Inject
|
||||
import kotlin.coroutines.Continuation
|
||||
@@ -49,10 +52,10 @@ class PageSaveHelper @Inject constructor(
|
||||
}
|
||||
}
|
||||
runInterruptible(Dispatchers.IO) {
|
||||
contentResolver.openOutputStream(destination)
|
||||
contentResolver.openOutputStream(destination)?.sink()?.buffer()
|
||||
}?.use { output ->
|
||||
pageFile.inputStream().use { input ->
|
||||
input.copyToSuspending(output)
|
||||
pageFile.source().use { input ->
|
||||
output.writeAllCancellable(input)
|
||||
}
|
||||
} ?: throw IOException("Output stream is null")
|
||||
return destination
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
package org.koitharu.kotatsu.utils
|
||||
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import okio.Buffer
|
||||
import okio.ForwardingSource
|
||||
import okio.Source
|
||||
|
||||
class CancellableSource(
|
||||
private val job: Job?,
|
||||
delegate: Source,
|
||||
) : ForwardingSource(delegate) {
|
||||
|
||||
override fun read(sink: Buffer, byteCount: Long): Long {
|
||||
job?.ensureActive()
|
||||
return super.read(sink, byteCount)
|
||||
}
|
||||
}
|
||||
@@ -3,37 +3,23 @@ package org.koitharu.kotatsu.utils.ext
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.currentCoroutineContext
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.ResponseBody
|
||||
import okio.BufferedSink
|
||||
import okio.Source
|
||||
import org.koitharu.kotatsu.utils.CancellableSource
|
||||
import org.koitharu.kotatsu.utils.progress.ProgressResponseBody
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
|
||||
suspend fun InputStream.copyToSuspending(
|
||||
out: OutputStream,
|
||||
bufferSize: Int = DEFAULT_BUFFER_SIZE,
|
||||
progressState: MutableStateFlow<Float>? = null,
|
||||
): Long = withContext(Dispatchers.IO) {
|
||||
val job = currentCoroutineContext()[Job]
|
||||
val total = available()
|
||||
var bytesCopied: Long = 0
|
||||
val buffer = ByteArray(bufferSize)
|
||||
var bytes = read(buffer)
|
||||
while (bytes >= 0) {
|
||||
out.write(buffer, 0, bytes)
|
||||
bytesCopied += bytes
|
||||
job?.ensureActive()
|
||||
bytes = read(buffer)
|
||||
job?.ensureActive()
|
||||
if (progressState != null && total > 0) {
|
||||
progressState.value = bytesCopied / total.toFloat()
|
||||
}
|
||||
}
|
||||
bytesCopied
|
||||
}
|
||||
|
||||
fun ResponseBody.withProgress(progressState: MutableStateFlow<Float>): ResponseBody {
|
||||
return ProgressResponseBody(this, progressState)
|
||||
}
|
||||
|
||||
suspend fun Source.cancellable(): Source {
|
||||
val job = currentCoroutineContext()[Job]
|
||||
return CancellableSource(job, this)
|
||||
}
|
||||
|
||||
suspend fun BufferedSink.writeAllCancellable(source: Source) = withContext(Dispatchers.IO) {
|
||||
writeAll(source.cancellable())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user