Kafka消息序列化和反序列化

Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。

首先我们通过一段示例代码来看下普通情况下Kafka Producer如何编写:

public class ProducerJavaDemo {
    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "hidden-producer-client-id-1");
        properties.put("bootstrap.servers", brokerList);

        Producer<String,String> producer = new KafkaProducer<String,String>(properties);

        while (true) {
            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
            try {
                Future<RecordMetadata> future =  producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.print(metadata.offset()+"    ");
                        System.out.print(metadata.topic()+"    ");
                        System.out.println(metadata.partition());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

这里采用的客户端不是0.8.x.x时代的Scala版本,而是Java编写的新Kafka Producer, 相应的Maven依赖如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

上面的程序中使用的是Kafka客户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有三种方法:

  1. public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  2. public byte[] serialize(String topic, T data):用来执行序列化。
  3. public void close():用来关闭当前序列化器。一般情况下这个方法都是个空方法,如果实现了此方法,必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。

下面我们来看看Kafka中org.apache.kafka.common.serialization.StringSerializer的具体实现,源码如下:

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

首先看下StringSerializer中的configure(Map configs, boolean isKey)方法,这个方法的执行是在创建KafkaProducer实例的时候调用的,即执行代码Producer producer = new KafkaProducer(properties)时调用,主要用来确定编码类型,不过一般key.serializer.encoding或serializer.encoding都不会配置,更确切的来说在Kafka Producer Configs列表里都没有此项,所以一般情况下encoding的值就是UTF-8。serialize(String topic, String data)方法非常的直观,就是将String类型的data转为byte[]类型即可。

如果Kafka自身提供的诸如String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long这些类型的Serializer都不能满足需求,读者可以选择使用如Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具来实现,亦或者是使用自定义类型的Serializer来实现。下面就以一个简单的例子来介绍下如何自定义类型的使用方法。

假设我们要发送的消息都是Company对象,这个Company的定义很简单,只有名称name和地址address,具体如下:

public class Company {
    private String name;
    private String address;
    //省略Getter, Setter, Constructor & toString方法
}

接下去我们来实现Company类型的Serializer,即下面代码示例中的DemoSerializer。

package com.hidden.client;
public class DemoSerializer implements Serializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    public void close() {}
}

使用时只需要在Kafka Producer的config中修改value.serializer属性即可,示例如下:

properties.put("value.serializer", "com.hidden.client.DemoSerializer");
//记得也要将相应的String类型改为Company类型,如:
//Producer<String,Company> producer = new KafkaProducer<String,Company>(properties);
//Company company = new Company();
//company.setName("hidden.cooperation-" + new Date().getTime());
//company.setAddress("Shanghai, China");
//ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic,company);

示例中只修改了value.serializer,而key.serializer和value.serializer没有什么区别,如果有真实需要,修改以下也未尝不可。

有序列化就会有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起来只需要配置一下key.deserializer和value.deseriaizer。对应上面自定义的Company类型的Deserializer就需要实现org.apache.kafka.common.serialization.Deserializer接口,这个接口同样有三个方法:

  1. public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  2. public byte[] serialize(String topic, T data):用来执行反序列化。如果data为null建议处理的时候直接返回null而不是抛出一个异常。
  3. public void close():用来关闭当前序列化器。

下面就来看一下DemoSerializer对应的反序列化的DemoDeserializer,详细代码如下:

public class DemoDeserializer implements Deserializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressLen);
        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error occur when deserializing!");
        }
        return new Company(name,address);
    }
    public void close() {}
}

有些读者可能对新版的Consumer不是很熟悉,这里顺带着举一个完整的消费示例,并以DemoDeserializer作为消息Value的反序列化器。

Properties properties = new Properties();
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", consumerGroup);
properties.put("session.timeout.ms", 10000);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.hidden.client.DemoDeserializer");
properties.put("client.id", "hidden-consumer-client-id-zzh-2");
KafkaConsumer<String, Company> consumer = new KafkaConsumer<String, Company>(properties);
consumer.subscribe(Arrays.asList(topic));
try {
    while (true) {
        ConsumerRecords<String, Company> records = consumer.poll(100);
        for (ConsumerRecord<String, Company> record : records) {
            String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            System.out.println(info);
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    String error = String.format("Commit failed for offsets {}", offsets, exception);
                    System.out.println(error);
                }
            }
        });
    }
} finally {
    consumer.close();
}

有些时候自定义的类型还可以和Avro、ProtoBuf等联合使用,而且这样更加的方便快捷,比如我们将前面Company的Serializer和Deserializer用Protostuff包装一下,由于篇幅限制,笔者这里只罗列出对应的serialize和deserialize方法,详细参考如下:

public byte[] serialize(String topic, Company data) {
    if (data == null) {
        return null;
    }
    Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    byte[] protostuff = null;
    try {
        protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
    return protostuff;
}

public Company deserialize(String topic, byte[] data) {
    if (data == null) {
        return null;
    }
    Schema schema = RuntimeSchema.getSchema(Company.class);
    Company ans = new Company();
    ProtostuffIOUtil.mergeFrom(data, ans, schema);
    return ans;
}

如果Company的字段很多,我们使用Protostuff进一步封装一下的方式就显得简洁很多。不过这个不是最主要的,而最主要的是经过Protostuff包装之后,这个Serializer和Deserializer可以向前兼容(新加字段采用默认值)和向后兼容(忽略新加字段),这个特性Avro和Protobuf也都具备。

自定义的类型有一个不得不面对的问题就是Kafka Producer和Kafka Consumer之间的序列化和反序列化的兼容性,试想对于StringSerializer来说,Kafka Consumer可以顺其自然的采用StringDeserializer,不过对于Company这种专用类型,某个服务使用DemoSerializer进行了序列化之后,那么下游的消费者服务必须也要实现对应的DemoDeserializer。再者,如果上游的Company类型改变,下游也需要跟着重新实现一个新的DemoSerializer,这个后面所面临的难题可想而知。所以,如无特殊需要,笔者不建议使用自定义的序列化和反序列化器;如有业务需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包装,尽可能的实现得更加通用且向前后兼容。

题外话,对于Kafka的“深耕者”Confluent来说,还有其自身的一套序列化和反序列化解决方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相关资料,读者如有兴趣可以自行扩展学习。



参考资料

  1. protostuff序列化/反序列化


PS:消息中间件(Kafka、RabbitMQ)交流可加微信:hiddenzzh
欢迎支持笔者新书:《RabbitMQ实战指南》以及关注微信公众号:Kafka技术专栏。

时间: 2024-10-29 05:45:25

Kafka消息序列化和反序列化的相关文章

.NET对象的XML序列化和反序列化

 序列化的概念 序列化是指一个对象的实例可以被保存,保存成一个二进制串,当然,一旦被保存成二进制串,那么也可以保存成文本串了.比如,一个计数器,数值为2,我们可以用字符串"2"表示.如果有个对象,叫做connter,当前值为2,那么可以序列化成"2",反向的,也可以从"2"得到值为2的计数器实例.这样,关机时序列化它,开机时反序列化它,每次开机都是延续的.不会都是从头开始. 序列化概念的提出和实现,可以使我们的应用程序的设置信息保存和读取更加方便

[Java开发之路](9)对象序列化与反序列化

1. 对象序列化 当你创建对象时,只要你需要,它会一直存在,但是程序终止时,无论何时它都不会继续存在.尽管这样做是非常有意义的,但是在某些情况下,如果程序不运行时扔能存在并且保存其信息,那将对我们非常有用.这样,在下次程序运行时,该对象将被重建并且拥有的信息与程序上次运行时它所拥有的信息相同.当然,我们也可以通过将信息写入文件或者数据库,但是如果能将一个对象声明为是"持久性"的,并为我们处理掉所有的细节,这将会显得十分方便. Java的序列化是将那些实现了Serializable接口的

C#序列化与反序列化(Serializable and Deserialize)

下面进行验证 将Person的Name属性改成Private,然后查看生成的personInfo.text,其内容如下: <?xml version="1.0"?><Person xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">  <Sno>200719</

一起谈.NET技术,C#序列化与反序列化(Serializable and Deserialize)

     序列化是指将对象实例的状态存储到存储媒体的过程.在此过程中,先将对象的公共字段和私有字段以及类的名称(包括类所在的程序集)转换为字节流,然后再把字节流写入数据流.在随后对对象进行反序列化时,将创建出与原对象完全相同的副本.      我们经常需要将对象的字段值保存到磁盘中,并在以后检索此数据.尽管不使用序列化也能完成这项工作,但这种方法通常很繁琐而且容易出错,并且在需要跟踪对象的层次结构时,会变得越来越复杂.可以想象一下编写包含大量对象的大型业务应用程序的情形,程序员不得不为每一个对象

C#对象序列化和反序列化

C#对象序列化和反序列化,如下代码示例: using System;  using System.Text;  using System.Collections.Generic;  using System.IO;  using System.Runtime.Serialization.Formatters.Binary;    class SerializableOperate  {      private static void ObjectSerializable(object obj,

ASP.NET中JSON的序列化和反序列化

导读:JSON是专门为浏览器中的网页上运行的JavaScript代码而设计的一种数据格式.在网站应用中使用JSON的场景越来越多,本文介绍 ASP.NET中JSON的序列化和反序列化,主要对JSON的简单介绍,ASP.NET如何序列化和反序列化的处理,在序列化和反序列化对日期时间.集合.字典的处理. 一.JSON简介 JSON(JavaScript Object Notation,JavaScript对象表示法)是一种轻量级的数据交换格式. JSON是"名值对"的集合.结构由大括号'{

通过序列化和反序列化泛型数据实体集合来实现持久化数据对象的方法

对象|集合|数据 通过序列化和反序列化泛型数据实体集合来实现持久化数据对象的方法 我们在平时使用数据库的时候,经常会碰到一个问题,就是不希望数据实体对象插入数据库中, 却有想持久化的时候,那么就可以用序列化成 XML字符串,来保存到其他地方,由于生成的是字符串,所以可以保存到任意我们想保存的地方.比如 asp.net的ViewState,cookie,cache等. 首先,我们定义一个数据实体类.     class Entity    {        public Entity()     

Asp.Net Forums中对.Net中序列化和反序列化的应用

asp.net 在Forums中,有些内容是不固定的,例如用户资料,除了一些基本资料,可能还要有一些其他资料信息,例如MSN.个人主页.签名档等,一般对于这样的都是每一个属性对应于数据库中的一个字段.但是如果以后我们因为需要增加一些属性,例如QQ号.Blog地址等,如果还是用这种增加数据表字段的方法,那么将会频繁的修改数据库表结构.存储过程.数据库访问的程序. 或许您也遇到过类似问题,看Forums中是怎么借用.Net的序列化和反序列化来解决的:例如我需要在用户资料里面增加QQ号这个属性,那么我

C#实现对象的Xml格式序列化及反序列化

xml|对象   要序列化的对象的类: [Serializable]public class Person{private string name;public string Name{get{return name;}set{name=value;}}public string Sex;public int Age=31;public Course[] Courses; public Person(){}public Person(string Name){name=Name;Sex="男&q