Запись циклов в java и JSON - PullRequest
0 голосов
/ 26 апреля 2020

Метод print () получает сообщение от потоковой передачи и отправляет его в виде JSON на сервер изasticsearch, индексируя его в строке IndexRequest. Проблема в том, что вам нужно размещать каждое полученное сообщение на каждой странице (id), и я не знаю, как правильно это реализовать.

Необходимо, чтобы каждое сообщение записывалось на каждой странице с возрастающим индексом , Можете ли вы помочь написать этот l oop / method?

public class WebsoketPrinter {

    private static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();

        // The default cache size is 100 MB. Change it to 30 MB.
        builder.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory
                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    private VkStreamingApiClient streamingClient;
    public StreamingActor streamingActor;
    private VkApiClient vkClient;

    WebsoketPrinter() {
        TransportClient transportClient = new HttpTransportClient();
        streamingClient = new VkStreamingApiClient(transportClient);
        vkClient = new VkApiClient(transportClient);
        streamingActor = new StreamingActor(this.getServerUrlResponse().getEndpoint(), this.getServerUrlResponse().getKey());
    }

    private GetServerUrlResponse getServerUrlResponse() {
        ConfigReader configReader = new ConfigReader();
        Integer appId = Integer.valueOf(configReader.getProp("appId"));
        String accessToken = configReader.getProp("accessToken");
        ServiceActor serviceActor = new ServiceActor(appId, accessToken);
        GetServerUrlResponse getServerUrlResponse = null;

        try {
            getServerUrlResponse = vkClient.streaming().getServerUrl(serviceActor).execute();
        } catch (ApiException | ClientException e) {
            e.printStackTrace();
        }
        return getServerUrlResponse;
    }

    public void print() {

            try {
                streamingClient.stream().get(streamingActor, new StreamingEventHandler() {
                    @Override
                    public void handle(StreamingCallbackMessage message) {

                        System.out.println("Сообщение от стриминга получено");
                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

                        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "changeme"));

                        RestClientBuilder builder = RestClient.builder(new HttpHost("host", "port"))
                                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                                    @Override
                                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                                    }
                                });
//                        final RestHighLevelClient highClient = new RestHighLevelClient(builder);

                        try {
                                    // Create a request.
//                                    System.out.println("Начало цикла");
                                    RestHighLevelClient highClient = new RestHighLevelClient(builder);
//                              for(int d = 0; i > d; d++, i++) {
                                    Map<String, String> jsonMap = new HashMap<>();

                                    jsonMap.put("Message id", message.toString());

                                        // index_name is the index name, type_name is the type name, and doc_id is the document ID.
                                        IndexRequest indexRequest = new IndexRequest("stream", "_doc", "id").source(jsonMap);

                                        System.out.println(message.toString());
                                        System.out.println("Передача сообщения на сервер");

                                        // Run the following command in parallel and use the custom RequestOptions (COMMON_OPTIONS).
                                        IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);

                                    highClient.close();

                        } catch (IOException ioException) {
                            ioException.printStackTrace();
                        }
                    }
                }).execute();
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
        }
    }
}

1 Ответ

0 голосов
/ 27 апреля 2020

Я решил проблему, правда, не с помощью циклов, а путем сохранения идентификатора в отдельном числовом массиве.

...