Одновременный Upsert In Dgraph - PullRequest
       34

Одновременный Upsert In Dgraph

0 голосов
/ 25 февраля 2020

У меня 25 разделенных kafka topi c и я использую данные из этого topi c с параллелизмом 25 при весенней загрузке. Но я получаю «TxnConflictException: транзакция была прервана» ошибка. Я боролся в течение нескольких дней. Вот мой сервис для потребителей и конфигурация кафки

@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 26214400);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> dgraphKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory("dgraph"));
    factory.setConcurrency(25);
    return factory;
}






ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9080) // ip Address ve port App prop'tan okunmalı
        .usePlaintext().build();
DgraphGrpc.DgraphStub stub = DgraphGrpc.newStub(channel);
DgraphClient dgraphClient = new DgraphClient(stub);

Gson gson = null;
JsonParser parser = new JsonParser();


@KafkaListener(topics = "ztest", groupId = "dgraph", containerFactory = "dgraphKafkaListenerContainerFactory", concurrency = "25")
public void connectorListener(String record) throws InterruptedException {
    Transaction txn = dgraphClient.newTransaction();
    JsonObject jsonObject = parser.parse(record).getAsJsonObject();
    String userId = jsonObject.get("userId").getAsString();
    userId = "\"" + userId + "\"";
    String blogId = jsonObject.get("blogId").getAsString();
    blogId = "\"" + blogId + "\"";
    float rating = jsonObject.get("rating").getAsFloat();

    String query = " query data{\n" +
            "         var(func: eq(username, " + userId + ")) {\n" +
            "          user as uid\n" +
            "         }\n" +
            "            var(func: eq(moviename," + blogId + ")) {\n" +
            "            movie as uid\n" +
            "         }\n" +
            "      }";

    DgraphProto.Mutation mu = DgraphProto.Mutation.newBuilder().setSetNquads(ByteString.copyFromUtf8(
                    "           uid(user) <username> " + userId + " .\n" +
                    "           uid(user) <appID> " + blogId + " .\n" +
                    "           uid(movie) <moviename> " + blogId + " .\n" +
                    "           uid(user) <rated> uid(movie) (rating = " + rating + ") .")).build();



    DgraphProto.Request mreq = DgraphProto.Request.newBuilder().setQuery(query).setStartTs(3).addMutations(mu).setCommitNow(true).build();
    txn.doRequest(mreq);
}

Заранее спасибо!

...