TweetDeserializer.java
package kafka_tweetoscope.tweetsSerializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.twitter.clientlib.model.Tweet;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.time.format.DateTimeFormatter;
public class TweetDeserializer implements Deserializer<Tweet> {
private final Gson gson = new GsonBuilder()
.registerTypeAdapter(OffsetDateTime.class, new TypeAdapter<OffsetDateTime>() {
@Override
public void write(JsonWriter out, OffsetDateTime value) throws IOException {
out.value(value.format(DateTimeFormatter.ISO_INSTANT));
}
@Override
public OffsetDateTime read(JsonReader in) throws IOException {
return Instant.parse(in.nextString()).atOffset(ZoneOffset.UTC);
}
})
.create();
@Override
public Tweet deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
String json = new String(data, StandardCharsets.UTF_8);
return gson.fromJson(json, Tweet.class);
}
}