EpisimRunner.java
/*-
* #%L
* MATSim Episim
* %%
* Copyright (C) 2020 matsim-org
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
package org.matsim.episim;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.commons.compress.archivers.*;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.matsim.api.core.v01.events.Event;
import org.matsim.core.api.experimental.events.EventsManager;
import org.matsim.core.config.Config;
import org.matsim.core.config.ConfigUtils;
import org.matsim.core.controler.ControlerUtils;
import org.matsim.core.gbl.Gbl;
import org.matsim.episim.model.AntibodyModel;
import org.matsim.episim.model.ProgressionModel;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.DayOfWeek;
import java.util.List;
import java.util.Map;
/**
* Main entry point and runner of one epidemic simulation.
* <p>
* Using the {@link #run(int)} method, this class will repeatedly loop over {@link InfectionEventHandler} with
* events provided by the {@link ReplayHandler}.
*/
public final class EpisimRunner {
private static final Logger log = LogManager.getLogger(EpisimRunner.class);
private final Config config;
private final EventsManager manager;
private final Provider<InfectionEventHandler> handlerProvider;
private final Provider<ReplayHandler> replayProvider;
private final Provider<EpisimReporting> reportingProvider;
private final Provider<ProgressionModel> progressionProvider;
private final Provider<AntibodyModel> antibodyModelProvider;
@Inject
public EpisimRunner(Config config, EventsManager manager, Provider<InfectionEventHandler> handlerProvider, Provider<ReplayHandler> replay,
Provider<EpisimReporting> reportingProvider, Provider<ProgressionModel> progressionProvider, Provider<AntibodyModel> antibodyModelProvider) {
this.config = config;
this.handlerProvider = handlerProvider;
this.manager = manager;
this.replayProvider = replay;
this.reportingProvider = reportingProvider;
this.progressionProvider = progressionProvider;
this.antibodyModelProvider = antibodyModelProvider;
}
/**
* Main loop that performs the iterations of the simulation.
*
* @param maxIterations maximum number of iterations (inclusive)
*/
public void run(int maxIterations) {
// Construct these dependencies as late as possible, so all other configs etc have been fully configured
final ReplayHandler replay = replayProvider.get();
final InfectionEventHandler handler = handlerProvider.get();
final EpisimReporting reporting = reportingProvider.get();
final AntibodyModel antibodyModel = antibodyModelProvider.get();
reporting.reportCpuTime(0, "Init", "start", -1);
// reporting will write events if necessary
EpisimConfigGroup episimConfig = ConfigUtils.addOrGetModule(config, EpisimConfigGroup.class);
if (episimConfig.getWriteEvents() != EpisimConfigGroup.WriteEvents.none)
manager.addHandler(reporting);
ControlerUtils.checkConfigConsistencyAndWriteToLog(config, "Just before starting iterations");
handler.init(replay.getEvents());
Path output = Path.of(config.controler().getOutputDirectory());
int iteration = 1;
if (episimConfig.getStartFromSnapshot() != null && episimConfig.getStartFromImmunization() != null) {
throw new RuntimeException("Cannot start from snapshot and immunization history simultaneously. Choose one.");
} else if (episimConfig.getStartFromSnapshot() != null) {
reporting.close();
iteration = readSnapshot(output, Path.of(episimConfig.getStartFromSnapshot()));
try {
reporting.append(episimConfig.getStartDate().plusDays(iteration - 1).toString());
} catch (IOException e) {
log.error("Snapshot output could not be created", e);
return;
}
handler.onSnapshotLoaded(iteration);
// recalculate antibodies for every agent if starting from snapshot.
// The antibodies profile is generated using the immunity event history in the
// snapshot; the antibody model config of the snapshot simulation will
// be superceded by the config of the current simulation. Thus, the antibody development
// during the snapshot can be rewritten without modifying the immunity event history.
antibodyModel.recalculateAntibodiesAfterSnapshot(handler.getPersons(), iteration);
} else if (episimConfig.getStartFromImmunization() != null) {
antibodyModel.init(handler.getPersons(), iteration);
handler.initImmunization(Path.of(episimConfig.getStartFromImmunization()));
} else {
antibodyModel.init(handler.getPersons(), iteration);
}
reporting.reportCpuTime(0, "Init", "finished", -1);
log.info("Starting from iteration {}...", iteration);
for (; iteration <= maxIterations; iteration++) {
if (episimConfig.getSnapshotInterval() > 0 && iteration % episimConfig.getSnapshotInterval() == 0) {
writeSnapshot(output, iteration);
}
if (iteration % 10 == 0)
Gbl.printMemoryUsage();
if (!doStep(replay, handler, reporting, iteration))
break;
}
handler.finish();
reporting.close();
}
/**
* Update events data and internal person data structure.
*
* @param events
*/
public void updateEvents(Map<DayOfWeek, List<Event>> events) {
ReplayHandler replay = replayProvider.get();
replay.setEvents(events);
InfectionEventHandler handler = handlerProvider.get();
handler.updateEvents(events);
}
/**
* Reads and updates events as defined in given config.
*/
public void updateEvents(EpisimConfigGroup config) {
ReplayHandler replay = replayProvider.get();
Map<DayOfWeek, List<Event>> events = replay.readEvents(config);
updateEvents(events);
}
/**
* Perform one iteration of simulation.
*
* @return false, when the simulation should end
*/
boolean doStep(final ReplayHandler replay, final InfectionEventHandler handler, final EpisimReporting reporting, int iteration) {
manager.resetHandlers(iteration);
handler.reset(iteration);
EpisimConfigGroup episimConfig = ConfigUtils.addOrGetModule(this.config, EpisimConfigGroup.class);
if (episimConfig.isEndEarly() && handler.isFinished())
return false;
DayOfWeek day = EpisimUtils.getDayOfWeek(episimConfig, iteration);
// Process all events
replay.replayEvents(handler, day);
reporting.flushEvents();
return true;
}
/**
* Write snapshot into output directory.
*
* @param output target output directory
* @param iteration current iteration
*/
private void writeSnapshot(Path output, int iteration) {
InfectionEventHandler handler = handlerProvider.get();
EpisimReporting reporting = reportingProvider.get();
ProgressionModel progressionModel = progressionProvider.get();
EpisimConfigGroup episimConfig = ConfigUtils.addOrGetModule(config, EpisimConfigGroup.class);
String date = episimConfig.getStartDate().plusDays(iteration - 1).toString();
Path path = output.resolve(episimConfig.getSnapshotPrefix() + String.format("-%03d-%s.zip", iteration, date));
log.info("Writing snapshot to {}", path);
try (var out = Files.newOutputStream(path)) {
ArchiveOutputStream archive = new ArchiveStreamFactory()
.createArchiveOutputStream("zip", out);
// Copy whole output to the snapshot
EpisimUtils.compressDirectory(output.toString(), output.toString(), config.controler().getRunId(), archive);
archive.putArchiveEntry(new ZipArchiveEntry("iteration"));
ObjectOutputStream oos = new ObjectOutputStream(archive);
oos.writeInt(iteration);
oos.flush();
archive.closeArchiveEntry();
writeObject(handler, "state", archive);
writeObject(reporting, "reporting", archive);
if (progressionModel instanceof Externalizable)
writeObject((Externalizable) progressionModel, "progression", archive);
archive.finish();
archive.close();
} catch (IOException | ArchiveException e) {
log.error("Could not write snapshot", e);
}
log.info("Snapshot for day {} written successfully", iteration);
}
/**
* Read snapshot from disk and initialize simulation state
*
* @param path path to snapshot archive
* @return starting iteration
*/
private int readSnapshot(Path output, Path path) {
if (!Files.exists(path))
throw new IllegalArgumentException("Snapshot " + path + " does not exist.");
InfectionEventHandler handler = handlerProvider.get();
EpisimReporting reporting = reportingProvider.get();
ProgressionModel progressionModel = progressionProvider.get();
int iteration = -1;
try (var in = Files.newInputStream(path)) {
ArchiveInputStream archive = new ArchiveStreamFactory()
.createArchiveInputStream("zip", in);
log.info("Copying output from {} into {}", path, output);
ArchiveEntry entry;
while ((entry = archive.getNextEntry()) != null) {
String name = entry.getName();
// copy to output
if (name.startsWith("output"))
Files.copy(archive, output.resolve(name.replace("output/", "")), StandardCopyOption.REPLACE_EXISTING);
if (name.equals("iteration")) {
ObjectInputStream ois = new ObjectInputStream(archive);
iteration = ois.readInt();
}
if (name.equals("state")) {
ObjectInputStream ois = new ObjectInputStream(archive);
handler.readExternal(ois);
}
if (name.equals("reporting")) {
ObjectInputStream ois = new ObjectInputStream(archive);
reporting.readExternal(ois);
}
if (name.equals("progression")) {
ObjectInputStream ois = new ObjectInputStream(archive);
if (progressionModel instanceof Externalizable)
((Externalizable) progressionModel).readExternal(ois);
else
log.warn("Progression state present, but model is not Externalizable");
}
}
archive.close();
return iteration;
} catch (IOException | ArchiveException | ClassNotFoundException e) {
throw new IllegalStateException("Could not read snapshot", e);
}
}
/**
* Helper method to write object into archive,
*/
private void writeObject(Externalizable obj, String name, ArchiveOutputStream archive) throws IOException {
archive.putArchiveEntry(new ZipArchiveEntry(name));
ObjectOutputStream oos = new ObjectOutputStream(archive);
obj.writeExternal(oos);
oos.flush();
archive.closeArchiveEntry();
}
}