View Issue Details
| ID | Project | Category | View Status | Date Submitted | Last Update |
|---|---|---|---|---|---|
| 0000021 | KafkaEsque | Feature Request | public | 2019-02-21 09:37 | 2019-11-25 07:50 |
| Reporter | patschuh | Assigned To | patschuh | ||
| Priority | high | Severity | feature | Reproducibility | have not tried |
| Status | closed | Resolution | fixed | ||
| Product Version | |||||
| Target Version | Fixed in Version | ||||
| Summary | 0000021: Add ability to serialize and deserialize AVRO | ||||
| Description | It should be configurable on a per-topic level whether the messages in the topic are serialized in Avro. | ||||
| Tags | No tags attached. | ||||
|
Please add an Extended JsonDecoder so that it is possible to publish Avro Messages by using the JSON format without the need for extra type definitions. |
|
|
Please apply the following patch to your develop branch.
ExtendedJsonDecoder_for_publishing_Avro.patch (29,500 bytes)
Index: src/main/java/at/esque/kafka/handlers/ProducerHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/at/esque/kafka/handlers/ProducerHandler.java (revision fd97d0bbeae497baa1fcc16818c7ff5af690b484)
+++ src/main/java/at/esque/kafka/handlers/ProducerHandler.java (date 1553095428179)
@@ -3,6 +3,7 @@
import at.esque.kafka.MessageType;
import at.esque.kafka.cluster.ClusterConfig;
import at.esque.kafka.cluster.TopicMessageTypeConfig;
+import at.esque.kafka.serialization.ExtendedJsonDecoder;
import at.esque.kafka.serialization.KafkaEsqueSerializer;
import com.google.inject.Inject;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
@@ -11,8 +12,7 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.io.Decoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -124,7 +124,7 @@
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.getSchema());
- JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(avroSchema, json);
+ Decoder jsonDecoder = new ExtendedJsonDecoder(avroSchema, json);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(avroSchema);
return reader.read(null, jsonDecoder);
Index: src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java (date 1553095428169)
+++ src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java (date 1553095428169)
@@ -0,0 +1,783 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package at.esque.kafka.serialization;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.ParsingDecoder;
+import org.apache.avro.io.parsing.JsonGrammarGenerator;
+import org.apache.avro.io.parsing.Parser;
+import org.apache.avro.io.parsing.Symbol;
+import org.apache.avro.util.Utf8;
+import org.codehaus.jackson.Base64Variant;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonLocation;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonStreamContext;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.ObjectCodec;
+import org.codehaus.jackson.node.NullNode;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * Original code is from: https://github.com/Celos/avro-json-decoder
+ * <p>
+ * Additional change for KafkaEsque: NullNode.getInstance() is used as the value for fields that are missing
+ * from the JSON, i.e. fields that are not explicitly set to null in the JSON.
+ * <p>
+ * A {@link Decoder} for Avro's JSON data encoding.
+ * <p>
+ * Construct using one of the public constructors (Avro's own JsonDecoder is obtained from
+ * {@link DecoderFactory} instead).
+ * <p>
+ * ExtendedJsonDecoder is not thread-safe.
+ * <p>
+ * Based on {@link org.apache.avro.io.JsonDecoder JsonDecoder}
+ * and <a href="https://github.com/zolyfarkas/avro">ExtendedJsonDecoder</a>.
+ * Infers default arguments, if they are not present.
+ */
+public class ExtendedJsonDecoder extends ParsingDecoder
+ implements Parser.ActionHandler {
+ // JSON token source currently being read; doAction and injectDefaultValueIfAvailable
+ // temporarily replace it with a replay parser built by makeParser.
+ private JsonParser in;
+ // Factory used by configure(InputStream) to create the parser.
+ private static JsonFactory jsonFactory = new JsonFactory();
+ // Reorder buffers of enclosing records: pushed on RECORD_START, popped on RECORD_END (see doAction).
+ Stack<ReorderBuffer> reorderBuffers = new Stack<ReorderBuffer>();
+ // Reorder buffer of the record currently being decoded; stays null until one is needed.
+ ReorderBuffer currentReorderBuffer;
+
+ // Schema passed to the constructor; searched by findField when a field is missing from the JSON.
+ private final Schema schema;
+
+ // Holds fields that were read before the grammar asked for them, plus the parser to return to.
+ private static class ReorderBuffer {
+ // Buffered token lists keyed by JSON field name.
+ public Map<String, List<JsonElement>> savedFields = new HashMap<String, List<JsonElement>>();
+ // Parser that was active before a replay parser was installed; restored on FIELD_END.
+ public JsonParser origParser = null;
+ }
+
+ // Charset name used by readByteArray to convert JSON string text into bytes.
+ static final String CHARSET = "ISO-8859-1";
+
+ /**
+  * Creates a decoder for the given schema that reads its JSON from a stream.
+  *
+  * @param schema the Avro schema to decode against; a null schema is rejected by getSymbol
+  * @param in the stream to read JSON from; a null stream is rejected by configure
+  * @throws IOException if configuring the input fails
+  */
+ public ExtendedJsonDecoder(Schema schema, InputStream in) throws IOException {
+     super(getSymbol(schema));
+     this.schema = schema;
+     configure(in);
+ }
+
+ /**
+  * Creates a decoder for the given schema that reads its JSON from a string.
+  *
+  * @param schema the Avro schema to decode against; a null schema is rejected by getSymbol
+  * @param in the JSON text to read; a null string is rejected by configure
+  * @throws IOException if configuring the input fails
+  */
+ public ExtendedJsonDecoder(Schema schema, String in) throws IOException {
+     super(getSymbol(schema));
+     this.schema = schema;
+     configure(in);
+ }
+
+ /**
+  * Builds the root grammar symbol for a schema using JsonGrammarGenerator.
+  *
+  * @param schema the schema to generate the grammar for
+  * @return the generated root symbol
+  * @throws NullPointerException if {@code schema} is null
+  */
+ private static Symbol getSymbol(Schema schema) {
+     if (schema != null) {
+         return new JsonGrammarGenerator().generate(schema);
+     }
+     throw new NullPointerException("Schema cannot be null!");
+ }
+
+ /**
+  * Reconfigures this decoder to read from the given InputStream.
+  * <p>
+  * The grammar parser is reset, a new JSON parser is created for the stream and
+  * positioned on its first token.
+  *
+  * @param in the InputStream to read from; must not be null
+  * @return this decoder
+  * @throws NullPointerException if {@code in} is null
+  * @throws IOException if creating the JSON parser or reading the first token fails
+  */
+ public ExtendedJsonDecoder configure(InputStream in) throws IOException {
+     if (in == null) {
+         throw new NullPointerException("InputStream to read from cannot be null!");
+     }
+     parser.reset();
+     final JsonParser created = jsonFactory.createJsonParser(in);
+     this.in = created;
+     created.nextToken();
+     return this;
+ }
+
+ /**
+  * Reconfigures this decoder to read from the given JSON string.
+  * <p>
+  * The grammar parser is reset, a new JSON parser is created for the string and
+  * positioned on its first token.
+  *
+  * @param in the String to read from; must not be null
+  * @return this decoder
+  * @throws NullPointerException if {@code in} is null
+  * @throws IOException if creating the JSON parser or reading the first token fails
+  */
+ public ExtendedJsonDecoder configure(String in) throws IOException {
+     if (null == in) {
+         throw new NullPointerException("String to read from cannot be null!");
+     }
+     parser.reset();
+     // Reuse the shared factory, as configure(InputStream) does, instead of
+     // allocating a new JsonFactory on every call.
+     this.in = jsonFactory.createJsonParser(in);
+     this.in.nextToken();
+     return this;
+ }
+
+ private void advance(Symbol symbol) throws IOException {
+ this.parser.processTrailingImplicitActions();
+ if (in.getCurrentToken() == null && this.parser.depth() == 1)
+ throw new EOFException();
+ parser.advance(symbol);
+ }
+
+ /**
+  * Consumes a JSON {@code null} token.
+  *
+  * @throws AvroTypeException if the current token is not a JSON null
+  */
+ @Override
+ public void readNull() throws IOException {
+     advance(Symbol.NULL);
+     if (in.getCurrentToken() != JsonToken.VALUE_NULL) {
+         throw error("null");
+     }
+     in.nextToken();
+ }
+
+ /**
+  * Reads a JSON {@code true}/{@code false} token and moves to the next token.
+  *
+  * @return the boolean value of the consumed token
+  * @throws AvroTypeException if the current token is neither true nor false
+  */
+ @Override
+ public boolean readBoolean() throws IOException {
+     advance(Symbol.BOOLEAN);
+     final JsonToken token = in.getCurrentToken();
+     final boolean isTrue = token == JsonToken.VALUE_TRUE;
+     if (!isTrue && token != JsonToken.VALUE_FALSE) {
+         throw error("boolean");
+     }
+     in.nextToken();
+     return isTrue;
+ }
+
+ /**
+  * Reads the current numeric token as an int and moves to the next token.
+  *
+  * @return the int value of the consumed token
+  * @throws AvroTypeException if the current token is not numeric
+  */
+ @Override
+ public int readInt() throws IOException {
+     advance(Symbol.INT);
+     if (!in.getCurrentToken().isNumeric()) {
+         throw error("int");
+     }
+     final int value = in.getIntValue();
+     in.nextToken();
+     return value;
+ }
+
+ /**
+  * Reads the current numeric token as a long and moves to the next token.
+  *
+  * @return the long value of the consumed token
+  * @throws AvroTypeException if the current token is not numeric
+  */
+ @Override
+ public long readLong() throws IOException {
+     advance(Symbol.LONG);
+     if (!in.getCurrentToken().isNumeric()) {
+         throw error("long");
+     }
+     final long value = in.getLongValue();
+     in.nextToken();
+     return value;
+ }
+
+ /**
+  * Reads the current numeric token as a float and moves to the next token.
+  *
+  * @return the float value of the consumed token
+  * @throws AvroTypeException if the current token is not numeric
+  */
+ @Override
+ public float readFloat() throws IOException {
+     advance(Symbol.FLOAT);
+     if (!in.getCurrentToken().isNumeric()) {
+         throw error("float");
+     }
+     final float value = in.getFloatValue();
+     in.nextToken();
+     return value;
+ }
+
+ /**
+  * Reads the current numeric token as a double and moves to the next token.
+  *
+  * @return the double value of the consumed token
+  * @throws AvroTypeException if the current token is not numeric
+  */
+ @Override
+ public double readDouble() throws IOException {
+     advance(Symbol.DOUBLE);
+     if (!in.getCurrentToken().isNumeric()) {
+         throw error("double");
+     }
+     final double value = in.getDoubleValue();
+     in.nextToken();
+     return value;
+ }
+
+ /**
+  * Reads a string and wraps it in a new {@link Utf8}; the {@code old} instance is not reused.
+  *
+  * @param old ignored
+  * @return a new Utf8 holding the string that was read
+  */
+ @Override
+ public Utf8 readString(Utf8 old) throws IOException {
+     final String text = readString();
+     return new Utf8(text);
+ }
+
+ @Override
+ public String readString() throws IOException {
+ advance(Symbol.STRING);
+ if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
+ parser.advance(Symbol.MAP_KEY_MARKER);
+ if (in.getCurrentToken() != JsonToken.FIELD_NAME) {
+ throw error("map-key");
+ }
+ } else {
+ if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+ throw error("string");
+ }
+ }
+ String result = in.getText();
+ in.nextToken();
+ return result;
+ }
+
+ @Override
+ public void skipString() throws IOException {
+ advance(Symbol.STRING);
+ if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
+ parser.advance(Symbol.MAP_KEY_MARKER);
+ if (in.getCurrentToken() != JsonToken.FIELD_NAME) {
+ throw error("map-key");
+ }
+ } else {
+ if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+ throw error("string");
+ }
+ }
+ in.nextToken();
+ }
+
+ /**
+  * Reads a bytes value encoded as a JSON string (converted via readByteArray).
+  *
+  * @param old ignored; a new buffer is always returned
+  * @return a buffer wrapping the decoded bytes
+  * @throws AvroTypeException if the current token is not a string value
+  */
+ @Override
+ public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+     advance(Symbol.BYTES);
+     if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+         throw error("bytes");
+     }
+     final byte[] decoded = readByteArray();
+     in.nextToken();
+     return ByteBuffer.wrap(decoded);
+ }
+
+ /**
+  * Converts the text of the current token into bytes using {@link #CHARSET}.
+  *
+  * @return the encoded bytes of the current token's text
+  */
+ private byte[] readByteArray() throws IOException {
+     return in.getText().getBytes(CHARSET);
+ }
+
+ /**
+  * Skips a bytes value, which must be present as a JSON string.
+  *
+  * @throws AvroTypeException if the current token is not a string value
+  */
+ @Override
+ public void skipBytes() throws IOException {
+     advance(Symbol.BYTES);
+     if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+         throw error("bytes");
+     }
+     in.nextToken();
+ }
+
+ private void checkFixed(int size) throws IOException {
+ advance(Symbol.FIXED);
+ Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
+ if (size != top.size) {
+ throw new AvroTypeException(
+ "Incorrect length for fixed binary: expected " +
+ top.size + " but received " + size + " bytes.");
+ }
+ }
+
+ /**
+  * Reads a fixed value encoded as a JSON string into {@code bytes}.
+  *
+  * @param bytes destination array
+  * @param start offset in {@code bytes} at which to start writing
+  * @param len number of bytes to read; must match the size in the grammar
+  * @throws AvroTypeException if the current token is not a string value, or the decoded
+  *         value does not have exactly {@code len} bytes
+  */
+ @Override
+ public void readFixed(byte[] bytes, int start, int len) throws IOException {
+     checkFixed(len);
+     if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+         throw error("fixed");
+     }
+     byte[] result = readByteArray();
+     in.nextToken();
+     if (result.length != len) {
+         // Message previously lacked the space after "got" (e.g. "but got5").
+         throw new AvroTypeException("Expected fixed length " + len
+             + ", but got " + result.length);
+     }
+     System.arraycopy(result, 0, bytes, start, len);
+ }
+
+ // Skips a fixed value of the given length: checkFixed validates the length against
+ // the grammar, doSkipFixed consumes the JSON string token.
+ @Override
+ public void skipFixed(int length) throws IOException {
+ checkFixed(length);
+ doSkipFixed(length);
+ }
+
+ /**
+  * Consumes the JSON string token of a fixed value and checks its decoded length.
+  *
+  * @param length the expected number of bytes
+  * @throws AvroTypeException if the current token is not a string value, or the decoded
+  *         value does not have exactly {@code length} bytes
+  */
+ private void doSkipFixed(int length) throws IOException {
+     if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+         throw error("fixed");
+     }
+     byte[] result = readByteArray();
+     in.nextToken();
+     if (result.length != length) {
+         // Message previously lacked the space after "got" (e.g. "but got5").
+         throw new AvroTypeException("Expected fixed length " + length
+             + ", but got " + result.length);
+     }
+ }
+
+ @Override
+ protected void skipFixed() throws IOException {
+ advance(Symbol.FIXED);
+ Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
+ doSkipFixed(top.size);
+ }
+
+ @Override
+ public int readEnum() throws IOException {
+ advance(Symbol.ENUM);
+ Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
+ if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
+ in.getText();
+ int n = top.findLabel(in.getText());
+ if (n >= 0) {
+ in.nextToken();
+ return n;
+ }
+ throw new AvroTypeException("Unknown symbol in enum " + in.getText());
+ } else {
+ throw error("fixed");
+ }
+ }
+
+ /**
+  * Consumes the opening bracket of a JSON array.
+  *
+  * @return 0 if the array is already at its end, 1 otherwise (see doArrayNext)
+  * @throws AvroTypeException if the current token is not the start of an array
+  */
+ @Override
+ public long readArrayStart() throws IOException {
+     advance(Symbol.ARRAY_START);
+     if (in.getCurrentToken() != JsonToken.START_ARRAY) {
+         throw error("array-start");
+     }
+     in.nextToken();
+     return doArrayNext();
+ }
+
+ // Called after an array item: advances past ITEM_END and reports via doArrayNext
+ // whether another item follows (1) or the array has ended (0).
+ @Override
+ public long arrayNext() throws IOException {
+ advance(Symbol.ITEM_END);
+ return doArrayNext();
+ }
+
+ /**
+  * Checks whether the array has ended; if so consumes the closing bracket.
+  *
+  * @return 0 when the current token is END_ARRAY (which is consumed), otherwise 1
+  */
+ private long doArrayNext() throws IOException {
+     if (in.getCurrentToken() != JsonToken.END_ARRAY) {
+         return 1;
+     }
+     parser.advance(Symbol.ARRAY_END);
+     in.nextToken();
+     return 0;
+ }
+
+ /**
+  * Skips an entire JSON array, including its closing bracket.
+  *
+  * @return always 0
+  * @throws AvroTypeException if the current token is not the start of an array
+  */
+ @Override
+ public long skipArray() throws IOException {
+     advance(Symbol.ARRAY_START);
+     if (in.getCurrentToken() != JsonToken.START_ARRAY) {
+         throw error("array-start");
+     }
+     in.skipChildren();
+     in.nextToken();
+     advance(Symbol.ARRAY_END);
+     return 0;
+ }
+
+ /**
+  * Consumes the opening brace of a JSON object that encodes a map.
+  *
+  * @return 0 if the map is already at its end, 1 otherwise (see doMapNext)
+  * @throws AvroTypeException if the current token is not the start of an object
+  */
+ @Override
+ public long readMapStart() throws IOException {
+     advance(Symbol.MAP_START);
+     if (in.getCurrentToken() != JsonToken.START_OBJECT) {
+         throw error("map-start");
+     }
+     in.nextToken();
+     return doMapNext();
+ }
+
+ // Called after a map entry: advances past ITEM_END and reports via doMapNext
+ // whether another entry follows (1) or the map has ended (0).
+ @Override
+ public long mapNext() throws IOException {
+ advance(Symbol.ITEM_END);
+ return doMapNext();
+ }
+
+ /**
+  * Checks whether the map has ended; if so consumes the closing brace.
+  *
+  * @return 0 when the current token is END_OBJECT (which is consumed), otherwise 1
+  */
+ private long doMapNext() throws IOException {
+     if (in.getCurrentToken() != JsonToken.END_OBJECT) {
+         return 1;
+     }
+     in.nextToken();
+     advance(Symbol.MAP_END);
+     return 0;
+ }
+
+ /**
+  * Skips an entire JSON object that encodes a map, including its closing brace.
+  *
+  * @return always 0
+  * @throws AvroTypeException if the current token is not the start of an object
+  */
+ @Override
+ public long skipMap() throws IOException {
+     advance(Symbol.MAP_START);
+     if (in.getCurrentToken() != JsonToken.START_OBJECT) {
+         throw error("map-start");
+     }
+     in.skipChildren();
+     in.nextToken();
+     advance(Symbol.MAP_END);
+     return 0;
+ }
+
+ // Determines which branch of a union the upcoming JSON value belongs to and returns its index.
+ // The branch label is derived in one of three ways (see the if/else chain below); the symbol
+ // of the chosen branch is pushed onto the grammar parser before returning.
+ // Throws AvroTypeException ("start-union") when none of the three ways applies.
+ @Override
+ public int readIndex() throws IOException {
+ advance(Symbol.UNION);
+ Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol();
+
+ String label;
+ final JsonToken currentToken = in.getCurrentToken();
+ // 1) A JSON null always selects the branch labelled "null".
+ if (currentToken == JsonToken.VALUE_NULL) {
+ label = "null";
+ // 2) Two-branch union with a "null" branch: any non-null value selects the other
+ //    branch, so no {"type": value} wrapper is required in the JSON.
+ } else if (a.size() == 2 &&
+ ("null".equals(a.getLabel(0)) || "null".equals(a.getLabel(1)))) {
+ label = ("null".equals(a.getLabel(0)) ? a.getLabel(1) : a.getLabel(0));
+ // 3) Wrapped form {"label": value}. Note that in.nextToken() in the condition consumes
+ //    the START_OBJECT token even when the following token is not a field name.
+ } else if (currentToken == JsonToken.START_OBJECT
+ && in.nextToken() == JsonToken.FIELD_NAME) {
+ label = in.getText();
+ in.nextToken();
+ // UNION_END makes doAction consume the wrapper's closing brace later.
+ parser.pushSymbol(Symbol.UNION_END);
+ } else {
+ throw error("start-union");
+ }
+
+ int n = a.findLabel(label);
+ if (n < 0) {
+ // in order to allow the type specification for nullable fields to be avoided, we always choose index 1 (second type)
+ // works if you have only null on index 0, some type on index 1
+ // original code threw a new AvroTypeException("Unknown union branch " + label);
+ // NOTE(review): presumably a.getSymbol(1) fails for a union with a single branch - confirm.
+ n = 1;
+ }
+ parser.pushSymbol(a.getSymbol(n));
+ return n;
+ }
+
+ // Parser.ActionHandler callback: reacts to the grammar's implicit action symbols.
+ // Handles field lookup (FieldAdjustAction), FIELD_END, RECORD_START, RECORD_END and UNION_END;
+ // any other symbol raises AvroTypeException. Always returns null.
+ @Override
+ public Symbol doAction(Symbol input, Symbol top) throws IOException {
+ if (top instanceof Symbol.FieldAdjustAction) {
+ // The grammar wants the field named fa.fname next; JSON fields may arrive in any order.
+ Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
+ String name = fa.fname;
+ if (currentReorderBuffer != null) {
+ // Field was already read ahead and buffered: replay its tokens through a
+ // temporary parser and remember the real parser so FIELD_END can restore it.
+ List<JsonElement> node = currentReorderBuffer.savedFields.get(name);
+ if (node != null) {
+ currentReorderBuffer.savedFields.remove(name);
+ currentReorderBuffer.origParser = in;
+ in = makeParser(node);
+ return null;
+ }
+ }
+ if (in.getCurrentToken() == JsonToken.FIELD_NAME) {
+ // Scan forward through the object; fields other than the wanted one are
+ // buffered (as token lists) for later replay.
+ do {
+ String fn = in.getText();
+ in.nextToken();
+ if (name.equals(fn)) {
+ return null;
+ } else {
+ if (currentReorderBuffer == null) {
+ currentReorderBuffer = new ReorderBuffer();
+ }
+ currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in));
+ }
+ } while (in.getCurrentToken() == JsonToken.FIELD_NAME);
+ // No more field names and the wanted field was not found: fall back to a default.
+ injectDefaultValueIfAvailable(in, fa.fname);
+ } else {
+ // Current token is not a field name, so the wanted field is absent: fall back to a default.
+ injectDefaultValueIfAvailable(in, fa.fname);
+ }
+ } else if (top == Symbol.FIELD_END) {
+ // A replayed or injected field is finished: switch back to the parser saved earlier.
+ if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) {
+ in = currentReorderBuffer.origParser;
+ currentReorderBuffer.origParser = null;
+ }
+ } else if (top == Symbol.RECORD_START) {
+ if (in.getCurrentToken() == JsonToken.START_OBJECT) {
+ in.nextToken();
+ // Save the enclosing record's buffer; the new record starts without one.
+ reorderBuffers.push(currentReorderBuffer);
+ currentReorderBuffer = null;
+ } else {
+ throw error("record-start");
+ }
+ } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) {
+ // Both symbols require the closing brace of a JSON object.
+ if (in.getCurrentToken() == JsonToken.END_OBJECT) {
+ in.nextToken();
+ if (top == Symbol.RECORD_END) {
+ // Buffered fields that were never requested by the grammar are not part of the record.
+ if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) {
+ throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet());
+ }
+ // Restore the enclosing record's buffer.
+ currentReorderBuffer = reorderBuffers.pop();
+ }
+ } else {
+ throw error(top == Symbol.RECORD_END ? "record-end" : "union-end");
+ }
+ } else {
+ throw new AvroTypeException("Unknown action symbol " + top);
+ }
+ return null;
+ }
+
+ /**
+  * One buffered JSON token together with its text (null for tokens stored without text).
+  */
+ private static class JsonElement {
+     public final JsonToken token;
+     public final String value;
+
+     /** Creates an element that carries only a token and no text. */
+     public JsonElement(JsonToken t) {
+         this.token = t;
+         this.value = null;
+     }
+
+     /** Creates an element that carries a token and its text. */
+     public JsonElement(JsonToken t, String value) {
+         this.token = t;
+         this.value = value;
+     }
+ }
+
+ private static List<JsonElement> getVaueAsTree(JsonParser in) throws IOException {
+ int level = 0;
+ List<JsonElement> result = new ArrayList<JsonElement>();
+ do {
+ JsonToken t = in.getCurrentToken();
+ switch (t) {
+ case START_OBJECT:
+ case START_ARRAY:
+ level++;
+ result.add(new JsonElement(t));
+ break;
+ case END_OBJECT:
+ case END_ARRAY:
+ level--;
+ result.add(new JsonElement(t));
+ break;
+ case FIELD_NAME:
+ case VALUE_STRING:
+ case VALUE_NUMBER_INT:
+ case VALUE_NUMBER_FLOAT:
+ case VALUE_TRUE:
+ case VALUE_FALSE:
+ case VALUE_NULL:
+ result.add(new JsonElement(t, in.getText()));
+ break;
+ }
+ in.nextToken();
+ } while (level != 0);
+ result.add(new JsonElement(null));
+ return result;
+ }
+
+ private JsonParser makeParser(final List<JsonElement> elements) throws IOException {
+ return new JsonParser() {
+ int pos = 0;
+
+ @Override
+ public ObjectCodec getCodec() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCodec(ObjectCodec c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JsonToken nextToken() throws IOException {
+ pos++;
+ return elements.get(pos).token;
+ }
+
+ @Override
+ public JsonParser skipChildren() throws IOException {
+ JsonToken tkn = elements.get(pos).token;
+ int level = (tkn == JsonToken.START_ARRAY || tkn == JsonToken.END_ARRAY) ? 1 : 0;
+ while (level > 0) {
+ switch (elements.get(++pos).token) {
+ case START_ARRAY:
+ case START_OBJECT:
+ level++;
+ break;
+ case END_ARRAY:
+ case END_OBJECT:
+ level--;
+ break;
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public boolean isClosed() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getCurrentName() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JsonStreamContext getParsingContext() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JsonLocation getTokenLocation() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JsonLocation getCurrentLocation() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getText() throws IOException {
+ return elements.get(pos).value;
+ }
+
+ @Override
+ public char[] getTextCharacters() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getTextLength() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getTextOffset() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Number getNumberValue() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public NumberType getNumberType() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getIntValue() throws IOException {
+ return Integer.parseInt(getText());
+ }
+
+ @Override
+ public long getLongValue() throws IOException {
+ return Long.parseLong(getText());
+ }
+
+ @Override
+ public BigInteger getBigIntegerValue() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float getFloatValue() throws IOException {
+ return Float.parseFloat(getText());
+ }
+
+ @Override
+ public double getDoubleValue() throws IOException {
+ return Double.parseDouble(getText());
+ }
+
+ @Override
+ public BigDecimal getDecimalValue() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] getBinaryValue(Base64Variant b64variant)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JsonToken getCurrentToken() {
+ return elements.get(pos).token;
+ }
+ };
+ }
+
+ private AvroTypeException error(String type) {
+ return new AvroTypeException("Expected " + type +
+ ". Got " + in.getCurrentToken());
+ }
+
+ // Terminator appended to every injected token list (an element with a null token).
+ private static final JsonElement NULL_JSON_ELEMENT = new JsonElement(null);
+
+ /**
+  * Substitutes a value for a field that is missing from the JSON input.
+  * <p>
+  * The field is looked up in the schema via findField. If it is found, its schema default
+  * is used; if it is not found, a JSON null is used. The value is flattened into a token
+  * list and installed as the active parser; the given parser is remembered in the current
+  * reorder buffer so it can be restored when the field ends.
+  *
+  * @param in the parser that was active when the field turned out to be missing
+  * @param fieldName name of the missing field
+  * @throws AvroTypeException if the field exists in the schema but declares no default
+  */
+ private void injectDefaultValueIfAvailable(final JsonParser in, String fieldName) throws IOException {
+     Field field = findField(schema, fieldName);
+
+     JsonNode defVal = (field == null) ? NullNode.getInstance() : field.defaultValue();
+     if (defVal == null) {
+         throw new AvroTypeException("Expected field name not found: " + fieldName);
+     }
+
+     List<JsonElement> result = new ArrayList<>(2);
+     JsonParser traverse = defVal.traverse();
+     JsonToken nextToken;
+     while ((nextToken = traverse.nextToken()) != null) {
+         // FIELD_NAME is not a scalar token, but its text (the key) must be kept as well,
+         // otherwise object-valued defaults are replayed with null field names.
+         if (nextToken.isScalarValue() || nextToken == JsonToken.FIELD_NAME) {
+             result.add(new JsonElement(nextToken, traverse.getText()));
+         } else {
+             result.add(new JsonElement(nextToken));
+         }
+     }
+     result.add(NULL_JSON_ELEMENT);
+     if (currentReorderBuffer == null) {
+         currentReorderBuffer = new ReorderBuffer();
+     }
+     currentReorderBuffer.origParser = in;
+     this.in = makeParser(result);
+ }
+
+ /**
+  * Searches a schema, depth first, for a record field with the given name.
+  * <p>
+  * Arrays and maps are searched through their element/value schema. Only record schemas
+  * are asked for fields: calling getField/getFields on any other schema type throws in
+  * Avro, which previously made the search fail for e.g. an array of strings. Schemas of
+  * other types (including unions) are not searched and yield null.
+  *
+  * @param schema the schema to search
+  * @param name the field name to look for
+  * @return the first matching field, or null if none is found
+  */
+ private static Field findField(Schema schema, String name) {
+     switch (schema.getType()) {
+         case ARRAY:
+             return findField(schema.getElementType(), name);
+         case MAP:
+             return findField(schema.getValueType(), name);
+         case RECORD:
+             break;
+         default:
+             // Not a container of fields: nothing to find here.
+             return null;
+     }
+
+     Field direct = schema.getField(name);
+     if (direct != null) {
+         return direct;
+     }
+
+     for (Field field : schema.getFields()) {
+         Field nested = findField(field.schema(), name);
+         if (nested != null) {
+             return nested;
+         }
+     }
+     return null;
+ }
+}
+
|
|
|
KafkaEsque_github: develop deb586f2 2019-02-25 10:44:23 Details Diff |
[ESQUE-12, ESQUE-21] Added configuration options for Producers and consumers; added configurations for topics; used topic configuration to enable use of avro |
Affected Issues 0000012, 0000021 |
|
| mod - pom.xml | Diff File | ||
| add - src/main/java/at/esque/kafka/ConfigHandler.java | Diff File | ||
| mod - src/main/java/at/esque/kafka/ConsumerHandler.java | Diff File | ||
| mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
| add - src/main/java/at/esque/kafka/MessageType.java | Diff File | ||
| mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
| add - src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java | Diff File | ||
|
KafkaEsque_github: develop 6a6d9b73 2019-02-25 13:33:08 Details Diff |
[ESQUE-21] Make Message Types configurable from topicList |
Affected Issues 0000021 |
|
| mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
| add - src/main/java/at/esque/kafka/dialogs/TopicMessageTypeConfigDialog.java | Diff File | ||
|
KafkaEsque_github: develop 50a3d0cf 2019-02-27 07:56:33 Details Diff |
[ESQUE-21] Fix silently dying consumer caused by NPE when consuming null/delete messages |
Affected Issues 0000021 |
|
| mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
| mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
|
KafkaEsque_github: develop fc2653dd 2019-02-27 08:23:14 Details Diff |
[ESQUE-21] Check if schemaRegistry is configured when trying to publish |
Affected Issues 0000021 |
|
| mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
| mod - src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java | Diff File | ||
|
KafkaEsque_github: develop 0a315250 2019-03-04 15:30:14 Details Diff |
[ESQUE-21] display content as String if avro deserialization fails |
Affected Issues 0000021 |
|
| mod - src/main/java/at/esque/kafka/ConsumerHandler.java | Diff File | ||
| add - src/main/java/at/esque/kafka/serialization/ForgivingKafkaAvroDeserializer.java | Diff File | ||
|
KafkaEsque_github: develop 722898b9 2019-03-12 08:17:24 Details Diff |
[ESQUE-12, ESQUE-21] Add ProducerHandler; Add custom serializers |
Affected Issues 0000012, 0000021 |
|
| mod - src/main/java/at/esque/kafka/Controller.java | Diff File | ||
| mod - src/main/java/at/esque/kafka/Main.java | Diff File | ||
| mod - src/main/java/at/esque/kafka/PublisherController.java | Diff File | ||
| add - src/main/java/at/esque/kafka/handlers/ProducerHandler.java | Diff File | ||
| add - src/main/java/at/esque/kafka/handlers/ProducerWrapper.java | Diff File | ||
| add - src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java | Diff File | ||
| add - src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java | Diff File | ||
|
KafkaEsque_github: develop 5e202031 2019-03-30 10:21:50 Details Diff |
[ESQUE-21] Add ExtendedJsonDecoder for more convenient AVRO publishing |
Affected Issues 0000021 |
|
| mod - src/main/java/at/esque/kafka/handlers/ProducerHandler.java | Diff File | ||
| add - src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java | Diff File | ||
| Date Modified | Username | Field | Change |
|---|---|---|---|
| 2019-02-21 09:37 | patschuh | New Issue | |
| 2019-02-25 10:46 | patschuh | Changeset attached | => KafkaEsque_github develop deb586f2 |
| 2019-02-25 10:49 | patschuh | Assigned To | => patschuh |
| 2019-02-25 10:49 | patschuh | Status | new => assigned |
| 2019-02-25 13:33 | patschuh | Changeset attached | => KafkaEsque_github develop 6a6d9b73 |
| 2019-02-27 07:57 | patschuh | Changeset attached | => KafkaEsque_github develop 50a3d0cf |
| 2019-02-27 08:23 | patschuh | Changeset attached | => KafkaEsque_github develop fc2653dd |
| 2019-03-04 15:30 | patschuh | Changeset attached | => KafkaEsque_github develop 0a315250 |
| 2019-03-12 08:17 | patschuh | Changeset attached | => KafkaEsque_github develop 722898b9 |
| 2019-03-18 21:20 | Oliver G. | Note Added: 0000028 | |
| 2019-03-20 15:26 | Oliver G. | File Added: ExtendedJsonDecoder_for_publishing_Avro.patch | |
| 2019-03-20 15:26 | Oliver G. | Note Added: 0000029 | |
| 2019-03-30 10:22 | patschuh | Changeset attached | => KafkaEsque_github develop 5e202031 |
| 2019-11-25 07:50 | patschuh | Status | assigned => closed |
| 2019-11-25 07:50 | patschuh | Resolution | open => fixed |