У меня есть класс сущности MailingList и исходный код ниже:
@Data
@Entity
@Table(name = AccountConstants.TABLE_NAME_MAILING_LIST, uniqueConstraints = @UniqueConstraint(columnNames = { "list_name" }))
@EntityListeners(MailingListListener.class)
public class MailingList extends AbstractBaseEntity implements Serializable, BasicEntity<Long> {
public static final String MAILINGLIST_PROP_NAME = "list_name";
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "mail_list_generator")
@SequenceGenerator(name = "mail_list_generator", sequenceName = "mail_list_seq", allocationSize = 1, initialValue = 1)
@Column(name = "id", updatable = false, nullable = false)
private Long id;
@Column(name = "list_name", length = 50, nullable = false, updatable = true)
private String name;
}
Как видите, сущность выше добавляет слушателя к классу MailingListListener, а исходный код MailingListListener ниже
@Component
public class MailingListListener {
@PrePersist
public void onPrePersist(Object o) {
try {
if (o instanceof Timestampable) {
Timestampable tso = (Timestampable) o;
tso.setCreatedAt(new Date());
tso.setActive(true);
tso.setCreatedBy(getUserId());
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
private void sendMailingList(MailingList mailingList) {
try {
MailingListVM vm = MailingListMapper.INSTANCE.toMailingListVM(mailingList);
KafkaMessage kafkaMsg = KafkaUtil.saveKafkaMessage(vm, BaseConstants.TABLE_NAME_MAILING_LIST, "uaa");
Map<String, Object> sendingData = new HashMap<>();
if (mailingList.getActive() == null || mailingList.getActive() == false) {
sendingData.put(BaseConstants.KAFKA_ACTION, BaseConstants.KAFKA_ACTION_DELETE);
} else {
sendingData.put(BaseConstants.KAFKA_ACTION, BaseConstants.KAFKA_ACTION_UPDATE);
}
sendingData.put(BaseConstants.TABLE_NAME, BaseConstants.TABLE_NAME_MAILING_LIST + "");
sendingData.put(MailingList.PROP_ID, mailingList.getId().longValue() + "");
sendingData.put(MailingList.MAILINGLIST_PROP_NAME, mailingList.getName());
sendingData.put(MailingList.PROP_CREATED_AT, mailingList.getCreatedAt());
sendingData.put(MailingList.PROP_UPDATED_AT, mailingList.getUpdatedAt());
sendingData.put(KafkaMessage.SOURCE_SERVICE, KafkaMessage.UAA_SERVICE);
sendingData.put(KafkaMessage.KAFKA_MESSAGE_ID, kafkaMsg.getId() != null ? kafkaMsg.getId() + "" : "-1");
UpdatedUaaPublicProducerChannel updatedUaaPublicProducerChannel = ApplicationContextHolder
.getBean(UpdatedUaaPublicProducerChannel.class);
updatedUaaPublicProducerChannel.updateUaaPublic().send(MessageBuilder.withPayload(sendingData).build());
} catch (Exception e) {
LOGGER.info(e.getMessage(), e);
}
}
@PostPersist
public void onPostPersist(Object o) {
MailingList obj = (MailingList) o;
send1MailingList(obj);
}
}
Вы можете увидеть код.Всякий раз, когда я обновляю или вставляю новую запись в таблицу MailingList, она выполняет функцию sendMailingList
.Эта функция просто сохраняет одну запись в таблицу kafka_message
и отправляет только одно сообщение на сервер kafka.Но я не мог понять, если я прокомментирую 2 кода первой строки этой функции, тогда я смогу запустить также
MailingListVM vm = MailingListMapper.INSTANCE.toMailingListVM(mailingList);
KafkaMessage kafkaMsg = KafkaUtil.saveKafkaMessage(vm, BaseConstants.TABLE_NAME_MAILING_LIST, "uaa");
Содержимое функции KafkaUtil.saveKafkaMessage
, как показано ниже
public final static KafkaMessage saveKafkaMessage(Object dept, String tableName, String source) {
Session session = null;
session = ApplicationContextHolder.getSessionFactory().openSession();
try {
KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setContents(KafkaUtil.writeValueAsString(dept));
kafkaMessage.setTableName(tableName);
if(source.equals("device")) {
kafkaMessage.setDeviceServiceReceived(new Long(2));
} else if(source.equals("facility")) {
kafkaMessage.setFacilityServiceReceived(new Long(2));
} else if(source.equals("archive")) {
kafkaMessage.setArchiveServiceReceived(new Long(2));
} else if(source.equals("uaa")) {
kafkaMessage.setUaaServiceReceived(new Long(2));
} else if(source.equals("monitoring")) {
kafkaMessage.setMonitoringServiceReceived(new Long(2));
} else if(source.equals("sale")) {
kafkaMessage.setSaleServiceReceived(new Long(2));
}
session = ApplicationContextHolder.getSessionFactory().openSession();
Transaction transaction = null;
transaction = session.beginTransaction();
transaction.begin();
session.save(kafkaMessage);
transaction.commit();
return kafkaMessage;
} finally {
//It will close session to avoid exception that it cannot get connection from pool
if(session != null)
session.close();
}
}
Пожалуйста, помогите мнеЕсли вы знаете решение.Большое спасибо за вашу помощь