TweetSerializer.java

package kafka_tweetoscope.tweetsSerializer;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

import org.apache.kafka.common.serialization.Serializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import com.twitter.clientlib.model.Tweet;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;

import java.time.format.DateTimeFormatter;

public class TweetSerializer implements Serializer<Tweet> {
    private final Gson gson = new GsonBuilder()
		.setPrettyPrinting()
        .registerTypeAdapter(OffsetDateTime.class, new TypeAdapter<OffsetDateTime>() {
            @Override
            public void write(JsonWriter out, OffsetDateTime value) throws IOException {
                out.value(value.format(DateTimeFormatter.ISO_INSTANT));
            }

            @Override
            public OffsetDateTime read(JsonReader in) throws IOException {
                return Instant.parse(in.nextString()).atOffset(ZoneOffset.UTC);
            }
        })
        .create();

    @Override
    public byte[] serialize(String topic, Tweet tweet) {
        return tweet == null ? null : gson.toJson(tweet).getBytes(StandardCharsets.UTF_8);
    }
}