// TweetSerializer.java
package kafka_tweetoscope.tweetsSerializer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.kafka.common.serialization.Serializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.twitter.clientlib.model.Tweet;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.time.format.DateTimeFormatter;
/**
 * Kafka {@link Serializer} that encodes a {@link Tweet} as pretty-printed,
 * UTF-8 encoded JSON using Gson.
 *
 * <p>{@link OffsetDateTime} values are written as ISO-8601 instant strings
 * ({@link DateTimeFormatter#ISO_INSTANT}) and parsed back as UTC date-times.
 * Null date-time values are written as JSON {@code null} (or omitted by Gson's
 * default null policy) instead of failing with a {@link NullPointerException}.
 */
public class TweetSerializer implements Serializer<Tweet> {

    /**
     * Shared JSON encoder. Gson documents its instances as thread-safe, so a
     * single instance is reused rather than rebuilt per serializer.
     */
    private static final Gson GSON = new GsonBuilder()
            // Kept so the bytes on the wire match what existing consumers already receive.
            .setPrettyPrinting()
            // nullSafe() is required: Gson hands null field values to custom
            // TypeAdapters, and the raw adapter would dereference them.
            .registerTypeAdapter(OffsetDateTime.class, new OffsetDateTimeAdapter().nullSafe())
            .create();

    /**
     * Serializes the given tweet to JSON bytes.
     *
     * @param topic the topic the record is destined for (not used)
     * @param tweet the tweet to encode; may be {@code null}
     * @return the UTF-8 JSON representation, or {@code null} when {@code tweet} is {@code null}
     */
    @Override
    public byte[] serialize(String topic, Tweet tweet) {
        if (tweet == null) {
            return null;
        }
        return GSON.toJson(tweet).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Gson adapter mapping {@link OffsetDateTime} to and from an ISO-8601
     * instant string. It does not handle nulls itself and must be wrapped with
     * {@link TypeAdapter#nullSafe()} before registration.
     */
    private static final class OffsetDateTimeAdapter extends TypeAdapter<OffsetDateTime> {

        /** Writes the date-time as an ISO-8601 instant string. */
        @Override
        public void write(JsonWriter out, OffsetDateTime value) throws IOException {
            out.value(value.format(DateTimeFormatter.ISO_INSTANT));
        }

        /** Parses an ISO-8601 instant string into a date-time at UTC. */
        @Override
        public OffsetDateTime read(JsonReader in) throws IOException {
            return Instant.parse(in.nextString()).atOffset(ZoneOffset.UTC);
        }
    }
}