optimized http

This commit is contained in:
mertalev
2026-01-20 13:08:50 -05:00
parent 00ea98479f
commit 95fad974c1
5 changed files with 387 additions and 155 deletions

View File

@@ -1,28 +1,38 @@
#include <jni.h>
#include <stdlib.h>
#include <string.h>
JNIEXPORT jlong JNICALL
Java_app_alextran_immich_images_LocalImagesImpl_allocateNative(
Java_app_alextran_immich_NativeBuffer_allocate(
JNIEnv *env, jclass clazz, jint size) {
void *ptr = malloc(size);
return (jlong) ptr;
}
JNIEXPORT void JNICALL
Java_app_alextran_immich_images_LocalImagesImpl_freeNative(
Java_app_alextran_immich_NativeBuffer_free(
JNIEnv *env, jclass clazz, jlong address) {
free((void *) address);
}
JNIEXPORT jlong JNICALL
Java_app_alextran_immich_images_LocalImagesImpl_reallocNative(
Java_app_alextran_immich_NativeBuffer_realloc(
JNIEnv *env, jclass clazz, jlong address, jint size) {
void *ptr = realloc((void *) address, size);
return (jlong) ptr;
}
JNIEXPORT jobject JNICALL
Java_app_alextran_immich_images_LocalImagesImpl_wrapAsBuffer(
Java_app_alextran_immich_NativeBuffer_wrap(
JNIEnv *env, jclass clazz, jlong address, jint capacity) {
return (*env)->NewDirectByteBuffer(env, (void *) address, capacity);
}
JNIEXPORT void JNICALL
Java_app_alextran_immich_NativeBuffer_copy(
JNIEnv *env, jclass clazz, jobject buffer, jlong destAddress, jint offset, jint length) {
void *src = (*env)->GetDirectBufferAddress(env, buffer);
if (src != NULL) {
memcpy((void *) destAddress, (char *) src + offset, length);
}
}

View File

@@ -0,0 +1,28 @@
package app.alextran.immich
import java.nio.ByteBuffer
/**
* JNI interface for native memory operations.
* Used by HTTP responses and image processing to avoid copies.
*/
object NativeBuffer {
init {
System.loadLibrary("native_buffer")
}
@JvmStatic
external fun allocate(size: Int): Long
@JvmStatic
external fun free(address: Long)
@JvmStatic
external fun realloc(address: Long, size: Int): Long
@JvmStatic
external fun wrap(address: Long, capacity: Int): ByteBuffer
@JvmStatic
external fun copy(buffer: ByteBuffer, destAddress: Long, offset: Int, length: Int)
}

View File

@@ -12,7 +12,7 @@ import android.provider.MediaStore.Images
import android.provider.MediaStore.Video
import android.util.Size
import androidx.annotation.RequiresApi
import java.nio.ByteBuffer
import app.alextran.immich.NativeBuffer
import kotlin.math.*
import java.util.concurrent.Executors
import com.bumptech.glide.Glide
@@ -44,9 +44,9 @@ inline fun ImageDecoder.Source.decodeBitmap(target: Size = Size(0, 0)): Bitmap {
fun Bitmap.toNativeBuffer(): Map<String, Long> {
val size = width * height * 4
val pointer = LocalImagesImpl.allocateNative(size)
val pointer = NativeBuffer.allocate(size)
try {
val buffer = LocalImagesImpl.wrapAsBuffer(pointer, size)
val buffer = NativeBuffer.wrap(pointer, size)
copyPixelsToBuffer(buffer)
recycle()
return mapOf(
@@ -56,7 +56,7 @@ fun Bitmap.toNativeBuffer(): Map<String, Long> {
"rowBytes" to (width * 4).toLong()
)
} catch (e: Exception) {
LocalImagesImpl.freeNative(pointer)
NativeBuffer.free(pointer)
recycle()
throw e
}
@@ -73,22 +73,6 @@ class LocalImagesImpl(context: Context) : LocalImageApi {
companion object {
val CANCELLED = Result.success<Map<String, Long>>(mapOf())
val OPTIONS = BitmapFactory.Options().apply { inPreferredConfig = Bitmap.Config.ARGB_8888 }
init {
System.loadLibrary("native_buffer")
}
@JvmStatic
external fun allocateNative(size: Int): Long
@JvmStatic
external fun freeNative(pointer: Long)
@JvmStatic
external fun reallocNative(pointer: Long, size: Int): Long
@JvmStatic
external fun wrapAsBuffer(address: Long, capacity: Int): ByteBuffer
}
override fun getThumbhash(thumbhash: String, callback: (Result<Map<String, Long>>) -> Unit) {

View File

@@ -2,41 +2,66 @@ package app.alextran.immich.images
import android.content.Context
import android.os.CancellationSignal
import android.os.OperationCanceledException
import app.alextran.immich.BuildConfig
import app.alextran.immich.NativeBuffer
import app.alextran.immich.core.SSLConfig
import com.google.net.cronet.okhttptransport.CronetCallFactory
import okhttp3.Cache
import okhttp3.Call
import okhttp3.Callback
import okhttp3.ConnectionPool
import okhttp3.Dispatcher
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.Cache
import okhttp3.ConnectionPool
import okhttp3.Dispatcher
import org.chromium.net.CronetEngine
import org.chromium.net.CronetException
import org.chromium.net.UrlRequest
import org.chromium.net.UrlResponseInfo
import java.io.File
import java.io.IOException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import okhttp3.Interceptor
import java.util.concurrent.atomic.AtomicInteger
import javax.net.ssl.SSLSocketFactory
import javax.net.ssl.X509TrustManager
import kotlin.concurrent.Volatile
data class RemoteRequest(
val callback: (Result<Map<String, Long>>) -> Unit,
private const val USER_AGENT = "Immich_Android_${BuildConfig.VERSION_NAME}"
private const val MAX_REQUESTS_PER_HOST = 16
private const val KEEP_ALIVE_CONNECTIONS = 10
private const val KEEP_ALIVE_DURATION_MINUTES = 5L
private const val CACHE_SIZE_BYTES = 1024L * 1024 * 1024
private const val INITIAL_BUFFER_SIZE = 64 * 1024
private class RemoteRequest(
val cancellationSignal: CancellationSignal,
var pointer: Long = 0L,
)
class UserAgentInterceptor : Interceptor {
companion object {
const val USER_AGENT = "Immich_Android_${BuildConfig.VERSION_NAME}"
private class NativeByteBuffer(initialCapacity: Int) {
var pointer = NativeBuffer.allocate(initialCapacity)
var capacity = initialCapacity
var offset = 0
fun ensureHeadroom(needed: Int = INITIAL_BUFFER_SIZE) {
if (offset + needed > capacity) {
capacity = (capacity * 2).coerceAtLeast(offset + needed)
pointer = NativeBuffer.realloc(pointer, capacity)
}
}
override fun intercept(chain: Interceptor.Chain): Response {
return chain.proceed(
chain.request().newBuilder()
.header("User-Agent", USER_AGENT)
.build()
)
fun wrapRemaining() = NativeBuffer.wrap(pointer + offset, capacity - offset)
fun advance(bytesRead: Int) { offset += bytesRead }
fun free() {
if (pointer != 0L) {
NativeBuffer.free(pointer)
pointer = 0L
}
}
}
@@ -46,89 +71,46 @@ class RemoteImagesImpl(context: Context) : RemoteImageApi {
init {
appContext = context.applicationContext
cacheDir = context.cacheDir
client = buildClient()
fetcher = buildFetcher()
}
companion object {
private const val MAX_REQUESTS_PER_HOST = 16
private const val KEEP_ALIVE_CONNECTIONS = 10
private const val KEEP_ALIVE_DURATION_MINUTES = 5L
private const val CACHE_SIZE_BYTES = 1024L * 1024 * 1024
private const val INITIAL_BUFFER_SIZE = 64 * 1024
val CANCELLED = Result.success<Map<String, Long>>(emptyMap())
private var appContext: Context? = null
private var cacheDir: File? = null
private var client: Call.Factory? = null
private var cronetEngine: CronetEngine? = null
private var fetcher: ImageFetcher? = null
init {
System.loadLibrary("native_buffer")
SSLConfig.addListener(::invalidateClient)
SSLConfig.addListener(::invalidateFetcher)
}
private fun invalidateClient() {
(client as? OkHttpClient)?.let {
it.dispatcher.cancelAll()
it.connectionPool.evictAll()
it.cache?.close()
private fun invalidateFetcher() {
val oldFetcher = fetcher
val needsOkHttp = SSLConfig.requiresCustomSSL
fetcher = when {
// OkHttp → OkHttp: reconfigure, sharing cache/dispatcher
oldFetcher is OkHttpImageFetcher && needsOkHttp -> {
oldFetcher.reconfigure(SSLConfig.sslSocketFactory, SSLConfig.trustManager)
}
// Any other transition: graceful drain, create new
else -> {
oldFetcher?.drain()
buildFetcher()
}
}
cronetEngine?.shutdown()
cronetEngine = null
client = buildClient()
}
private fun buildClient(): Call.Factory {
private fun buildFetcher(): ImageFetcher {
val ctx = appContext ?: throw IllegalStateException("Context not set")
val dir = cacheDir ?: throw IllegalStateException("Cache dir not set")
return if (SSLConfig.requiresCustomSSL) {
buildOkHttpClient(dir)
OkHttpImageFetcher.create(dir, SSLConfig.sslSocketFactory, SSLConfig.trustManager)
} else {
buildCronetClient(dir)
CronetImageFetcher(ctx, dir)
}
}
private fun buildCronetClient(cacheDir: File): Call.Factory {
val ctx = appContext ?: throw IllegalStateException("Context not set")
val storageDir = File(cacheDir, "cronet").apply { mkdirs() }
val engine = CronetEngine.Builder(ctx)
.enableHttp2(true)
.enableQuic(true)
.enableBrotli(true)
.setStoragePath(storageDir.absolutePath)
// .enableHttpCache(CronetEngine.Builder.HTTP_CACHE_DISK, CACHE_SIZE_BYTES)
.setUserAgent(UserAgentInterceptor.USER_AGENT)
.build()
.also { cronetEngine = it }
return CronetCallFactory.newBuilder(engine).build()
}
private fun buildOkHttpClient(cacheDir: File): OkHttpClient {
val dir = File(cacheDir, "okhttp")
val connectionPool = ConnectionPool(
maxIdleConnections = KEEP_ALIVE_CONNECTIONS,
keepAliveDuration = KEEP_ALIVE_DURATION_MINUTES,
timeUnit = TimeUnit.MINUTES
)
val builder = OkHttpClient.Builder()
.addInterceptor(UserAgentInterceptor())
.dispatcher(Dispatcher().apply { maxRequestsPerHost = MAX_REQUESTS_PER_HOST })
.connectionPool(connectionPool)
builder.cache(Cache((File(dir, "thumbnails")), CACHE_SIZE_BYTES))
val sslSocketFactory = SSLConfig.sslSocketFactory
val trustManager = SSLConfig.trustManager
if (sslSocketFactory != null && trustManager != null) {
builder.sslSocketFactory(sslSocketFactory, trustManager)
}
return builder.build()
}
}
override fun requestImage(
@@ -137,74 +119,300 @@ class RemoteImagesImpl(context: Context) : RemoteImageApi {
requestId: Long,
callback: (Result<Map<String, Long>>) -> Unit
) {
val client = client ?: return callback(Result.failure(RuntimeException("No client")))
val fetcher = fetcher ?: return callback(Result.failure(RuntimeException("No fetcher")))
val signal = CancellationSignal()
val requestBuilder = Request.Builder().url(url)
headers.forEach(requestBuilder::addHeader)
val call = client.newCall(requestBuilder.build())
signal.setOnCancelListener(call::cancel)
val request = RemoteRequest(callback, signal)
val request = RemoteRequest(signal)
requestMap[requestId] = request
fetcher.fetch(
url,
headers,
signal,
onSuccess = { buffer ->
requestMap.remove(requestId)
if (signal.isCanceled) {
NativeBuffer.free(buffer.pointer)
return@fetch callback(CANCELLED)
}
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
callback(Result.success(mapOf(
"pointer" to buffer.pointer,
"length" to buffer.offset.toLong()
)))
},
onFailure = { e ->
requestMap.remove(requestId)
val result = if (signal.isCanceled) CANCELLED else Result.failure(e)
callback(result)
}
)
}
override fun cancelRequest(requestId: Long) {
requestMap.remove(requestId)?.cancellationSignal?.cancel()
}
override fun releaseImage(requestId: Long) {}
}
private sealed interface ImageFetcher {
fun fetch(
url: String,
headers: Map<String, String>,
signal: CancellationSignal,
onSuccess: (NativeByteBuffer) -> Unit,
onFailure: (Exception) -> Unit,
)
fun drain()
}
private class CronetImageFetcher(
context: Context,
cacheDir: File,
) : ImageFetcher {
private val engine: CronetEngine
private val executor = Executors.newSingleThreadExecutor()
val active = AtomicInteger(0)
@Volatile
private var draining = false
init {
val storageDir = File(cacheDir, "cronet").apply { mkdirs() }
engine = CronetEngine.Builder(context)
.enableHttp2(true)
.enableQuic(true)
.enableBrotli(true)
.setStoragePath(storageDir.absolutePath)
.setUserAgent(USER_AGENT)
.enableHttpCache(CronetEngine.Builder.HTTP_CACHE_DISK, CACHE_SIZE_BYTES)
.build()
}
override fun fetch(
url: String,
headers: Map<String, String>,
signal: CancellationSignal,
onSuccess: (NativeByteBuffer) -> Unit,
onFailure: (Exception) -> Unit,
) {
active.incrementAndGet()
if (draining) {
active.decrementAndGet()
throw IllegalStateException("Engine is draining")
}
val callback = FetchCallback(onSuccess, onFailure, ::onComplete)
val requestBuilder = engine.newUrlRequestBuilder(url, callback, executor)
headers.forEach { (key, value) -> requestBuilder.addHeader(key, value) }
val request = requestBuilder.build()
signal.setOnCancelListener { request.cancel() }
request.start()
}
private fun onComplete() {
// Runs on executor thread
active.decrementAndGet()
tryShutdown()
}
private fun tryShutdown() {
// Also on executor thread—no race with onRequestDone
if (draining && active.get() == 0) {
engine.shutdown()
executor.shutdown()
}
}
override fun drain() {
draining = true
// Post check to executor—runs after any already-queued callbacks
executor.execute(this::tryShutdown)
}
private class FetchCallback(
private val onSuccess: (NativeByteBuffer) -> Unit,
private val onFailure: (Exception) -> Unit,
private val onComplete: () -> Unit,
) : UrlRequest.Callback() {
private var buffer: NativeByteBuffer? = null
private var httpError: IOException? = null
override fun onRedirectReceived(request: UrlRequest, info: UrlResponseInfo, newUrl: String) {
request.followRedirect()
}
override fun onResponseStarted(request: UrlRequest, info: UrlResponseInfo) {
if (info.httpStatusCode !in 200..299) {
httpError = IOException("HTTP ${info.httpStatusCode}: ${info.httpStatusText}")
return request.cancel()
}
// Pre-size from Content-Length when available, otherwise use reasonable default
val capacity = info.allHeaders["content-length"]?.firstOrNull()?.toIntOrNull()
?.takeIf { it > 0 } ?: INITIAL_BUFFER_SIZE
buffer = NativeByteBuffer(capacity)
request.read(buffer!!.wrapRemaining())
}
override fun onReadCompleted(request: UrlRequest, info: UrlResponseInfo, byteBuffer: ByteBuffer) {
buffer!!.apply {
advance(byteBuffer.remaining())
ensureHeadroom()
}
request.read(buffer!!.wrapRemaining())
}
override fun onSucceeded(request: UrlRequest, info: UrlResponseInfo) {
onSuccess(buffer!!)
onComplete()
}
override fun onFailed(request: UrlRequest, info: UrlResponseInfo?, error: CronetException) {
buffer?.free()
onFailure(error)
onComplete()
}
override fun onCanceled(request: UrlRequest, info: UrlResponseInfo?) {
buffer?.free()
onFailure(httpError ?: OperationCanceledException())
onComplete()
}
}
}
private class OkHttpImageFetcher private constructor(
private val client: OkHttpClient,
) : ImageFetcher {
private val active = AtomicInteger(0)
@Volatile private var draining = false
companion object {
fun create(
cacheDir: File,
sslSocketFactory: SSLSocketFactory?,
trustManager: X509TrustManager?,
): OkHttpImageFetcher {
val dir = File(cacheDir, "okhttp")
val connectionPool = ConnectionPool(
maxIdleConnections = KEEP_ALIVE_CONNECTIONS,
keepAliveDuration = KEEP_ALIVE_DURATION_MINUTES,
timeUnit = TimeUnit.MINUTES
)
val builder = OkHttpClient.Builder()
.addInterceptor { chain ->
chain.proceed(
chain.request().newBuilder()
.header("User-Agent", USER_AGENT)
.build()
)
}
.dispatcher(Dispatcher().apply { maxRequestsPerHost = MAX_REQUESTS_PER_HOST })
.connectionPool(connectionPool)
.cache(Cache(File(dir, "thumbnails"), CACHE_SIZE_BYTES))
if (sslSocketFactory != null && trustManager != null) {
builder.sslSocketFactory(sslSocketFactory, trustManager)
}
return OkHttpImageFetcher(builder.build())
}
}
fun reconfigure(
sslSocketFactory: SSLSocketFactory?,
trustManager: X509TrustManager?,
): OkHttpImageFetcher {
val builder = client.newBuilder()
if (sslSocketFactory != null && trustManager != null) {
builder.sslSocketFactory(sslSocketFactory, trustManager)
}
// Evict idle connections using old SSL config
client.connectionPool.evictAll()
return OkHttpImageFetcher(builder.build())
}
private fun onComplete() {
if (active.decrementAndGet() == 0 && draining) {
client.cache?.close()
}
}
override fun fetch(
url: String,
headers: Map<String, String>,
signal: CancellationSignal,
onSuccess: (NativeByteBuffer) -> Unit,
onFailure: (Exception) -> Unit,
) {
active.incrementAndGet()
if (draining) {
active.decrementAndGet()
throw IllegalStateException("Client is draining")
}
val requestBuilder = Request.Builder().url(url)
headers.forEach { (key, value) -> requestBuilder.addHeader(key, value) }
val call = client.newCall(requestBuilder.build())
signal.setOnCancelListener { call.cancel() }
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
onFailure(e)
onComplete()
}
override fun onResponse(call: Call, response: Response) {
var pointer = 0L
var capacity: Int
try {
signal.throwIfCanceled()
val body = response.takeIf { it.isSuccessful }?.body
?: return callback(Result.failure(IOException(response.toString())))
response.use {
if (call.isCanceled()) {
return onFailure(OperationCanceledException())
}
val contentLength = body.contentLength()
capacity = if (contentLength > 0) contentLength.toInt() else INITIAL_BUFFER_SIZE
pointer = LocalImagesImpl.allocateNative(capacity)
request.pointer = pointer
if (!response.isSuccessful) {
return onFailure(IOException("HTTP ${response.code}: ${response.message}"))
}
var position = 0
body.source().use { source ->
while (!source.exhausted()) {
signal.throwIfCanceled()
if (position >= capacity) {
capacity = maxOf(capacity * 2, position + 8192)
pointer = LocalImagesImpl.reallocNative(pointer, capacity)
request.pointer = pointer
val body = response.body ?: return onFailure(IOException("Empty response body"))
val contentLength = body.contentLength()
val capacity = if (contentLength > 0 && contentLength <= Int.MAX_VALUE) {
contentLength.toInt()
} else {
INITIAL_BUFFER_SIZE
}
val buffer = NativeByteBuffer(capacity)
try {
body.source().use { source ->
while (true) {
signal.throwIfCanceled()
buffer.ensureHeadroom()
val bytesRead = source.read(buffer.wrapRemaining())
if (bytesRead == -1) break
buffer.advance(bytesRead)
}
}
val buffer = LocalImagesImpl.wrapAsBuffer(pointer + position, capacity - position)
val read = source.read(buffer)
if (read == -1) break
position += read
onSuccess(buffer)
} catch (e: Exception) {
buffer.free()
onFailure(e)
}
}
signal.throwIfCanceled()
request.pointer = 0L // Transfer ownership to Dart before callback
callback(Result.success(mapOf(
"pointer" to pointer,
"length" to position.toLong()
)))
} catch (e: Exception) {
if (pointer != 0L) LocalImagesImpl.freeNative(pointer)
val result = if (signal.isCanceled) CANCELLED else Result.failure(e)
callback(result)
} finally {
requestMap.remove(requestId)
response.close()
onComplete()
}
}
})
}
override fun cancelRequest(requestId: Long) {
// Just cancel the signal - memory cleanup happens in onResponse/onFailure
requestMap[requestId]?.cancellationSignal?.cancel()
override fun drain() {
draining = true
client.connectionPool.evictAll()
if (active.get() == 0) {
client.cache?.close()
}
}
override fun releaseImage(requestId: Long) {}
}

View File

@@ -7,6 +7,8 @@ package app.alextran.immich.images;
import java.nio.ByteBuffer;
import app.alextran.immich.NativeBuffer;
// modified to use native allocations
public final class ThumbHash {
/**
@@ -56,8 +58,8 @@ public final class ThumbHash {
int w = Math.round(ratio > 1.0f ? 32.0f : 32.0f * ratio);
int h = Math.round(ratio > 1.0f ? 32.0f / ratio : 32.0f);
int size = w * h * 4;
long pointer = LocalImagesImpl.allocateNative(size);
ByteBuffer rgba = LocalImagesImpl.wrapAsBuffer(pointer, size);
long pointer = NativeBuffer.allocate(size);
ByteBuffer rgba = NativeBuffer.wrap(pointer, size);
int cx_stop = Math.max(lx, hasAlpha ? 5 : 3);
int cy_stop = Math.max(ly, hasAlpha ? 5 : 3);
float[] fx = new float[cx_stop];