DefaultIdempotentLeaseManager.kt

package io.github.lishangbu.avalon.idempotent.lease

import io.github.lishangbu.avalon.idempotent.properties.IdempotentProperties
import io.github.lishangbu.avalon.idempotent.store.IdempotentStore
import java.time.Duration
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

/**
 * Default background lease renewer backed by a scheduled executor.
 */
class DefaultIdempotentLeaseManager(
    private val properties: IdempotentProperties,
    private val idempotentStore: IdempotentStore,
    private val scheduledExecutorService: ScheduledExecutorService,
) : IdempotentLeaseManager {
    override fun start(
        key: String,
        token: String,
    ): IdempotentLeaseManager.LeaseHandle {
        val interval = resolveRenewInterval(properties.processingTtl, properties.renewInterval)
        val future =
            scheduledExecutorService.scheduleAtFixedRate(
                {
                    val renewed =
                        runCatching {
                            idempotentStore.renew(
                                key = key,
                                token = token,
                                processingTtl = properties.processingTtl,
                            )
                        }.getOrDefault(false)
                    if (!renewed) {
                        throw StopRenewalSignal
                    }
                },
                interval.toMillis(),
                interval.toMillis(),
                TimeUnit.MILLISECONDS,
            )
        return IdempotentLeaseManager.LeaseHandle {
            future.cancel(false)
        }
    }

    private fun resolveRenewInterval(
        processingTtl: Duration,
        configuredRenewInterval: Duration?,
    ): Duration {
        val interval = configuredRenewInterval ?: processingTtl.dividedBy(3)
        return if (interval < MIN_RENEW_INTERVAL) MIN_RENEW_INTERVAL else interval
    }

    private companion object {
        val MIN_RENEW_INTERVAL: Duration = Duration.ofMillis(100)

        val StopRenewalSignal: RuntimeException =
            object : RuntimeException() {
                override fun fillInStackTrace(): Throwable = this
            }
    }
}