Метод print () получает сообщение от потоковой передачи и отправляет его в виде JSON на сервер изasticsearch, индексируя его в строке IndexRequest. Проблема в том, что вам нужно размещать каждое полученное сообщение на каждой странице (id), и я не знаю, как правильно это реализовать.
Необходимо, чтобы каждое сообщение записывалось на каждой странице с возрастающим индексом , Можете ли вы помочь написать этот l oop / method?
public class WebsoketPrinter {
private static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// The default cache size is 100 MB. Change it to 30 MB.
builder.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
private VkStreamingApiClient streamingClient;
public StreamingActor streamingActor;
private VkApiClient vkClient;
WebsoketPrinter() {
TransportClient transportClient = new HttpTransportClient();
streamingClient = new VkStreamingApiClient(transportClient);
vkClient = new VkApiClient(transportClient);
streamingActor = new StreamingActor(this.getServerUrlResponse().getEndpoint(), this.getServerUrlResponse().getKey());
}
private GetServerUrlResponse getServerUrlResponse() {
ConfigReader configReader = new ConfigReader();
Integer appId = Integer.valueOf(configReader.getProp("appId"));
String accessToken = configReader.getProp("accessToken");
ServiceActor serviceActor = new ServiceActor(appId, accessToken);
GetServerUrlResponse getServerUrlResponse = null;
try {
getServerUrlResponse = vkClient.streaming().getServerUrl(serviceActor).execute();
} catch (ApiException | ClientException e) {
e.printStackTrace();
}
return getServerUrlResponse;
}
public void print() {
try {
streamingClient.stream().get(streamingActor, new StreamingEventHandler() {
@Override
public void handle(StreamingCallbackMessage message) {
System.out.println("Сообщение от стриминга получено");
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "changeme"));
RestClientBuilder builder = RestClient.builder(new HttpHost("host", "port"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
// final RestHighLevelClient highClient = new RestHighLevelClient(builder);
try {
// Create a request.
// System.out.println("Начало цикла");
RestHighLevelClient highClient = new RestHighLevelClient(builder);
// for(int d = 0; i > d; d++, i++) {
Map<String, String> jsonMap = new HashMap<>();
jsonMap.put("Message id", message.toString());
// index_name is the index name, type_name is the type name, and doc_id is the document ID.
IndexRequest indexRequest = new IndexRequest("stream", "_doc", "id").source(jsonMap);
System.out.println(message.toString());
System.out.println("Передача сообщения на сервер");
// Run the following command in parallel and use the custom RequestOptions (COMMON_OPTIONS).
IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
highClient.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}).execute();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
}