View Issue Details
ID | Project | Category | View Status | Date Submitted | Last Update |
---|---|---|---|---|---|
0000021 | KafkaEsque | Feature Request | public | 2019-02-21 09:37 | 2019-11-25 07:50 |
Reporter | patschuh | Assigned To | patschuh | ||
Priority | high | Severity | feature | Reproducibility | have not tried |
Status | closed | Resolution | fixed | ||
Product Version | |||||
Target Version | Fixed in Version | ||||
Summary | 0000021: Add ability to serialize and deserialize AVRO | ||||
Description | It should be configurable on a per-topic level whether the messages in the topic are serialized in Avro. | ||||
Tags | No tags attached. | ||||
Please add an Extended JsonDecoder so that it is possible to publish Avro Messages by using the JSON format without the need for extra type definitions. |
|
Please apply the following patch to your develop branch.
ExtendedJsonDecoder_for_publishing_Avro.patch (29,500 bytes)
Index: src/main/java/at/esque/kafka/handlers/ProducerHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/main/java/at/esque/kafka/handlers/ProducerHandler.java (revision fd97d0bbeae497baa1fcc16818c7ff5af690b484) +++ src/main/java/at/esque/kafka/handlers/ProducerHandler.java (date 1553095428179) @@ -3,6 +3,7 @@ import at.esque.kafka.MessageType; import at.esque.kafka.cluster.ClusterConfig; import at.esque.kafka.cluster.TopicMessageTypeConfig; +import at.esque.kafka.serialization.ExtendedJsonDecoder; import at.esque.kafka.serialization.KafkaEsqueSerializer; import com.google.inject.Inject; import io.confluent.kafka.schemaregistry.client.rest.RestService; @@ -11,8 +12,7 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.JsonDecoder; +import org.apache.avro.io.Decoder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -124,7 +124,7 @@ org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.getSchema()); - JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(avroSchema, json); + Decoder jsonDecoder = new ExtendedJsonDecoder(avroSchema, json); DatumReader<GenericRecord> reader = new GenericDatumReader<>(avroSchema); return reader.read(null, jsonDecoder); Index: src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java (date 1553095428169) +++ 
src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java (date 1553095428169) @@ -0,0 +1,783 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package at.esque.kafka.serialization; + +import org.apache.avro.AvroTypeException; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.ParsingDecoder; +import org.apache.avro.io.parsing.JsonGrammarGenerator; +import org.apache.avro.io.parsing.Parser; +import org.apache.avro.io.parsing.Symbol; +import org.apache.avro.util.Utf8; +import org.codehaus.jackson.Base64Variant; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonLocation; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonStreamContext; +import org.codehaus.jackson.JsonToken; +import org.codehaus.jackson.ObjectCodec; +import org.codehaus.jackson.node.NullNode; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import 
java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +/** + * Original code is from: https://github.com/Celos/avro-json-decoder + * + * Additional change for KafkaEsque: a NullNode.getInstance() is returned for fields that are missing from the JSON, + * meaning they are not explicitly set to null in the JSON. + * + * A {@link Decoder} for Avro's JSON data encoding. + * </p> + * Construct using {@link DecoderFactory}. + * </p> + * ExtendedJsonDecoder is not thread-safe. + * <p> + * Based on {@link org.apache.avro.io.JsonDecoder JsonDecoder} + * and <a href="https://github.com/zolyfarkas/avro">ExtendedJsonDecoder</a>. + * Infers default arguments, if they are not present. + * </p> + **/ +public class ExtendedJsonDecoder extends ParsingDecoder + implements Parser.ActionHandler { + private JsonParser in; + private static JsonFactory jsonFactory = new JsonFactory(); + Stack<ReorderBuffer> reorderBuffers = new Stack<ReorderBuffer>(); + ReorderBuffer currentReorderBuffer; + + private final Schema schema; + + private static class ReorderBuffer { + public Map<String, List<JsonElement>> savedFields = new HashMap<String, List<JsonElement>>(); + public JsonParser origParser = null; + } + + static final String CHARSET = "ISO-8859-1"; + + public ExtendedJsonDecoder(Schema schema, InputStream in) throws IOException { + super(getSymbol(schema)); + configure(in); + this.schema = schema; + } + + public ExtendedJsonDecoder(Schema schema, String in) throws IOException { + super(getSymbol(schema)); + configure(in); + this.schema = schema; + } + + private static Symbol getSymbol(Schema schema) { + if (null == schema) { + throw new NullPointerException("Schema cannot be null!"); + } + return new JsonGrammarGenerator().generate(schema); + } + + /** + * Reconfigures this JsonDecoder to use the InputStream provided. + * <p/> + * If the InputStream provided is null, a NullPointerException is thrown. 
+ * <p/> + * Otherwise, this JsonDecoder will reset its state and then + * reconfigure its input. + * @param in + * The IntputStream to read from. Cannot be null. + * @throws IOException + * @return this JsonDecoder + */ + public ExtendedJsonDecoder configure(InputStream in) throws IOException { + if (null == in) { + throw new NullPointerException("InputStream to read from cannot be null!"); + } + parser.reset(); + this.in = jsonFactory.createJsonParser(in); + this.in.nextToken(); + return this; + } + + /** + * Reconfigures this JsonDecoder to use the String provided for input. + * <p/> + * If the String provided is null, a NullPointerException is thrown. + * <p/> + * Otherwise, this JsonDecoder will reset its state and then + * reconfigure its input. + * @param in + * The String to read from. Cannot be null. + * @throws IOException + * @return this JsonDecoder + */ + public ExtendedJsonDecoder configure(String in) throws IOException { + if (null == in) { + throw new NullPointerException("String to read from cannot be null!"); + } + parser.reset(); + this.in = new JsonFactory().createJsonParser(in); + this.in.nextToken(); + return this; + } + + private void advance(Symbol symbol) throws IOException { + this.parser.processTrailingImplicitActions(); + if (in.getCurrentToken() == null && this.parser.depth() == 1) + throw new EOFException(); + parser.advance(symbol); + } + + @Override + public void readNull() throws IOException { + advance(Symbol.NULL); + if (in.getCurrentToken() == JsonToken.VALUE_NULL) { + in.nextToken(); + } else { + throw error("null"); + } + } + + @Override + public boolean readBoolean() throws IOException { + advance(Symbol.BOOLEAN); + JsonToken t = in.getCurrentToken(); + if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE) { + in.nextToken(); + return t == JsonToken.VALUE_TRUE; + } else { + throw error("boolean"); + } + } + + @Override + public int readInt() throws IOException { + advance(Symbol.INT); + if 
(in.getCurrentToken().isNumeric()) { + int result = in.getIntValue(); + in.nextToken(); + return result; + } else { + throw error("int"); + } + } + + @Override + public long readLong() throws IOException { + advance(Symbol.LONG); + if (in.getCurrentToken().isNumeric()) { + long result = in.getLongValue(); + in.nextToken(); + return result; + } else { + throw error("long"); + } + } + + @Override + public float readFloat() throws IOException { + advance(Symbol.FLOAT); + if (in.getCurrentToken().isNumeric()) { + float result = in.getFloatValue(); + in.nextToken(); + return result; + } else { + throw error("float"); + } + } + + @Override + public double readDouble() throws IOException { + advance(Symbol.DOUBLE); + if (in.getCurrentToken().isNumeric()) { + double result = in.getDoubleValue(); + in.nextToken(); + return result; + } else { + throw error("double"); + } + } + + @Override + public Utf8 readString(Utf8 old) throws IOException { + return new Utf8(readString()); + } + + @Override + public String readString() throws IOException { + advance(Symbol.STRING); + if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { + parser.advance(Symbol.MAP_KEY_MARKER); + if (in.getCurrentToken() != JsonToken.FIELD_NAME) { + throw error("map-key"); + } + } else { + if (in.getCurrentToken() != JsonToken.VALUE_STRING) { + throw error("string"); + } + } + String result = in.getText(); + in.nextToken(); + return result; + } + + @Override + public void skipString() throws IOException { + advance(Symbol.STRING); + if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { + parser.advance(Symbol.MAP_KEY_MARKER); + if (in.getCurrentToken() != JsonToken.FIELD_NAME) { + throw error("map-key"); + } + } else { + if (in.getCurrentToken() != JsonToken.VALUE_STRING) { + throw error("string"); + } + } + in.nextToken(); + } + + @Override + public ByteBuffer readBytes(ByteBuffer old) throws IOException { + advance(Symbol.BYTES); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + byte[] result = 
readByteArray(); + in.nextToken(); + return ByteBuffer.wrap(result); + } else { + throw error("bytes"); + } + } + + private byte[] readByteArray() throws IOException { + byte[] result = in.getText().getBytes(CHARSET); + return result; + } + + @Override + public void skipBytes() throws IOException { + advance(Symbol.BYTES); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + in.nextToken(); + } else { + throw error("bytes"); + } + } + + private void checkFixed(int size) throws IOException { + advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + if (size != top.size) { + throw new AvroTypeException( + "Incorrect length for fixed binary: expected " + + top.size + " but received " + size + " bytes."); + } + } + + @Override + public void readFixed(byte[] bytes, int start, int len) throws IOException { + checkFixed(len); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + byte[] result = readByteArray(); + in.nextToken(); + if (result.length != len) { + throw new AvroTypeException("Expected fixed length " + len + + ", but got" + result.length); + } + System.arraycopy(result, 0, bytes, start, len); + } else { + throw error("fixed"); + } + } + + @Override + public void skipFixed(int length) throws IOException { + checkFixed(length); + doSkipFixed(length); + } + + private void doSkipFixed(int length) throws IOException { + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + byte[] result = readByteArray(); + in.nextToken(); + if (result.length != length) { + throw new AvroTypeException("Expected fixed length " + length + + ", but got" + result.length); + } + } else { + throw error("fixed"); + } + } + + @Override + protected void skipFixed() throws IOException { + advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + doSkipFixed(top.size); + } + + @Override + public int readEnum() throws IOException { + advance(Symbol.ENUM); + Symbol.EnumLabelsAction top = 
(Symbol.EnumLabelsAction) parser.popSymbol(); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + in.getText(); + int n = top.findLabel(in.getText()); + if (n >= 0) { + in.nextToken(); + return n; + } + throw new AvroTypeException("Unknown symbol in enum " + in.getText()); + } else { + throw error("fixed"); + } + } + + @Override + public long readArrayStart() throws IOException { + advance(Symbol.ARRAY_START); + if (in.getCurrentToken() == JsonToken.START_ARRAY) { + in.nextToken(); + return doArrayNext(); + } else { + throw error("array-start"); + } + } + + @Override + public long arrayNext() throws IOException { + advance(Symbol.ITEM_END); + return doArrayNext(); + } + + private long doArrayNext() throws IOException { + if (in.getCurrentToken() == JsonToken.END_ARRAY) { + parser.advance(Symbol.ARRAY_END); + in.nextToken(); + return 0; + } else { + return 1; + } + } + + @Override + public long skipArray() throws IOException { + advance(Symbol.ARRAY_START); + if (in.getCurrentToken() == JsonToken.START_ARRAY) { + in.skipChildren(); + in.nextToken(); + advance(Symbol.ARRAY_END); + } else { + throw error("array-start"); + } + return 0; + } + + @Override + public long readMapStart() throws IOException { + advance(Symbol.MAP_START); + if (in.getCurrentToken() == JsonToken.START_OBJECT) { + in.nextToken(); + return doMapNext(); + } else { + throw error("map-start"); + } + } + + @Override + public long mapNext() throws IOException { + advance(Symbol.ITEM_END); + return doMapNext(); + } + + private long doMapNext() throws IOException { + if (in.getCurrentToken() == JsonToken.END_OBJECT) { + in.nextToken(); + advance(Symbol.MAP_END); + return 0; + } else { + return 1; + } + } + + @Override + public long skipMap() throws IOException { + advance(Symbol.MAP_START); + if (in.getCurrentToken() == JsonToken.START_OBJECT) { + in.skipChildren(); + in.nextToken(); + advance(Symbol.MAP_END); + } else { + throw error("map-start"); + } + return 0; + } + + @Override + public int 
readIndex() throws IOException { + advance(Symbol.UNION); + Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol(); + + String label; + final JsonToken currentToken = in.getCurrentToken(); + if (currentToken == JsonToken.VALUE_NULL) { + label = "null"; + } else if (a.size() == 2 && + ("null".equals(a.getLabel(0)) || "null".equals(a.getLabel(1)))) { + label = ("null".equals(a.getLabel(0)) ? a.getLabel(1) : a.getLabel(0)); + } else if (currentToken == JsonToken.START_OBJECT + && in.nextToken() == JsonToken.FIELD_NAME) { + label = in.getText(); + in.nextToken(); + parser.pushSymbol(Symbol.UNION_END); + } else { + throw error("start-union"); + } + + int n = a.findLabel(label); + if (n < 0) { + // in order to allow the type specification for nullable fields to be avoided, we always choose index 1 (second type) + // works if you have only null on index 0, some type on index 1 + // original code threw a new AvroTypeException("Unknown union branch " + label); + n = 1; + } + parser.pushSymbol(a.getSymbol(n)); + return n; + } + + @Override + public Symbol doAction(Symbol input, Symbol top) throws IOException { + if (top instanceof Symbol.FieldAdjustAction) { + Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; + String name = fa.fname; + if (currentReorderBuffer != null) { + List<JsonElement> node = currentReorderBuffer.savedFields.get(name); + if (node != null) { + currentReorderBuffer.savedFields.remove(name); + currentReorderBuffer.origParser = in; + in = makeParser(node); + return null; + } + } + if (in.getCurrentToken() == JsonToken.FIELD_NAME) { + do { + String fn = in.getText(); + in.nextToken(); + if (name.equals(fn)) { + return null; + } else { + if (currentReorderBuffer == null) { + currentReorderBuffer = new ReorderBuffer(); + } + currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in)); + } + } while (in.getCurrentToken() == JsonToken.FIELD_NAME); + injectDefaultValueIfAvailable(in, fa.fname); + } else { + injectDefaultValueIfAvailable(in, 
fa.fname); + } + } else if (top == Symbol.FIELD_END) { + if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) { + in = currentReorderBuffer.origParser; + currentReorderBuffer.origParser = null; + } + } else if (top == Symbol.RECORD_START) { + if (in.getCurrentToken() == JsonToken.START_OBJECT) { + in.nextToken(); + reorderBuffers.push(currentReorderBuffer); + currentReorderBuffer = null; + } else { + throw error("record-start"); + } + } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) { + if (in.getCurrentToken() == JsonToken.END_OBJECT) { + in.nextToken(); + if (top == Symbol.RECORD_END) { + if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) { + throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet()); + } + currentReorderBuffer = reorderBuffers.pop(); + } + } else { + throw error(top == Symbol.RECORD_END ? "record-end" : "union-end"); + } + } else { + throw new AvroTypeException("Unknown action symbol " + top); + } + return null; + } + + private static class JsonElement { + public final JsonToken token; + public final String value; + + public JsonElement(JsonToken t, String value) { + this.token = t; + this.value = value; + } + + public JsonElement(JsonToken t) { + this(t, null); + } + } + + private static List<JsonElement> getVaueAsTree(JsonParser in) throws IOException { + int level = 0; + List<JsonElement> result = new ArrayList<JsonElement>(); + do { + JsonToken t = in.getCurrentToken(); + switch (t) { + case START_OBJECT: + case START_ARRAY: + level++; + result.add(new JsonElement(t)); + break; + case END_OBJECT: + case END_ARRAY: + level--; + result.add(new JsonElement(t)); + break; + case FIELD_NAME: + case VALUE_STRING: + case VALUE_NUMBER_INT: + case VALUE_NUMBER_FLOAT: + case VALUE_TRUE: + case VALUE_FALSE: + case VALUE_NULL: + result.add(new JsonElement(t, in.getText())); + break; + } + in.nextToken(); + } while (level != 0); + result.add(new JsonElement(null)); + 
return result; + } + + private JsonParser makeParser(final List<JsonElement> elements) throws IOException { + return new JsonParser() { + int pos = 0; + + @Override + public ObjectCodec getCodec() { + throw new UnsupportedOperationException(); + } + + @Override + public void setCodec(ObjectCodec c) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public JsonToken nextToken() throws IOException { + pos++; + return elements.get(pos).token; + } + + @Override + public JsonParser skipChildren() throws IOException { + JsonToken tkn = elements.get(pos).token; + int level = (tkn == JsonToken.START_ARRAY || tkn == JsonToken.END_ARRAY) ? 1 : 0; + while (level > 0) { + switch (elements.get(++pos).token) { + case START_ARRAY: + case START_OBJECT: + level++; + break; + case END_ARRAY: + case END_OBJECT: + level--; + break; + } + } + return this; + } + + @Override + public boolean isClosed() { + throw new UnsupportedOperationException(); + } + + @Override + public String getCurrentName() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public JsonStreamContext getParsingContext() { + throw new UnsupportedOperationException(); + } + + @Override + public JsonLocation getTokenLocation() { + throw new UnsupportedOperationException(); + } + + @Override + public JsonLocation getCurrentLocation() { + throw new UnsupportedOperationException(); + } + + @Override + public String getText() throws IOException { + return elements.get(pos).value; + } + + @Override + public char[] getTextCharacters() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getTextLength() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getTextOffset() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Number 
getNumberValue() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public NumberType getNumberType() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getIntValue() throws IOException { + return Integer.parseInt(getText()); + } + + @Override + public long getLongValue() throws IOException { + return Long.parseLong(getText()); + } + + @Override + public BigInteger getBigIntegerValue() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloatValue() throws IOException { + return Float.parseFloat(getText()); + } + + @Override + public double getDoubleValue() throws IOException { + return Double.parseDouble(getText()); + } + + @Override + public BigDecimal getDecimalValue() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBinaryValue(Base64Variant b64variant) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public JsonToken getCurrentToken() { + return elements.get(pos).token; + } + }; + } + + private AvroTypeException error(String type) { + return new AvroTypeException("Expected " + type + + ". Got " + in.getCurrentToken()); + } + + private static final JsonElement NULL_JSON_ELEMENT = new JsonElement(null); + + private void injectDefaultValueIfAvailable(final JsonParser in, String fieldName) throws IOException { + Field field = findField(schema, fieldName); + + boolean isNull = field == null; + + JsonNode defVal = isNull ? 
NullNode.getInstance() : field.defaultValue(); + if (defVal == null) { + throw new AvroTypeException("Expected field name not found: " + fieldName); + } + + List<JsonElement> result = new ArrayList<>(2); + JsonParser traverse = defVal.traverse(); + JsonToken nextToken; + while ((nextToken = traverse.nextToken()) != null) { + if (nextToken.isScalarValue()) { + result.add(new JsonElement(nextToken, traverse.getText())); + } else { + result.add(new JsonElement(nextToken)); + } + } + result.add(NULL_JSON_ELEMENT); + if (currentReorderBuffer == null) { + currentReorderBuffer = new ReorderBuffer(); + } + currentReorderBuffer.origParser = in; + this.in = makeParser(result); + } + + private static Field findField(Schema schema, String name) { + if (schema.getField(name) != null) { + return schema.getField(name); + } + + Field foundField = null; + + for (Field field : schema.getFields()) { + Schema fieldSchema = field.schema(); + if (Type.RECORD.equals(fieldSchema.getType())) { + foundField = findField(fieldSchema, name); + } else if (Type.ARRAY.equals(fieldSchema.getType())) { + foundField = findField(fieldSchema.getElementType(), name); + } else if (Type.MAP.equals(fieldSchema.getType())) { + foundField = findField(fieldSchema.getValueType(), name); + } + + if (foundField != null) { + return foundField; + } + } + + return foundField; + } +} + |
|
KafkaEsque_github: develop deb586f2 2019-02-25 10:44:23 Details Diff |
[ESQUE-12, ESQUE-21] Added configuration options for Producers and consumers; added configurations for topics; used topic configuration to enable use of avro |
Affected Issues 0000012, 0000021 |
|
mod - pom.xml | Diff File | ||
add - src/main/java/at/esque/kafka/ConfigHandler.java | Diff File | ||
mod - src/main/java/at/esque/kafka/ConsumerHandler.java | Diff File | ||
mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
add - src/main/java/at/esque/kafka/MessageType.java | Diff File | ||
mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
add - src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java | Diff File | ||
KafkaEsque_github: develop 6a6d9b73 2019-02-25 13:33:08 Details Diff |
[ESQUE-21] Make Message Types configurable from topicList |
Affected Issues 0000021 |
|
mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
add - src/main/java/at/esque/kafka/dialogs/TopicMessageTypeConfigDialog.java | Diff File | ||
KafkaEsque_github: develop 50a3d0cf 2019-02-27 07:56:33 Details Diff |
[ESQUE-21] Fix silently dying consumer caused by NPE when consuming null/delete messages |
Affected Issues 0000021 |
|
mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
KafkaEsque_github: develop fc2653dd 2019-02-27 08:23:14 Details Diff |
[ESQUE-21] Check if schemaRegistry is configured when trying to publish |
Affected Issues 0000021 |
|
mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
mod - src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java | Diff File | ||
KafkaEsque_github: develop 0a315250 2019-03-04 15:30:14 Details Diff |
[ESQUE-21] display content as String if avro deserialization fails |
Affected Issues 0000021 |
|
mod - src/main/java/at/esque/kafka/ConsumerHandler.java | Diff File | ||
add - src/main/java/at/esque/kafka/serialization/ForgivingKafkaAvroDeserializer.java | Diff File | ||
KafkaEsque_github: develop 722898b9 2019-03-12 08:17:24 Details Diff |
[ESQUE-12, ESQUE-21] Add ProducerHandler; Add custom serializers |
Affected Issues 0000012, 0000021 |
|
mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
mod - src/main/java/at/esque/kafka/Main.java | Diff File | ||
mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
add - src/main/java/at/esque/kafka/handlers/ProducerHandler.java | Diff File | ||
add - src/main/java/at/esque/kafka/handlers/ProducerWrapper.java | Diff File | ||
add - src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java | Diff File | ||
add - src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java | Diff File | ||
KafkaEsque_github: develop 5e202031 2019-03-30 10:21:50 Details Diff |
[ESQUE-21] Add ExtendedJsonDecoder for more convenient AVRO publishing |
Affected Issues 0000021 |
|
mod - src/main/java/at/esque/kafka/handlers/ProducerHandler.java | Diff File | ||
add - src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java | Diff File |
Date Modified | Username | Field | Change |
---|---|---|---|
2019-02-21 09:37 | patschuh | New Issue | |
2019-02-25 10:46 | patschuh | Changeset attached | => KafkaEsque_github develop deb586f2 |
2019-02-25 10:49 | patschuh | Assigned To | => patschuh |
2019-02-25 10:49 | patschuh | Status | new => assigned |
2019-02-25 13:33 | patschuh | Changeset attached | => KafkaEsque_github develop 6a6d9b73 |
2019-02-27 07:57 | patschuh | Changeset attached | => KafkaEsque_github develop 50a3d0cf |
2019-02-27 08:23 | patschuh | Changeset attached | => KafkaEsque_github develop fc2653dd |
2019-03-04 15:30 | patschuh | Changeset attached | => KafkaEsque_github develop 0a315250 |
2019-03-12 08:17 | patschuh | Changeset attached | => KafkaEsque_github develop 722898b9 |
2019-03-18 21:20 | Oliver G. | Note Added: 0000028 | |
2019-03-20 15:26 | Oliver G. | File Added: ExtendedJsonDecoder_for_publishing_Avro.patch | |
2019-03-20 15:26 | Oliver G. | Note Added: 0000029 | |
2019-03-30 10:22 | patschuh | Changeset attached | => KafkaEsque_github develop 5e202031 |
2019-11-25 07:50 | patschuh | Status | assigned => closed |
2019-11-25 07:50 | patschuh | Resolution | open => fixed |