View Issue Details

ID: 0000021
Project: KafkaEsque
Category: Feature Request
View Status: public
Last Update: 2019-11-25 07:50
Reporter: patschuh
Assigned To: patschuh
Priority: high
Severity: feature
Reproducibility: have not tried
Status: closed
Resolution: fixed
Product Version:
Target Version:
Fixed in Version:
Summary: 0000021: Add ability to serialize and deserialize AVRO
Description

It should be configurable on a per-topic level whether the messages in a topic are serialized in Avro.
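
A minimal sketch of what such a per-topic setting could look like (the names below are hypothetical and only illustrate the idea; the actual implementation added for this issue lives in MessageType, TopicMessageTypeConfig and ConfigHandler, see the changesets below):

// Hypothetical sketch, not the actual KafkaEsque classes: one value message type per
// topic, falling back to plain strings when a topic has no explicit configuration.
enum SketchMessageType { STRING, AVRO }

class SketchTopicConfig {
    private final java.util.Map<String, SketchMessageType> valueTypeByTopic = new java.util.HashMap<>();

    void setValueType(String topic, SketchMessageType type) {
        valueTypeByTopic.put(topic, type);
    }

    SketchMessageType valueTypeOf(String topic) {
        return valueTypeByTopic.getOrDefault(topic, SketchMessageType.STRING);
    }
}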

Tags: No tags attached.

Activities

Oliver G.

2019-03-18 21:20

reporter   ~0000028

Please add an ExtendedJsonDecoder so that it is possible to publish Avro messages using the JSON format without the need for extra type definitions.
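
For illustration (a hedged sketch using a made-up schema, not part of the note above): with Avro's stock JsonDecoder, values of nullable union fields must be wrapped in their branch type, e.g. {"note": {"string": "hello"}}, and every field has to be present in the JSON. A decoder like the ExtendedJsonDecoder attached in the next note accepts the plain, unwrapped form instead:

import at.esque.kafka.serialization.ExtendedJsonDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;

public class ExtendedJsonDecoderSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"int\"},"
                        + "{\"name\":\"note\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // No {"string": ...} union wrapper is needed around the value of "note".
        Decoder decoder = new ExtendedJsonDecoder(schema, "{\"id\": 1, \"note\": \"hello\"}");
        GenericRecord record = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(record);
    }
}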

Oliver G.

2019-03-20 15:26

reporter   ~0000029

Please apply the following patch to your develop branch.



ExtendedJsonDecoder_for_publishing_Avro.patch (29,500 bytes)
Index: src/main/java/at/esque/kafka/handlers/ProducerHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/at/esque/kafka/handlers/ProducerHandler.java	(revision fd97d0bbeae497baa1fcc16818c7ff5af690b484)
+++ src/main/java/at/esque/kafka/handlers/ProducerHandler.java	(date 1553095428179)
@@ -3,6 +3,7 @@
 import at.esque.kafka.MessageType;
 import at.esque.kafka.cluster.ClusterConfig;
 import at.esque.kafka.cluster.TopicMessageTypeConfig;
+import at.esque.kafka.serialization.ExtendedJsonDecoder;
 import at.esque.kafka.serialization.KafkaEsqueSerializer;
 import com.google.inject.Inject;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
@@ -11,8 +12,7 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.io.Decoder;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -124,7 +124,7 @@
 
         org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.getSchema());
 
-        JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(avroSchema, json);
+        Decoder jsonDecoder = new ExtendedJsonDecoder(avroSchema, json);
         DatumReader<GenericRecord> reader = new GenericDatumReader<>(avroSchema);
 
         return reader.read(null, jsonDecoder);
Index: src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java	(date 1553095428169)
+++ src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java	(date 1553095428169)
@@ -0,0 +1,783 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package at.esque.kafka.serialization;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.ParsingDecoder;
+import org.apache.avro.io.parsing.JsonGrammarGenerator;
+import org.apache.avro.io.parsing.Parser;
+import org.apache.avro.io.parsing.Symbol;
+import org.apache.avro.util.Utf8;
+import org.codehaus.jackson.Base64Variant;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonLocation;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonStreamContext;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.ObjectCodec;
+import org.codehaus.jackson.node.NullNode;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * Original code is from: https://github.com/Celos/avro-json-decoder
+ *
+ * Additional change for KafkaEsque: a NullNode.getInstance() is returned for fields that are missing from the JSON,
+ * meaning they are not explicitly set to null in the JSON.
+ *
+ * A {@link Decoder} for Avro's JSON data encoding.
+ * </p>
+ * Construct using {@link DecoderFactory}.
+ * </p>
+ * ExtendedJsonDecoder is not thread-safe.
+ * <p>
+ * Based on {@link org.apache.avro.io.JsonDecoder JsonDecoder}
+ * and <a href="https://github.com/zolyfarkas/avro">ExtendedJsonDecoder</a>.
+ * Infers default arguments, if they are not present.
+ * </p>
+ **/
+public class ExtendedJsonDecoder extends ParsingDecoder
+        implements Parser.ActionHandler {
+    private JsonParser in;
+    private static JsonFactory jsonFactory = new JsonFactory();
+    Stack<ReorderBuffer> reorderBuffers = new Stack<ReorderBuffer>();
+    ReorderBuffer currentReorderBuffer;
+
+    private final Schema schema;
+
+    private static class ReorderBuffer {
+        public Map<String, List<JsonElement>> savedFields = new HashMap<String, List<JsonElement>>();
+        public JsonParser origParser = null;
+    }
+
+    static final String CHARSET = "ISO-8859-1";
+
+    public ExtendedJsonDecoder(Schema schema, InputStream in) throws IOException {
+        super(getSymbol(schema));
+        configure(in);
+        this.schema = schema;
+    }
+
+    public ExtendedJsonDecoder(Schema schema, String in) throws IOException {
+        super(getSymbol(schema));
+        configure(in);
+        this.schema = schema;
+    }
+
+    private static Symbol getSymbol(Schema schema) {
+        if (null == schema) {
+            throw new NullPointerException("Schema cannot be null!");
+        }
+        return new JsonGrammarGenerator().generate(schema);
+    }
+
+    /**
+     * Reconfigures this JsonDecoder to use the InputStream provided.
+     * <p/>
+     * If the InputStream provided is null, a NullPointerException is thrown.
+     * <p/>
+     * Otherwise, this JsonDecoder will reset its state and then
+     * reconfigure its input.
+     * @param in
+     *   The InputStream to read from. Cannot be null.
+     * @throws IOException
+     * @return this JsonDecoder
+     */
+    public ExtendedJsonDecoder configure(InputStream in) throws IOException {
+        if (null == in) {
+            throw new NullPointerException("InputStream to read from cannot be null!");
+        }
+        parser.reset();
+        this.in = jsonFactory.createJsonParser(in);
+        this.in.nextToken();
+        return this;
+    }
+
+    /**
+     * Reconfigures this JsonDecoder to use the String provided for input.
+     * <p/>
+     * If the String provided is null, a NullPointerException is thrown.
+     * <p/>
+     * Otherwise, this JsonDecoder will reset its state and then
+     * reconfigure its input.
+     * @param in
+     *   The String to read from. Cannot be null.
+     * @throws IOException
+     * @return this JsonDecoder
+     */
+    public ExtendedJsonDecoder configure(String in) throws IOException {
+        if (null == in) {
+            throw new NullPointerException("String to read from cannot be null!");
+        }
+        parser.reset();
+        this.in = new JsonFactory().createJsonParser(in);
+        this.in.nextToken();
+        return this;
+    }
+
+    private void advance(Symbol symbol) throws IOException {
+        this.parser.processTrailingImplicitActions();
+        if (in.getCurrentToken() == null && this.parser.depth() == 1)
+            throw new EOFException();
+        parser.advance(symbol);
+    }
+
+    @Override
+    public void readNull() throws IOException {
+        advance(Symbol.NULL);
+        if (in.getCurrentToken() == JsonToken.VALUE_NULL) {
+            in.nextToken();
+        } else {
+            throw error("null");
+        }
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException {
+        advance(Symbol.BOOLEAN);
+        JsonToken t = in.getCurrentToken();
+        if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE) {
+            in.nextToken();
+            return t == JsonToken.VALUE_TRUE;
+        } else {
+            throw error("boolean");
+        }
+    }
+
+    @Override
+    public int readInt() throws IOException {
+        advance(Symbol.INT);
+        if (in.getCurrentToken().isNumeric()) {
+            int result = in.getIntValue();
+            in.nextToken();
+            return result;
+        } else {
+            throw error("int");
+        }
+    }
+
+    @Override
+    public long readLong() throws IOException {
+        advance(Symbol.LONG);
+        if (in.getCurrentToken().isNumeric()) {
+            long result = in.getLongValue();
+            in.nextToken();
+            return result;
+        } else {
+            throw error("long");
+        }
+    }
+
+    @Override
+    public float readFloat() throws IOException {
+        advance(Symbol.FLOAT);
+        if (in.getCurrentToken().isNumeric()) {
+            float result = in.getFloatValue();
+            in.nextToken();
+            return result;
+        } else {
+            throw error("float");
+        }
+    }
+
+    @Override
+    public double readDouble() throws IOException {
+        advance(Symbol.DOUBLE);
+        if (in.getCurrentToken().isNumeric()) {
+            double result = in.getDoubleValue();
+            in.nextToken();
+            return result;
+        } else {
+            throw error("double");
+        }
+    }
+
+    @Override
+    public Utf8 readString(Utf8 old) throws IOException {
+        return new Utf8(readString());
+    }
+
+    @Override
+    public String readString() throws IOException {
+        advance(Symbol.STRING);
+        if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
+            parser.advance(Symbol.MAP_KEY_MARKER);
+            if (in.getCurrentToken() != JsonToken.FIELD_NAME) {
+                throw error("map-key");
+            }
+        } else {
+            if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+                throw error("string");
+            }
+        }
+        String result = in.getText();
+        in.nextToken();
+        return result;
+    }
+
+    @Override
+    public void skipString() throws IOException {
+        advance(Symbol.STRING);
+        if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
+            parser.advance(Symbol.MAP_KEY_MARKER);
+            if (in.getCurrentToken() != JsonToken.FIELD_NAME) {
+                throw error("map-key");
+            }
+        } else {
+            if (in.getCurrentToken() != JsonToken.VALUE_STRING) {
+                throw error("string");
+            }
+        }
+        in.nextToken();
+    }
+
+    @Override
+    public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+        advance(Symbol.BYTES);
+        if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
+            byte[] result = readByteArray();
+            in.nextToken();
+            return ByteBuffer.wrap(result);
+        } else {
+            throw error("bytes");
+        }
+    }
+
+    private byte[] readByteArray() throws IOException {
+        byte[] result = in.getText().getBytes(CHARSET);
+        return result;
+    }
+
+    @Override
+    public void skipBytes() throws IOException {
+        advance(Symbol.BYTES);
+        if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
+            in.nextToken();
+        } else {
+            throw error("bytes");
+        }
+    }
+
+    private void checkFixed(int size) throws IOException {
+        advance(Symbol.FIXED);
+        Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
+        if (size != top.size) {
+            throw new AvroTypeException(
+                    "Incorrect length for fixed binary: expected " +
+                            top.size + " but received " + size + " bytes.");
+        }
+    }
+
+    @Override
+    public void readFixed(byte[] bytes, int start, int len) throws IOException {
+        checkFixed(len);
+        if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
+            byte[] result = readByteArray();
+            in.nextToken();
+            if (result.length != len) {
+                throw new AvroTypeException("Expected fixed length " + len
+                        + ", but got " + result.length);
+            }
+            System.arraycopy(result, 0, bytes, start, len);
+        } else {
+            throw error("fixed");
+        }
+    }
+
+    @Override
+    public void skipFixed(int length) throws IOException {
+        checkFixed(length);
+        doSkipFixed(length);
+    }
+
+    private void doSkipFixed(int length) throws IOException {
+        if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
+            byte[] result = readByteArray();
+            in.nextToken();
+            if (result.length != length) {
+                throw new AvroTypeException("Expected fixed length " + length
+                        + ", but got " + result.length);
+            }
+        } else {
+            throw error("fixed");
+        }
+    }
+
+    @Override
+    protected void skipFixed() throws IOException {
+        advance(Symbol.FIXED);
+        Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
+        doSkipFixed(top.size);
+    }
+
+    @Override
+    public int readEnum() throws IOException {
+        advance(Symbol.ENUM);
+        Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
+        if (in.getCurrentToken() == JsonToken.VALUE_STRING) {
+            in.getText();
+            int n = top.findLabel(in.getText());
+            if (n >= 0) {
+                in.nextToken();
+                return n;
+            }
+            throw new AvroTypeException("Unknown symbol in enum " + in.getText());
+        } else {
+            throw error("fixed");
+        }
+    }
+
+    @Override
+    public long readArrayStart() throws IOException {
+        advance(Symbol.ARRAY_START);
+        if (in.getCurrentToken() == JsonToken.START_ARRAY) {
+            in.nextToken();
+            return doArrayNext();
+        } else {
+            throw error("array-start");
+        }
+    }
+
+    @Override
+    public long arrayNext() throws IOException {
+        advance(Symbol.ITEM_END);
+        return doArrayNext();
+    }
+
+    private long doArrayNext() throws IOException {
+        if (in.getCurrentToken() == JsonToken.END_ARRAY) {
+            parser.advance(Symbol.ARRAY_END);
+            in.nextToken();
+            return 0;
+        } else {
+            return 1;
+        }
+    }
+
+    @Override
+    public long skipArray() throws IOException {
+        advance(Symbol.ARRAY_START);
+        if (in.getCurrentToken() == JsonToken.START_ARRAY) {
+            in.skipChildren();
+            in.nextToken();
+            advance(Symbol.ARRAY_END);
+        } else {
+            throw error("array-start");
+        }
+        return 0;
+    }
+
+    @Override
+    public long readMapStart() throws IOException {
+        advance(Symbol.MAP_START);
+        if (in.getCurrentToken() == JsonToken.START_OBJECT) {
+            in.nextToken();
+            return doMapNext();
+        } else {
+            throw error("map-start");
+        }
+    }
+
+    @Override
+    public long mapNext() throws IOException {
+        advance(Symbol.ITEM_END);
+        return doMapNext();
+    }
+
+    private long doMapNext() throws IOException {
+        if (in.getCurrentToken() == JsonToken.END_OBJECT) {
+            in.nextToken();
+            advance(Symbol.MAP_END);
+            return 0;
+        } else {
+            return 1;
+        }
+    }
+
+    @Override
+    public long skipMap() throws IOException {
+        advance(Symbol.MAP_START);
+        if (in.getCurrentToken() == JsonToken.START_OBJECT) {
+            in.skipChildren();
+            in.nextToken();
+            advance(Symbol.MAP_END);
+        } else {
+            throw error("map-start");
+        }
+        return 0;
+    }
+
+    @Override
+    public int readIndex() throws IOException {
+        advance(Symbol.UNION);
+        Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol();
+
+        String label;
+        final JsonToken currentToken = in.getCurrentToken();
+        if (currentToken == JsonToken.VALUE_NULL) {
+            label = "null";
+        } else if (a.size() == 2 &&
+                ("null".equals(a.getLabel(0)) || "null".equals(a.getLabel(1)))) {
+            label = ("null".equals(a.getLabel(0)) ? a.getLabel(1) : a.getLabel(0));
+        } else if (currentToken == JsonToken.START_OBJECT
+                && in.nextToken() == JsonToken.FIELD_NAME) {
+            label = in.getText();
+            in.nextToken();
+            parser.pushSymbol(Symbol.UNION_END);
+        } else {
+            throw error("start-union");
+        }
+
+        int n = a.findLabel(label);
+        if (n < 0) {
+            // in order to allow the type specification for nullable fields to be avoided, we always choose index 1 (second type)
+            // works if you have only null on index 0, some type on index 1
+            // original code threw a new AvroTypeException("Unknown union branch " + label);
+            n = 1;
+        }
+        parser.pushSymbol(a.getSymbol(n));
+        return n;
+    }
+
+    @Override
+    public Symbol doAction(Symbol input, Symbol top) throws IOException {
+        if (top instanceof Symbol.FieldAdjustAction) {
+            Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
+            String name = fa.fname;
+            if (currentReorderBuffer != null) {
+                List<JsonElement> node = currentReorderBuffer.savedFields.get(name);
+                if (node != null) {
+                    currentReorderBuffer.savedFields.remove(name);
+                    currentReorderBuffer.origParser = in;
+                    in = makeParser(node);
+                    return null;
+                }
+            }
+            if (in.getCurrentToken() == JsonToken.FIELD_NAME) {
+                do {
+                    String fn = in.getText();
+                    in.nextToken();
+                    if (name.equals(fn)) {
+                        return null;
+                    } else {
+                        if (currentReorderBuffer == null) {
+                            currentReorderBuffer = new ReorderBuffer();
+                        }
+                        currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in));
+                    }
+                } while (in.getCurrentToken() == JsonToken.FIELD_NAME);
+                injectDefaultValueIfAvailable(in, fa.fname);
+            } else {
+                injectDefaultValueIfAvailable(in, fa.fname);
+            }
+        } else if (top == Symbol.FIELD_END) {
+            if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) {
+                in = currentReorderBuffer.origParser;
+                currentReorderBuffer.origParser = null;
+            }
+        } else if (top == Symbol.RECORD_START) {
+            if (in.getCurrentToken() == JsonToken.START_OBJECT) {
+                in.nextToken();
+                reorderBuffers.push(currentReorderBuffer);
+                currentReorderBuffer = null;
+            } else {
+                throw error("record-start");
+            }
+        } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) {
+            if (in.getCurrentToken() == JsonToken.END_OBJECT) {
+                in.nextToken();
+                if (top == Symbol.RECORD_END) {
+                    if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) {
+                        throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet());
+                    }
+                    currentReorderBuffer = reorderBuffers.pop();
+                }
+            } else {
+                throw error(top == Symbol.RECORD_END ? "record-end" : "union-end");
+            }
+        } else {
+            throw new AvroTypeException("Unknown action symbol " + top);
+        }
+        return null;
+    }
+
+    private static class JsonElement {
+        public final JsonToken token;
+        public final String value;
+
+        public JsonElement(JsonToken t, String value) {
+            this.token = t;
+            this.value = value;
+        }
+
+        public JsonElement(JsonToken t) {
+            this(t, null);
+        }
+    }
+
+    private static List<JsonElement> getVaueAsTree(JsonParser in) throws IOException {
+        int level = 0;
+        List<JsonElement> result = new ArrayList<JsonElement>();
+        do {
+            JsonToken t = in.getCurrentToken();
+            switch (t) {
+                case START_OBJECT:
+                case START_ARRAY:
+                    level++;
+                    result.add(new JsonElement(t));
+                    break;
+                case END_OBJECT:
+                case END_ARRAY:
+                    level--;
+                    result.add(new JsonElement(t));
+                    break;
+                case FIELD_NAME:
+                case VALUE_STRING:
+                case VALUE_NUMBER_INT:
+                case VALUE_NUMBER_FLOAT:
+                case VALUE_TRUE:
+                case VALUE_FALSE:
+                case VALUE_NULL:
+                    result.add(new JsonElement(t, in.getText()));
+                    break;
+            }
+            in.nextToken();
+        } while (level != 0);
+        result.add(new JsonElement(null));
+        return result;
+    }
+
+    private JsonParser makeParser(final List<JsonElement> elements) throws IOException {
+        return new JsonParser() {
+            int pos = 0;
+
+            @Override
+            public ObjectCodec getCodec() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void setCodec(ObjectCodec c) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public JsonToken nextToken() throws IOException {
+                pos++;
+                return elements.get(pos).token;
+            }
+
+            @Override
+            public JsonParser skipChildren() throws IOException {
+                JsonToken tkn = elements.get(pos).token;
+                int level = (tkn == JsonToken.START_ARRAY || tkn == JsonToken.END_ARRAY) ? 1 : 0;
+                while (level > 0) {
+                    switch (elements.get(++pos).token) {
+                        case START_ARRAY:
+                        case START_OBJECT:
+                            level++;
+                            break;
+                        case END_ARRAY:
+                        case END_OBJECT:
+                            level--;
+                            break;
+                    }
+                }
+                return this;
+            }
+
+            @Override
+            public boolean isClosed() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public String getCurrentName() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public JsonStreamContext getParsingContext() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public JsonLocation getTokenLocation() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public JsonLocation getCurrentLocation() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public String getText() throws IOException {
+                return elements.get(pos).value;
+            }
+
+            @Override
+            public char[] getTextCharacters() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getTextLength() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getTextOffset() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Number getNumberValue() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public NumberType getNumberType() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getIntValue() throws IOException {
+                return Integer.parseInt(getText());
+            }
+
+            @Override
+            public long getLongValue() throws IOException {
+                return Long.parseLong(getText());
+            }
+
+            @Override
+            public BigInteger getBigIntegerValue() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public float getFloatValue() throws IOException {
+                return Float.parseFloat(getText());
+            }
+
+            @Override
+            public double getDoubleValue() throws IOException {
+                return Double.parseDouble(getText());
+            }
+
+            @Override
+            public BigDecimal getDecimalValue() throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public byte[] getBinaryValue(Base64Variant b64variant)
+                    throws IOException {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public JsonToken getCurrentToken() {
+                return elements.get(pos).token;
+            }
+        };
+    }
+
+    private AvroTypeException error(String type) {
+        return new AvroTypeException("Expected " + type +
+                ". Got " + in.getCurrentToken());
+    }
+
+    private static final JsonElement NULL_JSON_ELEMENT = new JsonElement(null);
+
+    private void injectDefaultValueIfAvailable(final JsonParser in, String fieldName) throws IOException {
+        Field field = findField(schema, fieldName);
+
+        boolean isNull = field == null;
+
+        JsonNode defVal = isNull ? NullNode.getInstance() : field.defaultValue();
+        if (defVal == null) {
+            throw new AvroTypeException("Expected field name not found: " + fieldName);
+        }
+
+        List<JsonElement> result = new ArrayList<>(2);
+        JsonParser traverse = defVal.traverse();
+        JsonToken nextToken;
+        while ((nextToken = traverse.nextToken()) != null) {
+            if (nextToken.isScalarValue()) {
+                result.add(new JsonElement(nextToken, traverse.getText()));
+            } else {
+                result.add(new JsonElement(nextToken));
+            }
+        }
+        result.add(NULL_JSON_ELEMENT);
+        if (currentReorderBuffer == null) {
+            currentReorderBuffer = new ReorderBuffer();
+        }
+        currentReorderBuffer.origParser = in;
+        this.in = makeParser(result);
+    }
+
+    private static Field findField(Schema schema, String name) {
+        if (schema.getField(name) != null) {
+            return schema.getField(name);
+        }
+
+        Field foundField = null;
+
+        for (Field field : schema.getFields()) {
+            Schema fieldSchema = field.schema();
+            if (Type.RECORD.equals(fieldSchema.getType())) {
+                foundField = findField(fieldSchema, name);
+            } else if (Type.ARRAY.equals(fieldSchema.getType())) {
+                foundField = findField(fieldSchema.getElementType(), name);
+            } else if (Type.MAP.equals(fieldSchema.getType())) {
+                foundField = findField(fieldSchema.getValueType(), name);
+            }
+
+            if (foundField != null) {
+                return foundField;
+            }
+        }
+
+        return foundField;
+    }
+}
+
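
As a quick sketch of the default handling described in the class comment above ("Infers default arguments, if they are not present"): decoding JSON that omits a defaulted field still succeeds. The schema and values below are made up for illustration and assume the patch has been applied:

import at.esque.kafka.serialization.ExtendedJsonDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class DefaultInjectionSketch {
    public static void main(String[] args) throws Exception {
        // "note" has a default of null and is omitted from the JSON entirely.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"int\"},"
                        + "{\"name\":\"note\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        GenericRecord record = new GenericDatumReader<GenericRecord>(schema)
                .read(null, new ExtendedJsonDecoder(schema, "{\"id\": 1}"));

        System.out.println(record.get("note")); // null, injected from the schema default
    }
}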

Related Changesets

KafkaEsque_github: develop deb586f2

2019-02-25 10:44:23

patschuh

[ESQUE-12, ESQUE-21] Added configuration options for Producers and consumers; added configurations for topics; used topic configuration to enable use of avro
Affected Issues: 0000012, 0000021
mod - pom.xml
add - src/main/java/at/esque/kafka/ConfigHandler.java
mod - src/main/java/at/esque/kafka/ConsumerHandler.java
mod - src/main/java/at/esque/kafka/Controller.java
add - src/main/java/at/esque/kafka/MessageType.java
mod - src/main/java/at/esque/kafka/PublisherController.java
add - src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java

KafkaEsque_github: develop 6a6d9b73

2019-02-25 13:33:08

patschuh

[ESQUE-21] Make Message Types configurable from topicList
Affected Issues: 0000021
mod - src/main/java/at/esque/kafka/Controller.java
add - src/main/java/at/esque/kafka/dialogs/TopicMessageTypeConfigDialog.java

KafkaEsque_github: develop 50a3d0cf

2019-02-27 07:56:33

patschuh

[ESQUE-21] Fix silently dying consumer caused by NPE when consuming null/delete messages
Affected Issues: 0000021
mod - src/main/java/at/esque/kafka/Controller.java
mod - src/main/java/at/esque/kafka/PublisherController.java

KafkaEsque_github: develop fc2653dd

2019-02-27 08:23:14

patschuh

[ESQUE-21] Check if schemaRegistry is configured when trying to publish
Affected Issues: 0000021
mod - src/main/java/at/esque/kafka/PublisherController.java
mod - src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java

KafkaEsque_github: develop 0a315250

2019-03-04 15:30:14

patschuh

[ESQUE-21] display content as String if avro deserialization fails
Affected Issues: 0000021
mod - src/main/java/at/esque/kafka/ConsumerHandler.java
add - src/main/java/at/esque/kafka/serialization/ForgivingKafkaAvroDeserializer.java

KafkaEsque_github: develop 722898b9

2019-03-12 08:17:24

patschuh

[ESQUE-12, ESQUE-21] Add ProducerHandler; Add custom serializers
Affected Issues: 0000012, 0000021
mod - src/main/java/at/esque/kafka/Controller.java
mod - src/main/java/at/esque/kafka/Main.java
mod - src/main/java/at/esque/kafka/PublisherController.java
add - src/main/java/at/esque/kafka/handlers/ProducerHandler.java
add - src/main/java/at/esque/kafka/handlers/ProducerWrapper.java
add - src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java
add - src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java

KafkaEsque_github: develop 5e202031

2019-03-30 10:21:50

patschuh

[ESQUE-21] Add ExtendedJsonDecoder for more convenient AVRO publishing
Affected Issues: 0000021
mod - src/main/java/at/esque/kafka/handlers/ProducerHandler.java
add - src/main/java/at/esque/kafka/serialization/ExtendedJsonDecoder.java

Issue History

Date Modified Username Field Change
2019-02-21 09:37 patschuh New Issue
2019-02-25 10:46 patschuh Changeset attached => KafkaEsque_github develop deb586f2
2019-02-25 10:49 patschuh Assigned To => patschuh
2019-02-25 10:49 patschuh Status new => assigned
2019-02-25 13:33 patschuh Changeset attached => KafkaEsque_github develop 6a6d9b73
2019-02-27 07:57 patschuh Changeset attached => KafkaEsque_github develop 50a3d0cf
2019-02-27 08:23 patschuh Changeset attached => KafkaEsque_github develop fc2653dd
2019-03-04 15:30 patschuh Changeset attached => KafkaEsque_github develop 0a315250
2019-03-12 08:17 patschuh Changeset attached => KafkaEsque_github develop 722898b9
2019-03-18 21:20 Oliver G. Note Added: 0000028
2019-03-20 15:26 Oliver G. File Added: ExtendedJsonDecoder_for_publishing_Avro.patch
2019-03-20 15:26 Oliver G. Note Added: 0000029
2019-03-30 10:22 patschuh Changeset attached => KafkaEsque_github develop 5e202031
2019-11-25 07:50 patschuh Status assigned => closed
2019-11-25 07:50 patschuh Resolution open => fixed