// TweetDeserializer.java

package kafka_tweetoscope.tweetsSerializer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.twitter.clientlib.model.Tweet;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.time.format.DateTimeFormatter;

/**
 * Kafka {@link Deserializer} that turns a UTF-8 encoded JSON payload into a {@link Tweet}
 * using Gson.
 *
 * <p>{@link OffsetDateTime} fields are handled by a custom adapter: they are read with
 * {@link Instant#parse(CharSequence)} and normalized to {@link ZoneOffset#UTC}, and written
 * with {@link DateTimeFormatter#ISO_INSTANT}. A JSON {@code null} maps to a Java {@code null}
 * (and vice versa) instead of raising an error.
 */
public class TweetDeserializer implements Deserializer<Tweet> {

    /**
     * Shared Gson instance. Kept static so the anonymous adapter below does not capture the
     * enclosing deserializer and so the configuration is built once rather than per instance
     * (Gson documents its instances as thread-safe).
     */
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(OffsetDateTime.class, new TypeAdapter<OffsetDateTime>() {
            @Override
            public void write(JsonWriter out, OffsetDateTime value) throws IOException {
                out.value(value.format(DateTimeFormatter.ISO_INSTANT));
            }

            @Override
            public OffsetDateTime read(JsonReader in) throws IOException {
                return Instant.parse(in.nextString()).atOffset(ZoneOffset.UTC);
            }
        // A raw TypeAdapter registered this way gets no null handling from Gson; without
        // nullSafe() an explicit JSON null would make nextString() throw and a null value
        // would make write() fail with a NullPointerException.
        }.nullSafe())
        .create();

    /**
     * Decodes {@code data} as UTF-8 JSON and maps it onto a {@link Tweet}.
     *
     * @param topic the topic the record came from (not used by this implementation)
     * @param data  the raw record value; may be {@code null}
     * @return the parsed tweet, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public Tweet deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        String json = new String(data, StandardCharsets.UTF_8);
        return GSON.fromJson(json, Tweet.class);
    }
}