We have a long running app, the kerberos renews expires every 7 days
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
I have this class which is creating a UGICache at the app start time, and we are relogin the UGI every 8hrs by a background thread
forceReloginFromKeytab is there any issue using the API ?
case class UGIEntry(ugi: UserGroupInformation, principal: String)
/** Manages the caching and refreshing of UserGroupInformation (UGI) for HDFS access. */
object UGICache extends Logging {
private val cache: TrieMap[String, UGIEntry] = TrieMap.empty
/**
* Normalize a given path by removing trailing slashes.
*
* @param path The HDFS path.
* @return Normalized path.
*/
private def normalizePath(path: String): String = path.replaceAll("/+$", "")
/**
* Pre-warm the UGI cache with a predefined list of principals and keytab paths, when the application starts.
*
* @param preWarmData A map of base HDFS path -> (keytabPath, principal).
*/
def preWarmUGICache(preWarmData: Map[String, (String, String)]): Unit = {
logger.info("Starting UGI cache pre-warming...")
preWarmData.foreach { case (hdfsBasePath, (keytabPath, principal)) =>
try {
val normalizedPath = normalizePath(hdfsBasePath)
val ugi = KerberosHadoopAuthenticator.getAuthenticatedUGI(None, KerberosHadoopAuthenticator.hadoopConfig)
val ugiEntry = UGIEntry(ugi, principal)
cache.put(normalizedPath, ugiEntry)
logger.info(s"Pre-warmed UGI cache for HDFS Path: $normalizedPath, Principal: $principal")
} catch {
case ex: Exception =>
logger.error(s"Failed to pre-warm UGI cache for HDFS Path: $hdfsBasePath, Principal: $principal. Error: ${ex.getMessage}")
}
}
logger.info("UGI cache pre-warming completed.")
}
/**
* Add a UGI to the cache dynamically.
*
* @param hdfsPath HDFS base path.
* @param ugiEntry The UGI entry to cache.
*/
def addUGI(hdfsPath: String, ugiEntry: UGIEntry): Unit = {
val normalizedPath = normalizePath(hdfsPath)
cache.put(normalizedPath, ugiEntry)
logger.info(s"Added UGI for HDFS Path: $normalizedPath, Principal: ${ugiEntry.principal}")
}
/**
* Retrieve a UGI from the cache.
*
* @param hdfsPath The HDFS path to look up.
* @return The UGIEntry if found.
*/
def getAuthenticatedUGI(hdfsPath: String): UGIEntry = {
val normalizedPath = normalizePath(hdfsPath)
cache.find { case (key, _) => normalizedPath.startsWith(key) } match {
case Some((key, entry)) =>
if (logger.isDebugEnabled) {
logger.debug(s"Retrieved UGI for base path: $key (requested path: $normalizedPath)")
}
entry
case None =>
throw new IllegalArgumentException(s"No UGI found for HDFS Path: $hdfsPath")
}
}
/**
* Refresh (re-login) all UGIs in the cache.
*/
def refreshAllUGIs(): Unit = {
cache.foreach { case (hdfsPath, entry) =>
try {
entry.ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
UserGroupInformation.getCurrentUser.forceReloginFromKeytab()
logger.info(s"Kerberos ticket force renewed for HDFS Path: $hdfsPath, Principal: ${entry.principal}")
}
})
} catch {
case ex: Exception =>
logger.error(s"Failed to renew Kerberos ticket for HDFS Path: $hdfsPath, Principal: ${entry.principal}. Error: ${ex.getMessage}")
}
}
}
}
I have to wait for 7 days to see if the logic is actually working