как заставить часть задания выполняться только один раз в кварцевом планировщике в java, т. е. потребителя кафки - PullRequest
0 голосов
/ 15 мая 2018

Я использую Java сервлет. В файле web.xml я инициализировал кварцевый планировщик. У меня есть 2 отдельных задания и два триггера. Я использую файл quartz.properties, который находится в папке ресурсов. Ниже мой файл web.xml.

<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
<display-name>Archetype Created Web Application</display-name>


 <context-param>
     <param-name>quartz:config-file</param-name>
     <param-value>quartz.properties</param-value>
 </context-param>
 <context-param>
     <param-name>quartz:shutdown-on-unload</param-name>
     <param-value>true</param-value>
 </context-param>
 <context-param>
     <param-name>quartz:wait-on-shutdown</param-name>
     <param-value>true</param-value>
 </context-param>
 <context-param>
     <param-name>quartz:start-on-load</param-name>
     <param-value>true</param-value>
 </context-param> 

 <listener>
<listener-class>
org.quartz.ee.servlet.QuartzInitializerListener
</listener-class>
</listener>
<listener>
<listener-class>com.hpe.statistics.StatisticsQuartzListener</listener-class>
</listener>
<servlet>
<servlet-name>StatisticsIvr</servlet-name>
<display-name>StatisticsIvr</display-name>
<description></description>
<servlet-class>StatisticsIvr</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>StatisticsIvr</servlet-name>
<url-pattern>/StatisticsIvr</url-pattern>
</servlet-mapping>
</web-app>

StatisticsIvr - это имя моего сервлета. StatisticsQuartzListener - это класс, в котором я инициализировал все задания и триггеры. com.hpe.statistics. это имя пакета, в котором у меня есть класс StatisticsQuartzListener. Ниже представлен класс StatisticsQuartzListener, который реализует ServletContextListener.

public void contextInitialized(ServletContextEvent ctx)
{

    JobDetail newfilejob = JobBuilder.newJob(NewFileJob.class)
            .withIdentity("fileJob", "group1").build();

    JobDetail newlinejob = JobBuilder.newJob(NewFileJob.class)
            .withIdentity("lineJob", "group2").build();

    Trigger filetrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("fileTrigger", "group1")
            .startNow()
            .withSchedule(
                    SimpleScheduleBuilder.simpleSchedule()
                            .withIntervalInMinutes(6).repeatForever())
            .build();
    Trigger linetrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("lineTrigger", "group2")
            .startNow()
            .withSchedule(
                    SimpleScheduleBuilder.simpleSchedule()
                            .withIntervalInMinutes(2).repeatForever())
            .build();

    try {
            scheduler1 = ((StdSchedulerFactory) ctx.getServletContext()
                    .getAttribute(
                            QuartzInitializerListener.QUARTZ_FACTORY_KEY))
                    .getScheduler();
            scheduler1.scheduleJob(newfilejob, filetrigger);
        } catch (SchedulerException e) {

        }

     try {
            scheduler2 = ((StdSchedulerFactory) ctx.getServletContext()
                    .getAttribute(
                            QuartzInitializerListener.QUARTZ_FACTORY_KEY))
                    .getScheduler();
            scheduler2.scheduleJob(newlinejob, linetrigger);
        } catch (SchedulerException e) {

        }

}

ниже - файл quartz.properties

# Main Quartz configuration
org.quartz.scheduler.skipUpdateCheck = true
org.quartz.scheduler.instanceName = StatisticsQuartzListener
org.quartz.scheduler.jobFactory.class = org.quartz.simpl.SimpleJobFactory
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 5

В newlinejob (2-е задание) я хочу выполнить часть кода только один раз. Класс NewLineJob как показано ниже

public class NewLineJob implements Job{



public static final String KAFKAHOST="kafkahost";
public static final String KAFKAPORT="kafkaport";
public static final String KAFKATOPICNAME="topicname";
KafkaConsumer<String, String> consumer;
 HashMap<String,Long> consumerMap=new HashMap<String,Long>();
 File lastModifiedFile;



 String flag="off";



public void execute(JobExecutionContext context)
        throws JobExecutionException
        {

            String kafkahost=dataMap.getString(KAFKAHOST);
            String kafkaport=dataMap.getString(KAFKAPORT);
            String topicname=dataMap.getString(KAFKATOPICNAME);


              SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
              Date d = new Date();
              String timeStamp = sf.format(d);

              int i=0;
              String writeString;
              if(flag.equals("off"))
              {
                  flag="on";
              Properties propsConsumer = new Properties();


                propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport);
                propsConsumer.put("group.id", "test");
                propsConsumer.put("enable.auto.commit", "true");
                propsConsumer.put("auto.commit.interval.ms", "1000");
                propsConsumer.put("session.timeout.ms", "30000");
                propsConsumer.put("key.deserializer", 
                     "org.apache.kafka.common.serialization.StringDeserializer");
                propsConsumer.put("value.deserializer", 
                     "org.apache.kafka.common.serialization.StringDeserializer");         
                consumer = new KafkaConsumer<String, String>(propsConsumer);

                  //Kafka Consumer subscribes list of topics here.
                consumer.subscribe(Arrays.asList(topicname)); 

              }

              ///some other operations which I want according to sceduler
        }

Часть, которую я хочу выполнить только один раз:

Properties propsConsumer = new Properties();


                propsConsumer.put("bootstrap.servers", kafkahost+":"+kafkaport);
                propsConsumer.put("group.id", "test");
                propsConsumer.put("enable.auto.commit", "true");
                propsConsumer.put("auto.commit.interval.ms", "1000");
                propsConsumer.put("session.timeout.ms", "30000");
                propsConsumer.put("key.deserializer", 
                     "org.apache.kafka.common.serialization.StringDeserializer");
                propsConsumer.put("value.deserializer", 
                     "org.apache.kafka.common.serialization.StringDeserializer");         
                consumer = new KafkaConsumer<String, String>(propsConsumer);

                  //Kafka Consumer subscribes list of topics here.
                consumer.subscribe(Arrays.asList(topicname)); 

Потому что я не хочу создавать нового потребителя каждый раз, когда запускается планировщик. Я просто хочу использовать один и тот же потребитель kafka в остальной части кода каждый раз, когда запускается планировщик. Даже я пытался поместить ту часть кода, которую я хотел, только один раз в конструктор. Но он получает ошибку, так как key.deserializer не имеет значения по умолчанию. Какую модификацию я должен сделать? Заранее спасибо

...