Using play framewok and akka to map diagnostic context records in Java

I'm trying MDC logging all requests in Java. I follow Scala in this tutorial and try to convert to Java http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework/

However, MDC is still not propagated to all execution contexts I use this dispatcher as the default scheduler, but it has many execution contexts I need MDC to propagate to all execution contexts

Here is my java code

import java.util.Map;

import org.slf4j.MDC;

import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.dispatch.Dispatcher;
import akka.dispatch.ExecutorServiceFactoryProvider;
import akka.dispatch.MessageDispatcherConfigurator;

public class MDCPropagatingDispatcher extends Dispatcher {
    public MDCPropagatingDispatcher(
            MessageDispatcherConfigurator _configurator,String id,int throughput,Duration throughputDeadlineTime,ExecutorServiceFactoryProvider executorServiceFactoryProvider,FiniteDuration shutdownTimeout) {
        super(_configurator,id,throughput,throughputDeadlineTime,executorServiceFactoryProvider,shutdownTimeout);

    }

    @Override
    public ExecutionContext prepare() {
        final Map<String,String> mdcContext = MDC.getCopyOfContextMap();
        return new ExecutionContext() {

            @Override
            public void execute(Runnable r) {
                Map<String,String> oldMDCContext = MDC.getCopyOfContextMap();
                setContextMap(mdcContext);
                try {
                    r.run();
                } finally {
                    setContextMap(oldMDCContext);
                }
            }

            @Override
            public ExecutionContext prepare() {
                return this;
            }

            @Override
            public void reportFailure(Throwable t) {
                play.Logger.info("error occured in dispatcher");
            }

        };
    }

    private void setContextMap(Map<String,String> context) {
        if (context == null) {
            MDC.clear();
        } else {
            play.Logger.info("set context "+ context.toString());
            MDC.setContextMap(context);
        }
    }
}



import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import com.typesafe.config.Config;

import akka.dispatch.DispatcherPrerequisites;
import akka.dispatch.MessageDispatcher;
import akka.dispatch.MessageDispatcherConfigurator;

public class MDCPropagatingDispatcherConfigurator extends
        MessageDispatcherConfigurator {
    private MessageDispatcher instance;

    public MDCPropagatingDispatcherConfigurator(Config config,DispatcherPrerequisites prerequisites) {
        super(config,prerequisites);
        Duration throughputDeadlineTime = new FiniteDuration(-1,TimeUnit.MILLISECONDS);
        FiniteDuration shutDownDuration = new FiniteDuration(1,TimeUnit.MILLISECONDS);
        instance = new MDCPropagatingDispatcher(this,"play.akka.actor.contexts.play-filter-context",100,configureExecutor(),shutDownDuration);
    }

    public MessageDispatcher dispatcher() {
        return instance;
    }

}

Filter interceptor

public class MdcLogFilter implements EssentialFilter {
@Override
public EssentialAction apply(final EssentialAction next) {
    return new MdcLogAction() {
        @Override
        public Iteratee<byte[],SimpleResult> apply(
                final RequestHeader requestHeader) {
            final String  uuid = Utils.generateRandomUUID();
            MDC.put("uuid",uuid);
            play.Logger.info("request started"+uuid);
            final ExecutionContext playFilterContext = Akka.system()
                    .dispatchers()
                    .lookup("play.akka.actor.contexts.play-custom-filter-context");
            return next.apply(requestHeader).map(
                    new AbstractFunction1<SimpleResult,SimpleResult>() {
                        @Override
                        public SimpleResult apply(SimpleResult simpleResult) {
                            play.Logger.info("request ended"+uuid);
                            MDC.remove("uuid");
                            return simpleResult;
                        }
                    },playFilterContext);

        }

        @Override
        public EssentialAction apply() {
            return next.apply();
        }
    };
}

}

Solution

The following is my solution, which has been proved in real life It is in Scala, not play, but scalatra, but its basic concept is the same I hope you can find out how to port it to Java

import org.slf4j.MDC
import java.util.{Map => JMap}
import scala.concurrent.{ExecutionContextExecutor,ExecutionContext}

object MDCHttpExecutionContext {

  def fromExecutionContextWithCurrentMDC(delegate: ExecutionContext): ExecutionContextExecutor =
    new MDCHttpExecutionContext(MDC.getCopyOfContextMap(),delegate)
}

class MDCHttpExecutionContext(mdcContext: JMap[String,String],delegate: ExecutionContext)
  extends ExecutionContextExecutor {

  def execute(runnable: Runnable): Unit = {
    val callingThreadMDC = MDC.getCopyOfContextMap()
    delegate.execute(new Runnable {
      def run() {
        val currentThreadMDC = MDC.getCopyOfContextMap()
        setContextMap(callingThreadMDC)
        try {
          runnable.run()
        } finally {
          setContextMap(currentThreadMDC)
        }
      }
    })
  }

  private[this] def setContextMap(context: JMap[String,String]): Unit = {
    Option(context) match {
      case Some(ctx) => {
        MDC.setContextMap(context)
      }
      case None => {
        MDC.clear()
      }
    }
  }

  def reportFailure(t: Throwable): Unit = delegate.reportFailure(t)
}

You must ensure that this ExecutionContext is used in all asynchronous calls I do this through dependency injection, but in different ways This is what I do with subcut:

bind[ExecutionContext] idBy BindingIds.GlobalExecutionContext toSingle {
    MDCHttpExecutionContext.fromExecutionContextWithCurrentMDC(
      ExecutionContext.fromExecutorService(
        Executors.newFixedThreadPool(globalThreadPoolSize)
      )
    )
  }

The idea behind this approach is as follows MDC uses thread local storage to obtain attributes and their values If your single request can run on multiple threads, you need to ensure that the new thread you start uses the correct MDC To do this, you create a custom executor to ensure that the MDC value is correctly copied to the new thread before starting the task you assign to it You must also ensure that when a thread completes its task and continues to perform other operations, the old value is placed in its MDC, because threads from the pool can switch between different requests

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>