View Issue Details
ID | Project | Category | View Status | Date Submitted | Last Update |
---|---|---|---|---|---|
0000010 | KafkaEsque | Improvement | public | 2018-11-25 22:13 | 2018-11-27 20:57 |
Reporter | Oliver G. | Assigned To | patschuh | ||
Priority | normal | Severity | tweak | Reproducibility | N/A |
Status | resolved | Resolution | fixed | ||
Product Version | |||||
Target Version | 0.12.0 | Fixed in Version | 0.12.0 | ||
Summary | 0000010: WindowsEmbeddedKafka Patch | ||||
Description | Patch is attached.
| ||||
Tags | No tags attached. | ||||
WindowsEmbeddedKafka.patch (3,893 bytes)
Index: src/test/java/kafka/WindowsEmbeddedKafkaRule.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision Shelved Version) +++ src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision Shelved Version) @@ -0,0 +1,98 @@ +package kafka; + +import kafka.server.KafkaServer; +import kafka.server.NotRunning; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; + +import java.io.File; +import java.io.IOException; + +/** + * Fixes delete failure bug on Windows. + * <p> + * Overriding {@link EmbeddedKafkaRule#after()}, so that all temporary folders are deleted also on + * Windows. + * <p> + * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close(). + * <p> + * In addition, the number of try-catch blocks in the original implementation have been refactored to + * the {@link #swallow(SimpleFunction)} method. + * + * The solution is based on following Gist: + * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628 + */ +public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule { + + public WindowsEmbeddedKafkaRule(int count) { + super(count); + } + + public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { + super(count, controlledShutdown, topics); + } + + public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... 
topics) { + super(count, controlledShutdown, partitions, topics); + } + + @Override + public void before() { + /* really brutal hack for logs of topic partitions that have been received messages */ + for (File file : new File(TestUtils.tempDirectory().getParent()).listFiles()) { + if (file.getName().startsWith("kafka-")) { + swallow(() -> Utils.delete(file)); + } + } + super.before(); + } + + @Override + public void after() { + EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka(); + System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS); + System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT); + for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) { + swallow(() -> shutdown(kafkaServer)); + swallow(() -> deleteLogDir(kafkaServer)); + } + swallow(this::closeZkClient); + swallow(this::shutdownZookeeper); + } + + private void shutdown(KafkaServer kafkaServer) { + if (kafkaServer.brokerState().currentState() != (NotRunning.state())) { + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + } + } + + private void deleteLogDir(KafkaServer kafkaServer) { + // no need to make this delete here because of the shutdown hook in Testutils + // CoreUtils.delete(kafkaServer.config().logDirs()); + } + + private void closeZkClient() { + getEmbeddedKafka().getZkClient().close(); + } + + private void shutdownZookeeper() throws IOException { + getEmbeddedKafka().getZookeeper().zookeeper().getZKDatabase().close(); + getEmbeddedKafka().getZookeeper().shutdown(); + } + + private void swallow(SimpleFunction function) { + try { + function.execute(); + } catch (Exception e) { + // do nothing + } + } + + @FunctionalInterface + interface SimpleFunction { + void execute() throws Exception; + } +} |
|
Attaching patch 2nd try.
WindowsEmbeddedKafka1.patch (6,104 bytes)
Index: pom.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- pom.xml (revision 5403c717d55ee18a5cd03a66103c8b6db2aa29f5) +++ pom.xml (revision ) @@ -72,6 +72,13 @@ <version>2.2.0.RELEASE</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.code.tempus-fugit</groupId> + <artifactId>tempus-fugit</artifactId> + <version>1.1</version> + <scope>test</scope> + </dependency> + </dependencies> <build> Index: src/test/java/kafka/StartEmbeddedKafka.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/java/kafka/StartEmbeddedKafka.java (revision 5403c717d55ee18a5cd03a66103c8b6db2aa29f5) +++ src/test/java/kafka/StartEmbeddedKafka.java (revision ) @@ -5,26 +5,18 @@ import org.junit.Test; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; +import static com.google.code.tempusfugit.concurrency.ThreadUtils.sleep; +import static com.google.code.tempusfugit.temporal.Duration.seconds; @Ignore("currently just started for quick manual testing") public class StartEmbeddedKafka { - /* - * Info: Windows folder deletion bug seems to be still there, so following Gist might be still relevant - * - * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628 - * - */ @ClassRule - public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "test.me"); + public static EmbeddedKafkaRule embeddedKafkaRule = + new WindowsEmbeddedKafkaRule(1, true, "test.me").kafkaPorts(59000); @Test public void startKafkaForManualTestsLaterWriterRealTests() { - try { - System.out.println(embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString()); - Thread.sleep(120000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + sleep(seconds(60)); } } Index: src/test/java/kafka/WindowsEmbeddedKafkaRule.java 
IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision ) +++ src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision ) @@ -0,0 +1,98 @@ +package kafka; + +import kafka.server.KafkaServer; +import kafka.server.NotRunning; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; + +import java.io.File; +import java.io.IOException; + +/** + * Fixes delete failure bug on Windows. + * <p> + * Overriding {@link EmbeddedKafkaRule#after()}, so that all temporary folders are deleted also on + * Windows. + * <p> + * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close(). + * <p> + * In addition, the number of try-catch blocks in the original implementation have been refactored to + * the {@link #swallow(SimpleFunction)} method. + * + * The solution is based on following Gist: + * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628 + */ +public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule { + + public WindowsEmbeddedKafkaRule(int count) { + super(count); + } + + public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { + super(count, controlledShutdown, topics); + } + + public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... 
topics) { + super(count, controlledShutdown, partitions, topics); + } + + @Override + public void before() { + /* really brutal hack for logs of topic partitions that have been received messages */ + for (File file : new File(TestUtils.tempDirectory().getParent()).listFiles()) { + if (file.getName().startsWith("kafka-")) { + swallow(() -> Utils.delete(file)); + } + } + super.before(); + } + + @Override + public void after() { + EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka(); + System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS); + System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT); + for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) { + swallow(() -> shutdown(kafkaServer)); + swallow(() -> deleteLogDir(kafkaServer)); + } + swallow(this::closeZkClient); + swallow(this::shutdownZookeeper); + } + + private void shutdown(KafkaServer kafkaServer) { + if (kafkaServer.brokerState().currentState() != (NotRunning.state())) { + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + } + } + + private void deleteLogDir(KafkaServer kafkaServer) { + // no need to make this delete here because of the shutdown hook in Testutils + // CoreUtils.delete(kafkaServer.config().logDirs()); + } + + private void closeZkClient() { + getEmbeddedKafka().getZkClient().close(); + } + + private void shutdownZookeeper() throws IOException { + getEmbeddedKafka().getZookeeper().zookeeper().getZKDatabase().close(); + getEmbeddedKafka().getZookeeper().shutdown(); + } + + private void swallow(SimpleFunction function) { + try { + function.execute(); + } catch (Exception e) { + // do nothing + } + } + + @FunctionalInterface + interface SimpleFunction { + void execute() throws Exception; + } +} |
|
Reviewed and applied patch |
|
Date Modified | Username | Field | Change |
---|---|---|---|
2018-11-25 22:13 | Oliver G. | New Issue | |
2018-11-25 22:13 | Oliver G. | File Added: WindowsEmbeddedKafka.patch | |
2018-11-26 06:39 | patschuh | Target Version | => 0.12.0 |
2018-11-26 06:39 | patschuh | Description Updated | View Revisions |
2018-11-26 20:20 | Oliver G. | File Added: WindowsEmbeddedKafka1.patch | |
2018-11-26 20:20 | Oliver G. | Note Added: 0000011 | |
2018-11-27 20:56 | patschuh | Changeset attached | => KafkaEsque master 43f2bc6e |
2018-11-27 20:57 | patschuh | Assigned To | => patschuh |
2018-11-27 20:57 | patschuh | Status | new => resolved |
2018-11-27 20:57 | patschuh | Resolution | open => fixed |
2018-11-27 20:57 | patschuh | Fixed in Version | => 0.12.0 |
2018-11-27 20:57 | patschuh | Note Added: 0000012 |