Запишите с помощью Apache Ignite in Spring загрузочное приложение - PullRequest
0 голосов
/ 28 марта 2020

Я хочу использовать стратегию записи за Apache Ignite с моим загрузочным приложением Spring с базой данных mysql. Я хочу записать данные кэша в базу данных mysql с асинхронной записью. Я новичок в Apache Ignite, и я ссылаюсь на документы из https://apacheignite.readme.io/docs/.

Ниже приведена моя конфигурация загрузки Spring для использования стратегии записи за Apache Ignite.

    @Bean
    public Ignite igniteInstance() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setIgniteInstanceName("ignite-1");
        cfg.setPeerClassLoadingEnabled(true);

        CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache");
        ccfg2.setIndexedTypes(Long.class, Contact.class);
        ccfg2.setWriteBehindEnabled(true);
        ccfg2.setWriteBehindFlushFrequency(1000);
        ccfg2.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
        ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setBackups(1);
        ccfg2.setWriteBehindFlushSize(0);
        ccfg2.setWriteBehindFlushThreadCount(1);
        ccfg2.setWriteBehindBatchSize(1);
        //ccfg2.setReadThrough(true);
        //ccfg2.setWriteThrough(true);

        // Memory Configuration
        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
        DataRegionConfiguration defaultDataRegionCfg = storageCfg.getDefaultDataRegionConfiguration();
        defaultDataRegionCfg.setPersistenceEnabled(false);   // Only Memory
        defaultDataRegionCfg.setMaxSize(1024 * 1024 * 256);  // 256MB
        defaultDataRegionCfg.setMetricsEnabled(true);
        cfg.setDataStorageConfiguration(storageCfg);


        CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>();
        f2.setDataSource(datasource);
        f2.setDialect(new MySQLDialect());
        JdbcType jdbcContactType = new JdbcType();
        jdbcContactType.setCacheName("ContactCache");
        jdbcContactType.setKeyType(Long.class);
        jdbcContactType.setValueType(Contact.class);
        jdbcContactType.setDatabaseTable("contact");
        jdbcContactType.setDatabaseSchema("demo");
        jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"),
                new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"),
                new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
        f2.setTypes(jdbcContactType);
        ccfg2.setCacheStoreFactory(f2);

         TcpCommunicationSpi spi = new  TcpCommunicationSpi();
         spi.setMessageQueueLimit(1000);

        CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
        ccfg.setIndexedTypes(Long.class, Person.class);
        ccfg.setWriteBehindEnabled(true);
        ccfg.setWriteBehindFlushFrequency(2000);
        ccfg.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);
        ccfg.setWriteBehindFlushSize(0);
        ccfg.setWriteBehindFlushThreadCount(10);
        ccfg.setWriteBehindBatchSize(10);
        ccfg.setBackups(1);
        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
//      ccfg.setDataRegionName(storageCfg);
//      ccfg.setReadThrough(true);
//      ccfg.setWriteThrough(true);
        CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
        f.setDataSource(datasource);
        f.setDialect(new MySQLDialect());
        JdbcType jdbcType = new JdbcType();
        jdbcType.setCacheName("PersonCache");
        jdbcType.setKeyType(Long.class);
        jdbcType.setValueType(Person.class);
        jdbcType.setDatabaseTable("person");
        jdbcType.setDatabaseSchema("demo");
        jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"),
                new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"),
                new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"),
                new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"),
                new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"),
                new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"),
                new JdbcTypeField(Types.VARCHAR, "birth_date", String.class, "birthDate"));
        f.setTypes(jdbcType);
        ccfg.setCacheStoreFactory(f);

        Ignite ignite = Ignition.start(cfg);
        ignite.cluster().active(true);
        return ignite;
    }

При этой конфигурации, если я установлю запись до true, тогда данные будут успешно вставлены в базу данных. , Но запись позади не работает с этой конфигурацией. Я указал страницу https://apacheignite.readme.io/v2.7/docs/3rd-party-store для настройки обратной записи, но я думаю, что она не работает. Пожалуйста, дайте мне знать, если я что-то пропустил, так как я новичок ie с Ignite.

Код моего хранилища -

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

    List<Person> findByFirstNameAndLastName(String firstName, String lastName);

    @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<Contact> selectContacts(String firstName, String lastName);

    @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<List<?>> selectContacts2(String firstName, String lastName);
}

Класс модели -

@QueryGroupIndex.List(
        @QueryGroupIndex(name="idx1")
) 
public class Person implements Serializable {

    private static final long serialVersionUID = -1271194616130404625L;
    private static final AtomicLong ID_GEN = new AtomicLong();

    @QuerySqlField(index = true)
    private Long id;
    @QuerySqlField(index = true)
    @QuerySqlField.Group(name = "idx1", order = 0) 
    private String firstName;
    @QuerySqlField(index = true)
    @QuerySqlField.Group(name = "idx1", order = 1) 
    private String lastName;
    private Gender gender;
    private Date birthDate;
    private String country;
    private String city;
    private String address;
    @Transient
    private List<Contact> contacts = new ArrayList<>();

    public void init() {
        this.id = ID_GEN.incrementAndGet();
    }

Контроллер класс -

@RestController
@RequestMapping("/person")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @Autowired
    PersonRepository repository;

    @PostMapping
    public Person add(@RequestBody Person person) {
        person.init();
        return repository.save(person.getId(), person);
    }

    @PutMapping
    public Person update(@RequestBody Person person) {
        return repository.save(person.getId(), person);
    }

    @DeleteMapping("/{id}")
    public void delete(Long id) {
        repository.delete(id);
    }

    @GetMapping("/{id}")
    public Person findById(@PathVariable("id") Long id) {
        return repository.findOne(id);
    }

Если кто-то хочет получить доступ к полному коду, он доступен на - https://github.com/gotidhavalh/ignite-rest-service

1 Ответ

1 голос
/ 01 апреля 2020

Вам нужно включить как

    ccfg.setWriteThrough(true);

, так и

    ccfg.setWriteBehindEnabled(true);

, чтобы «Запись за» работала. Пожалуйста, не забудьте проверить время ожидания гриппа sh (по умолчанию 5 с).

...