View Issue Details
| ID | Project | Category | View Status | Date Submitted | Last Update |
|---|---|---|---|---|---|
| 0000010 | KafkaEsque | Improvement | public | 2018-11-25 22:13 | 2018-11-27 20:57 |
| Reporter | Oliver G. | Assigned To | patschuh | ||
| Priority | normal | Severity | tweak | Reproducibility | N/A |
| Status | resolved | Resolution | fixed | ||
| Product Version | |||||
| Target Version | 0.12.0 | Fixed in Version | 0.12.0 | ||
| Summary | 0000010: WindowsEmbeddedKafka Patch | ||||
| Description | Patch ist im Anhang.
| ||||
| Tags | No tags attached. | ||||
|
WindowsEmbeddedKafka.patch (3,893 bytes)
Index: src/test/java/kafka/WindowsEmbeddedKafkaRule.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision Shelved Version)
+++ src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision Shelved Version)
@@ -0,0 +1,98 @@
+package kafka;
+
+import kafka.server.KafkaServer;
+import kafka.server.NotRunning;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Fixes delete failure bug on Windows.
+ * <p>
+ * Overrides {@link EmbeddedKafkaRule#after()} so that all temporary folders are also deleted on
+ * Windows.
+ * <p>
+ * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close().
+ * <p>
+ * In addition, the many try-catch blocks in the original implementation have been refactored into
+ * the {@link #swallow(SimpleFunction)} method.
+ *
+ * The solution is based on the following Gist:
+ * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628
+ */
+public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule {
+
+ public WindowsEmbeddedKafkaRule(int count) {
+ super(count);
+ }
+
+ public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) {
+ super(count, controlledShutdown, topics);
+ }
+
+ public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) {
+ super(count, controlledShutdown, partitions, topics);
+ }
+
+ @Override
+ public void before() {
+ /* really brutal hack for logs of topic partitions that have received messages */
+ for (File file : new File(TestUtils.tempDirectory().getParent()).listFiles()) {
+ if (file.getName().startsWith("kafka-")) {
+ swallow(() -> Utils.delete(file));
+ }
+ }
+ super.before();
+ }
+
+ @Override
+ public void after() {
+ EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka();
+ System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS);
+ System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
+ for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) {
+ swallow(() -> shutdown(kafkaServer));
+ swallow(() -> deleteLogDir(kafkaServer));
+ }
+ swallow(this::closeZkClient);
+ swallow(this::shutdownZookeeper);
+ }
+
+ private void shutdown(KafkaServer kafkaServer) {
+ if (kafkaServer.brokerState().currentState() != (NotRunning.state())) {
+ kafkaServer.shutdown();
+ kafkaServer.awaitShutdown();
+ }
+ }
+
+ private void deleteLogDir(KafkaServer kafkaServer) {
+ // no need to delete the log dirs here because of the shutdown hook in TestUtils
+ // CoreUtils.delete(kafkaServer.config().logDirs());
+ }
+
+ private void closeZkClient() {
+ getEmbeddedKafka().getZkClient().close();
+ }
+
+ private void shutdownZookeeper() throws IOException {
+ getEmbeddedKafka().getZookeeper().zookeeper().getZKDatabase().close();
+ getEmbeddedKafka().getZookeeper().shutdown();
+ }
+
+ private void swallow(SimpleFunction function) {
+ try {
+ function.execute();
+ } catch (Exception e) {
+ // do nothing
+ }
+ }
+
+ @FunctionalInterface
+ interface SimpleFunction {
+ void execute() throws Exception;
+ }
+}
|
|
|
Attaching patch 2nd try.
WindowsEmbeddedKafka1.patch (6,104 bytes)
Index: pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- pom.xml (revision 5403c717d55ee18a5cd03a66103c8b6db2aa29f5)
+++ pom.xml (revision )
@@ -72,6 +72,13 @@
<version>2.2.0.RELEASE</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.tempus-fugit</groupId>
+ <artifactId>tempus-fugit</artifactId>
+ <version>1.1</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
Index: src/test/java/kafka/StartEmbeddedKafka.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/kafka/StartEmbeddedKafka.java (revision 5403c717d55ee18a5cd03a66103c8b6db2aa29f5)
+++ src/test/java/kafka/StartEmbeddedKafka.java (revision )
@@ -5,26 +5,18 @@
import org.junit.Test;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+import static com.google.code.tempusfugit.concurrency.ThreadUtils.sleep;
+import static com.google.code.tempusfugit.temporal.Duration.seconds;
@Ignore("currently just started for quick manual testing")
public class StartEmbeddedKafka {
- /*
- * Info: Windows folder deletion bug seems to be still there, so following Gist might be still relevant
- *
- * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628
- *
- */
@ClassRule
- public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "test.me");
+ public static EmbeddedKafkaRule embeddedKafkaRule =
+ new WindowsEmbeddedKafkaRule(1, true, "test.me").kafkaPorts(59000);
@Test
public void startKafkaForManualTestsLaterWriterRealTests() {
- try {
- System.out.println(embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
- Thread.sleep(120000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ sleep(seconds(60));
}
}
Index: src/test/java/kafka/WindowsEmbeddedKafkaRule.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision )
+++ src/test/java/kafka/WindowsEmbeddedKafkaRule.java (revision )
@@ -0,0 +1,98 @@
+package kafka;
+
+import kafka.server.KafkaServer;
+import kafka.server.NotRunning;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Fixes delete failure bug on Windows.
+ * <p>
+ * Overrides {@link EmbeddedKafkaRule#after()} so that all temporary folders are also deleted on
+ * Windows.
+ * <p>
+ * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close().
+ * <p>
+ * In addition, the many try-catch blocks in the original implementation have been refactored into
+ * the {@link #swallow(SimpleFunction)} method.
+ *
+ * The solution is based on the following Gist:
+ * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628
+ */
+public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule {
+
+ public WindowsEmbeddedKafkaRule(int count) {
+ super(count);
+ }
+
+ public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) {
+ super(count, controlledShutdown, topics);
+ }
+
+ public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) {
+ super(count, controlledShutdown, partitions, topics);
+ }
+
+ @Override
+ public void before() {
+ /* really brutal hack for logs of topic partitions that have received messages */
+ for (File file : new File(TestUtils.tempDirectory().getParent()).listFiles()) {
+ if (file.getName().startsWith("kafka-")) {
+ swallow(() -> Utils.delete(file));
+ }
+ }
+ super.before();
+ }
+
+ @Override
+ public void after() {
+ EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka();
+ System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS);
+ System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
+ for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) {
+ swallow(() -> shutdown(kafkaServer));
+ swallow(() -> deleteLogDir(kafkaServer));
+ }
+ swallow(this::closeZkClient);
+ swallow(this::shutdownZookeeper);
+ }
+
+ private void shutdown(KafkaServer kafkaServer) {
+ if (kafkaServer.brokerState().currentState() != (NotRunning.state())) {
+ kafkaServer.shutdown();
+ kafkaServer.awaitShutdown();
+ }
+ }
+
+ private void deleteLogDir(KafkaServer kafkaServer) {
+ // no need to delete the log dirs here because of the shutdown hook in TestUtils
+ // CoreUtils.delete(kafkaServer.config().logDirs());
+ }
+
+ private void closeZkClient() {
+ getEmbeddedKafka().getZkClient().close();
+ }
+
+ private void shutdownZookeeper() throws IOException {
+ getEmbeddedKafka().getZookeeper().zookeeper().getZKDatabase().close();
+ getEmbeddedKafka().getZookeeper().shutdown();
+ }
+
+ private void swallow(SimpleFunction function) {
+ try {
+ function.execute();
+ } catch (Exception e) {
+ // do nothing
+ }
+ }
+
+ @FunctionalInterface
+ interface SimpleFunction {
+ void execute() throws Exception;
+ }
+}
|
|
|
Reviewed and applied patch |
|
| Date Modified | Username | Field | Change |
|---|---|---|---|
| 2018-11-25 22:13 | Oliver G. | New Issue | |
| 2018-11-25 22:13 | Oliver G. | File Added: WindowsEmbeddedKafka.patch | |
| 2018-11-26 06:39 | patschuh | Target Version | => 0.12.0 |
| 2018-11-26 06:39 | patschuh | Description Updated | View Revisions |
| 2018-11-26 20:20 | Oliver G. | File Added: WindowsEmbeddedKafka1.patch | |
| 2018-11-26 20:20 | Oliver G. | Note Added: 0000011 | |
| 2018-11-27 20:56 | patschuh | Changeset attached | => KafkaEsque master 43f2bc6e |
| 2018-11-27 20:57 | patschuh | Assigned To | => patschuh |
| 2018-11-27 20:57 | patschuh | Status | new => resolved |
| 2018-11-27 20:57 | patschuh | Resolution | open => fixed |
| 2018-11-27 20:57 | patschuh | Fixed in Version | => 0.12.0 |
| 2018-11-27 20:57 | patschuh | Note Added: 0000012 |