У меня 25 разделенных kafka topi c и я использую данные из этого topi c с параллелизмом 25 при весенней загрузке. Но я получаю «TxnConflictException: транзакция была прервана» ошибка. Я боролся в течение нескольких дней. Вот мой сервис для потребителей и конфигурация кафки
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 26214400);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> dgraphKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("dgraph"));
factory.setConcurrency(25);
return factory;
}
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9080) // ip Address ve port App prop'tan okunmalı
.usePlaintext().build();
DgraphGrpc.DgraphStub stub = DgraphGrpc.newStub(channel);
DgraphClient dgraphClient = new DgraphClient(stub);
Gson gson = null;
JsonParser parser = new JsonParser();
@KafkaListener(topics = "ztest", groupId = "dgraph", containerFactory = "dgraphKafkaListenerContainerFactory", concurrency = "25")
public void connectorListener(String record) throws InterruptedException {
Transaction txn = dgraphClient.newTransaction();
JsonObject jsonObject = parser.parse(record).getAsJsonObject();
String userId = jsonObject.get("userId").getAsString();
userId = "\"" + userId + "\"";
String blogId = jsonObject.get("blogId").getAsString();
blogId = "\"" + blogId + "\"";
float rating = jsonObject.get("rating").getAsFloat();
String query = " query data{\n" +
" var(func: eq(username, " + userId + ")) {\n" +
" user as uid\n" +
" }\n" +
" var(func: eq(moviename," + blogId + ")) {\n" +
" movie as uid\n" +
" }\n" +
" }";
DgraphProto.Mutation mu = DgraphProto.Mutation.newBuilder().setSetNquads(ByteString.copyFromUtf8(
" uid(user) <username> " + userId + " .\n" +
" uid(user) <appID> " + blogId + " .\n" +
" uid(movie) <moviename> " + blogId + " .\n" +
" uid(user) <rated> uid(movie) (rating = " + rating + ") .")).build();
DgraphProto.Request mreq = DgraphProto.Request.newBuilder().setQuery(query).setStartTs(3).addMutations(mu).setCommitNow(true).build();
txn.doRequest(mreq);
}
Заранее спасибо!