Я пытаюсь отправить событие avro на esper с помощью сокетов.Я использую класс EsperIOSocketAdapter, описанный в Esper-IO , он отлично работает с событиями POJO и Map, но я не могу заставить его работать с событиями avro.
Поэтому в основном мне нужно отправить событие, представленное Avro, в адаптер сокетов Esper.
Я использую Esper 7.1.0
Это моя инициализация Esper, я используюсобытие с именем «PersonEvent» с двумя полями, «name» типа String и «age» типа int:
Configuration conf = new Configuration();
EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(conf);
//event Avro
ConfigurationEventTypeAvro avroType = new ConfigurationEventTypeAvro();
Schema schema = new Schema.Parser().parse("{"+
"\"type\": \"record\","+
"\"name\": \"PersonEvent\","+
"\"fields\":["+
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"age\", \"type\": \"int\"}" +
"]" +
"}");
avroType.setAvroSchema(schema);
engine.getEPAdministrator().
getConfiguration().
addEventTypeAvro("PersonEvent", avroType);
Я также добавил оператор для просмотра входящих событий esper:
String epl = "select name, age from PersonEvent";
EPStatement statement = engine.getEPAdministrator().createEPL(epl);
statement.addListener((newData, oldData) -> {
String name = (String) newData[0].get("name");
int age = (int) newData[0].get("age");
System.out.println(String.format("Name: %s, Age: %d", name, age));
});
Это инициализация Socket-адаптера:
ConfigurationSocketAdapter adapterConfig = new ConfigurationSocketAdapter();
SocketConfig socket = new SocketConfig();
socket.setDataType(DataType.OBJECT);
socket.setPort(8085);
adapterConfig.getSockets().put("SocketService", socket);
EsperIOSocketAdapter socketAdapter = new EsperIOSocketAdapter(adapterConfig, "default");
socketAdapter.start();
Наконец, чтобы отправить событие на socker, я попробовал несколько вещей:
Первая попытка (отлично работает с событиями POJO,но не для Avro):
Socket requestSocket = new Socket("localhost", 8085);
ObjectOutputStream out = new ObjectOutputStream(requestSocket.getOutputStream());
out.writeObject(new PersonEvent("John", 23));
out.flush();
out.close();
requestSocket.close();
Вторая попытка (отлично работает с событиями Map, но не для Avro):
Socket requestSocket = new Socket("localhost", 8085);
ObjectOutputStream out = new ObjectOutputStream(requestSocket.getOutputStream());
Map<String,Object> map = new HashMap<String,Object>();
map.put("stream","PersonEvent");
map.put("name","John");
map.put("age",10);
out.writeObject(map);
out.flush();
out.close();
requestSocket.close();
Третья попытка (найдена в Интернете, но не работает):
Schema schema = new Schema.Parser().parse("{"+
"\"type\": \"record\","+
"\"name\": \"PersonEvent\","+
"\"fields\":["+
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"age\", \"type\": \"int\"}" +
"]" +
"}");
//Here also tried with ObjectOutputStream, like first try
OutputStream outToServer = requestSocket.getOutputStream();
EncoderFactory enc=new EncoderFactory();
//Here I also tried with JsonEncoder
BinaryEncoder binaryEncoder=enc.binaryEncoder(outToServer,null);
DatumWriter datumWriter = new GenericDatumWriter(schema);
GenericData.Record event = new GenericData.Record(schema);
event.put("name", "Joshn");
event.put("age", 10);
datumWriter.write(event,binaryEncoder);
binaryEncoder.flush();
Большое спасибо.