Как отправить авро события на эспер с помощью сокетов? - PullRequest
0 голосов
/ 29 декабря 2018

Я пытаюсь отправить событие avro на esper с помощью сокетов.Я использую класс EsperIOSocketAdapter, описанный в Esper-IO , он отлично работает с событиями POJO и Map, но я не могу заставить его работать с событиями avro.

Поэтому в основном мне нужно отправить событие, представленное Avro, в адаптер сокетов Esper.

Я использую Esper 7.1.0

Это моя инициализация Esper, я используюсобытие с именем «PersonEvent» с двумя полями, «name» типа String и «age» типа int:

Configuration conf = new Configuration();
EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(conf);

//event Avro
ConfigurationEventTypeAvro avroType = new ConfigurationEventTypeAvro();
Schema schema = new Schema.Parser().parse("{"+
    "\"type\": \"record\","+
    "\"name\": \"PersonEvent\","+
    "\"fields\":["+
    "{\"name\": \"name\", \"type\": \"string\"}," +
    "{\"name\": \"age\",  \"type\": \"int\"}" +
    "]" +
    "}");
avroType.setAvroSchema(schema);

engine.getEPAdministrator().
    getConfiguration().
    addEventTypeAvro("PersonEvent", avroType);

Я также добавил оператор для просмотра входящих событий esper:

String epl = "select name, age from PersonEvent";
EPStatement statement = engine.getEPAdministrator().createEPL(epl);

statement.addListener((newData, oldData) -> {
    String name = (String) newData[0].get("name");
    int age = (int) newData[0].get("age");
    System.out.println(String.format("Name: %s, Age: %d", name, age));
});

Это инициализация Socket-адаптера:

ConfigurationSocketAdapter adapterConfig = new ConfigurationSocketAdapter();
SocketConfig socket = new SocketConfig();
socket.setDataType(DataType.OBJECT);
socket.setPort(8085);
adapterConfig.getSockets().put("SocketService", socket);

EsperIOSocketAdapter socketAdapter = new EsperIOSocketAdapter(adapterConfig, "default");
socketAdapter.start();

Наконец, чтобы отправить событие на socker, я попробовал несколько вещей:

Первая попытка (отлично работает с событиями POJO,но не для Avro):

Socket requestSocket = new Socket("localhost", 8085);
ObjectOutputStream out = new ObjectOutputStream(requestSocket.getOutputStream());
out.writeObject(new PersonEvent("John", 23));
out.flush();

out.close();
requestSocket.close();

Вторая попытка (отлично работает с событиями Map, но не для Avro):

Socket requestSocket = new Socket("localhost", 8085);
ObjectOutputStream out = new ObjectOutputStream(requestSocket.getOutputStream());
Map<String,Object> map = new HashMap<String,Object>();
map.put("stream","PersonEvent");
map.put("name","John");
map.put("age",10); 
out.writeObject(map);
out.flush();

out.close();
requestSocket.close();

Третья попытка (найдена в Интернете, но не работает):

Schema schema = new Schema.Parser().parse("{"+
    "\"type\": \"record\","+
    "\"name\": \"PersonEvent\","+
    "\"fields\":["+
    "{\"name\": \"name\", \"type\": \"string\"}," +
    "{\"name\": \"age\",  \"type\": \"int\"}" +
    "]" +
    "}");

//Here also tried with ObjectOutputStream, like first try
OutputStream outToServer = requestSocket.getOutputStream();
EncoderFactory enc=new EncoderFactory();
//Here I also tried with JsonEncoder
BinaryEncoder binaryEncoder=enc.binaryEncoder(outToServer,null);
DatumWriter datumWriter = new GenericDatumWriter(schema);

GenericData.Record event = new GenericData.Record(schema);
event.put("name", "Joshn");
event.put("age", 10);
datumWriter.write(event,binaryEncoder);
binaryEncoder.flush();

Большое спасибо.

1 Ответ

0 голосов
/ 30 декабря 2018

Не думаю, что адаптер берет авро.Вы можете изменить его или написать свой.

...