View Issue Details

IDProjectCategoryView StatusLast Update
0000010KafkaEsqueImprovementpublic2018-11-27 20:57
ReporterOliver G.Assigned Topatschuh 
PrioritynormalSeveritytweakReproducibilityN/A
Status resolvedResolutionfixed 
Product Version 
Target Version0.12.0Fixed in Version0.12.0 
Summary0000010: WindowsEmbeddedKafka Patch
Description

Patch ist im Anhang.

  • Broker Port ist Fix und nicht mehr random.
  • Hacks, damit Kafka Ordner unter Windows gelöscht werden.
  • Tempus Fugit mit Test Scope.
TagsNo tags attached.

Activities

Oliver G.

Oliver G.

2018-11-25 22:13

reporter  

WindowsEmbeddedKafka.patch (3,893 bytes)
Index: src/test/java/kafka/WindowsEmbeddedKafkaRule.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/kafka/WindowsEmbeddedKafkaRule.java	(revision Shelved Version)
+++ src/test/java/kafka/WindowsEmbeddedKafkaRule.java	(revision Shelved Version)
@@ -0,0 +1,98 @@
+package kafka;
+
+import kafka.server.KafkaServer;
+import kafka.server.NotRunning;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Fixes delete failure bug on Windows.
+ * <p>
+ * Overriding {@link EmbeddedKafkaRule#after()}, so that all temporary folders are deleted also on
+ * Windows.
+ * <p>
+ * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close().
+ * <p>
+ * In addition, the number of try-catch blocks in the original implementation have been refactored to
+ * the {@link #swallow(SimpleFunction)} method.
+ *
+ * The solution is based on following Gist:
+ * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628
+ */
+public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule {
+
+    public WindowsEmbeddedKafkaRule(int count) {
+        super(count);
+    }
+
+    public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) {
+        super(count, controlledShutdown, topics);
+    }
+
+    public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) {
+        super(count, controlledShutdown, partitions, topics);
+    }
+
+    @Override
+    public void before() {
+        /* really brutal hack for logs of topic partitions that have been received messages */
+        for (File file : new File(TestUtils.tempDirectory().getParent()).listFiles()) {
+            if (file.getName().startsWith("kafka-")) {
+                swallow(() -> Utils.delete(file));
+            }
+        }
+        super.before();
+    }
+
+    @Override
+    public void after() {
+        EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka();
+        System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS);
+        System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
+        for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) {
+            swallow(() -> shutdown(kafkaServer));
+            swallow(() -> deleteLogDir(kafkaServer));
+        }
+        swallow(this::closeZkClient);
+        swallow(this::shutdownZookeeper);
+    }
+
+    private void shutdown(KafkaServer kafkaServer) {
+        if (kafkaServer.brokerState().currentState() != (NotRunning.state())) {
+            kafkaServer.shutdown();
+            kafkaServer.awaitShutdown();
+        }
+    }
+
+    private void deleteLogDir(KafkaServer kafkaServer) {
+        // no need to make this delete here because of the shutdown hook in Testutils
+        // CoreUtils.delete(kafkaServer.config().logDirs());
+    }
+
+    private void closeZkClient() {
+        getEmbeddedKafka().getZkClient().close();
+    }
+
+    private void shutdownZookeeper() throws IOException {
+        getEmbeddedKafka().getZookeeper().zookeeper().getZKDatabase().close();
+        getEmbeddedKafka().getZookeeper().shutdown();
+    }
+
+    private void swallow(SimpleFunction function) {
+        try {
+            function.execute();
+        } catch (Exception e) {
+            // do nothing
+        }
+    }
+
+    @FunctionalInterface
+    interface SimpleFunction {
+        void execute() throws Exception;
+    }
+}
Oliver G.

Oliver G.

2018-11-26 20:20

reporter   ~0000011

Attaching patch 2nd try.



WindowsEmbeddedKafka1.patch (6,104 bytes)
Index: pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- pom.xml	(revision 5403c717d55ee18a5cd03a66103c8b6db2aa29f5)
+++ pom.xml	(revision )
@@ -72,6 +72,13 @@
             <version>2.2.0.RELEASE</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.tempus-fugit</groupId>
+            <artifactId>tempus-fugit</artifactId>
+            <version>1.1</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
Index: src/test/java/kafka/StartEmbeddedKafka.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/kafka/StartEmbeddedKafka.java	(revision 5403c717d55ee18a5cd03a66103c8b6db2aa29f5)
+++ src/test/java/kafka/StartEmbeddedKafka.java	(revision )
@@ -5,26 +5,18 @@
 import org.junit.Test;
 import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
 
+import static com.google.code.tempusfugit.concurrency.ThreadUtils.sleep;
+import static com.google.code.tempusfugit.temporal.Duration.seconds;
 
 @Ignore("currently just started for quick manual testing")
 public class StartEmbeddedKafka {
 
-    /*
-     * Info: Windows folder deletion bug seems to be still there, so following Gist might be still relevant
-     *
-     * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628
-     *
-     */
     @ClassRule
-    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "test.me");
+    public static EmbeddedKafkaRule embeddedKafkaRule =
+            new WindowsEmbeddedKafkaRule(1, true, "test.me").kafkaPorts(59000);
 
     @Test
     public void startKafkaForManualTestsLaterWriterRealTests() {
-        try {
-            System.out.println(embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
-            Thread.sleep(120000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
+        sleep(seconds(60));
     }
 }
Index: src/test/java/kafka/WindowsEmbeddedKafkaRule.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/kafka/WindowsEmbeddedKafkaRule.java	(revision )
+++ src/test/java/kafka/WindowsEmbeddedKafkaRule.java	(revision )
@@ -0,0 +1,98 @@
+package kafka;
+
+import kafka.server.KafkaServer;
+import kafka.server.NotRunning;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Fixes delete failure bug on Windows.
+ * <p>
+ * Overriding {@link EmbeddedKafkaRule#after()}, so that all temporary folders are deleted also on
+ * Windows.
+ * <p>
+ * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close().
+ * <p>
+ * In addition, the number of try-catch blocks in the original implementation have been refactored to
+ * the {@link #swallow(SimpleFunction)} method.
+ *
+ * The solution is based on following Gist:
+ * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628
+ */
+public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule {
+
+    public WindowsEmbeddedKafkaRule(int count) {
+        super(count);
+    }
+
+    public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) {
+        super(count, controlledShutdown, topics);
+    }
+
+    public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) {
+        super(count, controlledShutdown, partitions, topics);
+    }
+
+    @Override
+    public void before() {
+        /* really brutal hack for logs of topic partitions that have been received messages */
+        for (File file : new File(TestUtils.tempDirectory().getParent()).listFiles()) {
+            if (file.getName().startsWith("kafka-")) {
+                swallow(() -> Utils.delete(file));
+            }
+        }
+        super.before();
+    }
+
+    @Override
+    public void after() {
+        EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka();
+        System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS);
+        System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
+        for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) {
+            swallow(() -> shutdown(kafkaServer));
+            swallow(() -> deleteLogDir(kafkaServer));
+        }
+        swallow(this::closeZkClient);
+        swallow(this::shutdownZookeeper);
+    }
+
+    private void shutdown(KafkaServer kafkaServer) {
+        if (kafkaServer.brokerState().currentState() != (NotRunning.state())) {
+            kafkaServer.shutdown();
+            kafkaServer.awaitShutdown();
+        }
+    }
+
+    private void deleteLogDir(KafkaServer kafkaServer) {
+        // no need to make this delete here because of the shutdown hook in Testutils
+        // CoreUtils.delete(kafkaServer.config().logDirs());
+    }
+
+    private void closeZkClient() {
+        getEmbeddedKafka().getZkClient().close();
+    }
+
+    private void shutdownZookeeper() throws IOException {
+        getEmbeddedKafka().getZookeeper().zookeeper().getZKDatabase().close();
+        getEmbeddedKafka().getZookeeper().shutdown();
+    }
+
+    private void swallow(SimpleFunction function) {
+        try {
+            function.execute();
+        } catch (Exception e) {
+            // do nothing
+        }
+    }
+
+    @FunctionalInterface
+    interface SimpleFunction {
+        void execute() throws Exception;
+    }
+}
patschuh

patschuh

2018-11-27 20:57

developer   ~0000012

Reviewed and applied patch

Related Changesets

KafkaEsque: master 43f2bc6e

2018-11-27 20:55:32

patschuh

Details Diff
[ESQUE-10]: Applied provided patch
Affected Issues
0000010

Issue History

Date Modified Username Field Change
2018-11-25 22:13 Oliver G. New Issue
2018-11-25 22:13 Oliver G. File Added: WindowsEmbeddedKafka.patch
2018-11-26 06:39 patschuh Target Version => 0.12.0
2018-11-26 06:39 patschuh Description Updated View Revisions
2018-11-26 20:20 Oliver G. File Added: WindowsEmbeddedKafka1.patch
2018-11-26 20:20 Oliver G. Note Added: 0000011
2018-11-27 20:56 patschuh Changeset attached => KafkaEsque master 43f2bc6e
2018-11-27 20:57 patschuh Assigned To => patschuh
2018-11-27 20:57 patschuh Status new => resolved
2018-11-27 20:57 patschuh Resolution open => fixed
2018-11-27 20:57 patschuh Fixed in Version => 0.12.0
2018-11-27 20:57 patschuh Note Added: 0000012