AsyncEpisimWriter.java

package org.matsim.episim.reporting;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import com.lmax.disruptor.util.Util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.matsim.api.core.v01.events.Event;

import java.io.Closeable;
import java.io.IOException;
import java.io.Writer;

/**
 * Overwrites the default episim writer to do all IO in an extra thread using the {@link Disruptor} library.
 */
public final class AsyncEpisimWriter extends EpisimWriter implements EventHandler<AsyncEpisimWriter.LogEvent>,
		EventTranslatorThreeArg<AsyncEpisimWriter.LogEvent, Writer, Event, Double>, Closeable {

	private static final Logger log = LogManager.getLogger(AsyncEpisimWriter.class);
	private final Disruptor<LogEvent> disruptor;
	private final StringEventTranslator translator = new StringEventTranslator();
	private final StringArrayEventTranslator arrayTranslator = new StringArrayEventTranslator();

	/**
	 * Constructor.
	 *
	 * @param numProducer Expected number of producer. Does not need to be exact, but has to be larger 1 if there are multiple.
	 */
	public AsyncEpisimWriter(int numProducer) {

		// Specify the size of the ring buffer, must be power of 2.
		int bufferSize = Math.max(16384, Util.ceilingNextPowerOfTwo(4096 * numProducer));

		disruptor = new Disruptor<>(LogEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new SleepingWaitStrategy());

		// Connect the handler
		disruptor.handleEventsWith(this);

		log.info("Using async writer with producer={}, bufferSize={}", numProducer, bufferSize);

		disruptor.start();
	}

	@Override
	public void append(Writer writer, String[] array) {
		disruptor.publishEvent(arrayTranslator, writer, array);
	}

	@Override
	public void append(Writer writer, String content) {
		disruptor.publishEvent(translator, writer, content, false);
	}

	@Override
	public void append(Writer writer, Event event) {
		disruptor.publishEvent(this, writer, event, -1d);
	}

	@Override
	public void append(Writer writer, Event event, double correctedTime) {
		disruptor.publishEvent(this, writer, event, correctedTime);
	}

	@Override
	public void close(Writer writer) {
		disruptor.publishEvent(translator, writer, null, true);
	}

	@Override
	public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {

		if (event.close) {
			event.writer.close();
		} else {
			event.writer.append(event.content);
			// Flushing is not enabled
			// Events are async anyway, flushing does not make much sense
			//if (event.flush)
			//	event.writer.flush();
		}

		event.reset();
	}

	@Override
	public void translateTo(LogEvent event, long sequence, Writer arg0, Event arg1, Double arg2) {
		event.writer = arg0;
		event.flush = false;
		try {
			EpisimWriter.writeEvent(event.content, arg1, arg2);
		} catch (IOException e) {
			log.error("Could not append event");
		}
	}

	@Override
	public void close() throws IOException {
		log.info("Shutting down...");
		disruptor.shutdown();
	}

	protected static class LogEvent {

		private static final int BUFFER_SIZE = 120;

		private final StringBuilder content = new StringBuilder(BUFFER_SIZE);
		private Writer writer;
		private boolean close = false;
		private boolean flush = true;

		private void reset() {
			close = false;
			flush = true;
			if (content.capacity() > BUFFER_SIZE) {
				content.setLength(BUFFER_SIZE);
				content.trimToSize();
			}

			content.setLength(0);
		}
	}

	/**
	 * Copy the string to buffer and also set close attribute.
	 */
	public static final class StringEventTranslator implements EventTranslatorThreeArg<LogEvent, Writer, String, Boolean> {

		@Override
		public void translateTo(LogEvent event, long sequence, Writer arg0, String arg1, Boolean arg2) {

			event.writer = arg0;
			if (arg2) {
				event.close = true;
			} else {
				event.content.append(arg1);
				event.flush = false;
			}
		}
	}

	/**
	 * Convert MATSim event to log event.
	 */
	public static final class StringArrayEventTranslator implements EventTranslatorTwoArg<LogEvent, Writer, String[]> {

		/**
		 * Write one line with content separated by separator.
		 */
		@Override
		public void translateTo(LogEvent event, long sequence, Writer arg0, String[] arg1) {
			event.writer = arg0;

			for (int i = 0; i < arg1.length; i++) {
				event.content.append(arg1[i]);
				if (i < arg1.length - 1) event.content.append(SEPARATOR);
			}

			event.content.append("\n");
		}

	}

}