Mapped Diagnostic Context (MDC) logging with Play Framework and Akka in Java
I'm trying to set up MDC logging for all requests in Java. I followed this Scala tutorial and tried to convert it to Java: http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework/
However, the MDC is still not propagated to all execution contexts. I use this dispatcher as the default dispatcher, but there are many execution contexts, and I need the MDC to be propagated to all of them.
Here is my Java code:
    import java.util.Map;

    import org.slf4j.MDC;

    import scala.concurrent.ExecutionContext;
    import scala.concurrent.duration.Duration;
    import scala.concurrent.duration.FiniteDuration;

    import akka.dispatch.Dispatcher;
    import akka.dispatch.ExecutorServiceFactoryProvider;
    import akka.dispatch.MessageDispatcherConfigurator;

    public class MDCPropagatingDispatcher extends Dispatcher {

        public MDCPropagatingDispatcher(
                MessageDispatcherConfigurator _configurator,
                String id,
                int throughput,
                Duration throughputDeadlineTime,
                ExecutorServiceFactoryProvider executorServiceFactoryProvider,
                FiniteDuration shutdownTimeout) {
            super(_configurator, id, throughput, throughputDeadlineTime,
                    executorServiceFactoryProvider, shutdownTimeout);
        }

        @Override
        public ExecutionContext prepare() {
            final Map<String, String> mdcContext = MDC.getCopyOfContextMap();
            return new ExecutionContext() {

                @Override
                public void execute(Runnable r) {
                    Map<String, String> oldMDCContext = MDC.getCopyOfContextMap();
                    setContextMap(mdcContext);
                    try {
                        r.run();
                    } finally {
                        setContextMap(oldMDCContext);
                    }
                }

                @Override
                public ExecutionContext prepare() {
                    return this;
                }

                @Override
                public void reportFailure(Throwable t) {
                    play.Logger.info("error occurred in dispatcher");
                }
            };
        }

        private void setContextMap(Map<String, String> context) {
            if (context == null) {
                MDC.clear();
            } else {
                play.Logger.info("set context " + context.toString());
                MDC.setContextMap(context);
            }
        }
    }

    import java.util.concurrent.TimeUnit;

    import scala.concurrent.duration.Duration;
    import scala.concurrent.duration.FiniteDuration;

    import com.typesafe.config.Config;

    import akka.dispatch.DispatcherPrerequisites;
    import akka.dispatch.MessageDispatcher;
    import akka.dispatch.MessageDispatcherConfigurator;

    public class MDCPropagatingDispatcherConfigurator extends MessageDispatcherConfigurator {

        private MessageDispatcher instance;

        public MDCPropagatingDispatcherConfigurator(Config config, DispatcherPrerequisites prerequisites) {
            super(config, prerequisites);
            Duration throughputDeadlineTime = new FiniteDuration(-1, TimeUnit.MILLISECONDS);
            FiniteDuration shutDownDuration = new FiniteDuration(1, TimeUnit.MILLISECONDS);
            // throughputDeadlineTime is passed as the fourth argument, matching the constructor above
            instance = new MDCPropagatingDispatcher(this, "play.akka.actor.contexts.play-filter-context",
                    100, throughputDeadlineTime, configureExecutor(), shutDownDuration);
        }

        public MessageDispatcher dispatcher() {
            return instance;
        }
    }
The intercepting filter:
    public class MdcLogFilter implements EssentialFilter {

        @Override
        public EssentialAction apply(final EssentialAction next) {
            return new MdcLogAction() {

                @Override
                public Iteratee<byte[], SimpleResult> apply(final RequestHeader requestHeader) {
                    final String uuid = Utils.generateRandomUUID();
                    MDC.put("uuid", uuid);
                    play.Logger.info("request started" + uuid);
                    final ExecutionContext playFilterContext = Akka.system()
                            .dispatchers()
                            .lookup("play.akka.actor.contexts.play-custom-filter-context");
                    return next.apply(requestHeader).map(
                            new AbstractFunction1<SimpleResult, SimpleResult>() {
                                @Override
                                public SimpleResult apply(SimpleResult simpleResult) {
                                    play.Logger.info("request ended" + uuid);
                                    MDC.remove("uuid");
                                    return simpleResult;
                                }
                            }, playFilterContext);
                }

                @Override
                public EssentialAction apply() {
                    return next.apply();
                }
            };
        }
    }
Solution
Here is my solution, which has proven itself in production. It is written in Scala for Scalatra rather than Play, but the underlying concept is the same. I hope you can work out how to port it to Java.
    import org.slf4j.MDC
    import java.util.{Map => JMap}
    import scala.concurrent.{ExecutionContextExecutor, ExecutionContext}

    object MDCHttpExecutionContext {

      def fromExecutionContextWithCurrentMDC(delegate: ExecutionContext): ExecutionContextExecutor =
        new MDCHttpExecutionContext(MDC.getCopyOfContextMap(), delegate)
    }

    class MDCHttpExecutionContext(mdcContext: JMap[String, String], delegate: ExecutionContext)
        extends ExecutionContextExecutor {

      def execute(runnable: Runnable): Unit = {
        val callingThreadMDC = MDC.getCopyOfContextMap()
        delegate.execute(new Runnable {
          def run(): Unit = {
            val currentThreadMDC = MDC.getCopyOfContextMap()
            setContextMap(callingThreadMDC)
            try {
              runnable.run()
            } finally {
              setContextMap(currentThreadMDC)
            }
          }
        })
      }

      private[this] def setContextMap(context: JMap[String, String]): Unit = {
        Option(context) match {
          case Some(ctx) => MDC.setContextMap(ctx)
          case None      => MDC.clear()
        }
      }

      def reportFailure(t: Throwable): Unit = delegate.reportFailure(t)
    }
You must make sure that this ExecutionContext is used in all asynchronous calls. I do this through dependency injection, but there are other ways. This is how I do it with subcut:
    bind[ExecutionContext] idBy BindingIds.GlobalExecutionContext toSingle {
      MDCHttpExecutionContext.fromExecutionContextWithCurrentMDC(
        ExecutionContext.fromExecutorService(
          Executors.newFixedThreadPool(globalThreadPoolSize)
        )
      )
    }
The idea behind this approach is as follows. MDC uses thread-local storage to hold its attributes and their values. If a single request can run on several threads, you need to make sure that every thread that picks up work for that request sees the correct MDC. To do this, you create a custom executor that copies the MDC values of the submitting thread into the worker thread before the task you hand to it starts. You must also make sure that when the worker thread finishes the task and moves on to other work, its previous MDC values are restored, because threads from the pool are reused across different requests.
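Since the question asks for Java, the same idea could be sketched roughly as below. This is a minimal sketch under stated assumptions, not the code from my production setup: the class name ContextPropagatingExecutor is hypothetical, and a plain ThreadLocal stands in for org.slf4j.MDC so that the example compiles without any dependencies. In real code, the marked spots would call MDC.getCopyOfContextMap(), MDC.setContextMap(...) and MDC.clear() instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical sketch: wraps an Executor so tasks see the submitter's context map. */
final class ContextPropagatingExecutor implements Executor {

    // Assumption: stand-in for org.slf4j.MDC, which is also backed by thread-local storage.
    static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

    private final Executor delegate;

    ContextPropagatingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // Runs on the submitting thread (real code: MDC.getCopyOfContextMap()).
        final Map<String, String> callerContext = copyOf(CONTEXT.get());
        delegate.execute(() -> {
            // Runs on the worker thread: remember whatever context it currently holds.
            Map<String, String> previous = CONTEXT.get();
            setContext(callerContext);
            try {
                task.run();
            } finally {
                // Restore, because pooled threads are reused by other requests.
                setContext(previous);
            }
        });
    }

    private static Map<String, String> copyOf(Map<String, String> source) {
        return source == null ? null : new HashMap<>(source);
    }

    // Real code: MDC.setContextMap(context), or MDC.clear() when context is null.
    private static void setContext(Map<String, String> context) {
        if (context == null) {
            CONTEXT.remove();
        } else {
            CONTEXT.set(context);
        }
    }
}
```

To use something like this with a real pool, wrap the pool once, for example new ContextPropagatingExecutor(Executors.newFixedThreadPool(4)), and hand only the wrapper to the code that schedules asynchronous work. Any task submitted directly to the underlying pool bypasses the copy and restore steps.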