Index: src/test/java/kafka/WindowsEmbeddedKafkaRule.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/kafka/WindowsEmbeddedKafkaRule.java	(revision Shelved Version)
+++ src/test/java/kafka/WindowsEmbeddedKafkaRule.java	(revision Shelved Version)
@@ -0,0 +1,128 @@
+package kafka;
+
+import kafka.server.KafkaServer;
+import kafka.server.NotRunning;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Fixes delete failure bug on Windows.
+ * <p>
+ * Overrides {@link EmbeddedKafkaRule#after()} so that all temporary folders are also deleted on
+ * Windows.
+ * <p>
+ * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close().
+ * <p>
+ * In addition, the try-catch blocks of the original implementation have been refactored into
+ * the {@link #swallow(SimpleFunction)} method.
+ * <p>
+ * The solution is based on the following Gist:
+ * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628
+ */
+public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule {
+
+    /** Delegates to the matching {@link EmbeddedKafkaRule} constructor. */
+    public WindowsEmbeddedKafkaRule(int count) {
+        super(count);
+    }
+
+    /** Delegates to the matching {@link EmbeddedKafkaRule} constructor. */
+    public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) {
+        super(count, controlledShutdown, topics);
+    }
+
+    /** Delegates to the matching {@link EmbeddedKafkaRule} constructor. */
+    public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) {
+        super(count, controlledShutdown, partitions, topics);
+    }
+
+    /**
+     * Deletes every {@code kafka-*} entry found in the parent directory of
+     * {@code TestUtils.tempDirectory()}, then delegates to {@link EmbeddedKafkaRule#before()}.
+     */
+    @Override
+    public void before() {
+        /* really brutal hack for logs of topic partitions that have received messages */
+        File[] leftovers = new File(TestUtils.tempDirectory().getParent()).listFiles();
+        // listFiles() returns null when the directory cannot be listed; skip the cleanup then
+        if (leftovers != null) {
+            for (File leftover : leftovers) {
+                if (leftover.getName().startsWith("kafka-")) {
+                    swallow(() -> Utils.delete(leftover));
+                }
+            }
+        }
+        super.before();
+    }
+
+    /**
+     * Removes the embedded-broker system properties, then shuts down every Kafka server, the
+     * ZooKeeper client and ZooKeeper itself. Each step is best-effort: a failure in one step does
+     * not prevent the remaining steps from running.
+     */
+    @Override
+    public void after() {
+        EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka();
+        System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS);
+        System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
+        for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) {
+            swallow(() -> shutdown(kafkaServer));
+            swallow(() -> deleteLogDir(kafkaServer));
+        }
+        swallow(this::closeZkClient);
+        // Separate steps: ZooKeeper must still be shut down when closing its database fails.
+        swallow(this::closeZkDatabase);
+        swallow(this::shutdownZookeeper);
+    }
+
+    /** Shuts down and awaits the server unless its broker state is already NotRunning. */
+    private void shutdown(KafkaServer kafkaServer) {
+        if (kafkaServer.brokerState().currentState() != (NotRunning.state())) {
+            kafkaServer.shutdown();
+            kafkaServer.awaitShutdown();
+        }
+    }
+
+    /** Intentionally a no-op, kept as a hook for per-server log directory cleanup. */
+    private void deleteLogDir(KafkaServer kafkaServer) {
+        // no need to delete here because of the shutdown hook in TestUtils
+    }
+
+    /** Closes the ZooKeeper client of the embedded broker. */
+    private void closeZkClient() {
+        getEmbeddedKafka().getZkClient().close();
+    }
+
+    /** Closes the ZooKeeper database; this is the call the Windows fix is built on. */
+    private void closeZkDatabase() throws IOException {
+        getEmbeddedKafka().getZookeeper().zookeeper().getZKDatabase().close();
+    }
+
+    /** Shuts down the embedded ZooKeeper instance. */
+    private void shutdownZookeeper() throws IOException {
+        getEmbeddedKafka().getZookeeper().shutdown();
+    }
+
+    /**
+     * Runs {@code function} and discards any {@link Exception} it throws, so that one failing
+     * cleanup step never aborts the ones that follow.
+     */
+    private void swallow(SimpleFunction function) {
+        try {
+            function.execute();
+        } catch (Exception ignored) {
+            // best-effort cleanup: failures here are deliberately not propagated
+        }
+    }
+
+    /** A no-argument, no-result action that is allowed to throw checked exceptions. */
+    @FunctionalInterface
+    interface SimpleFunction {
+        void execute() throws Exception;
+    }
+}