Кафка-спарк Потоковая обработка заданий синхронно - PullRequest
0 голосов
/ 16 мая 2018

Я пытаюсь провести простой тест, в котором я использую Kafka-connect и spark

Я написал пользовательский kafka-connect, который создает эту исходную запись

SourceRecord sr = new SourceRecord(null,
                    null,
                    destTopic,
                   Schema.STRING_SCHEMA,
                    cleanPath);

в спарке, я получаю это сообщениекак это

val kafkaConsumerParams = Map[String, String](
      "metadata.broker.list" -> prop.getProperty("kafka_host"),
      "zookeeper.connect" -> prop.getProperty("zookeeper_host"),
      "group.id" -> prop.getProperty("kafka_group_id"),
      "schema.registry.url" -> prop.getProperty("schema_registry_url"),
      "auto.offset.reset" -> prop.getProperty("auto_offset_reset")
    )
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConsumerParams, topicsSet)

val ds = messages.foreachRDD(rdd => {
          val toPrint = rdd.map(t => {
            val file_path = t._2

            val startTime = DateTime.now()


            Thread.sleep(1000 * 60)
            1
      }).sum()
        LogUtils.getLogger(classOf[DeviceManager]).info(" toPrint = " + toPrint +" (number of flows calculated)")
      })
    }

когда я использую соединитель для отправки нескольких сообщений на нужную тему (в моем тесте было 6 разделов) Поток сна получает все сообщения, но преобразует их синхронно, а не асинхронно.

Когда я создаю простого тестового производителя, сны выполняются асинхронно.

Я также создал 2 простых потребителей и попробовал и соединитель, и производителя, и обе задачи были использованы асинхронно, что означает, что мойПроблемы связаны с тем, как искра получает сообщения, отправленные с разъема.Я не могу понять, почему задачи не работают так же, как и при отправке их от производителя.

Я даже напечатал запись, которую получает искра, и они точно такие же

производительотправленная запись

1: {partition=2, offset=11, value=something, key=null}
2: {partition=5, offset=9, value=something2, key=null}

подключить отправленную запись

1: {partition=3, offset=9, value=something, key=null}

версии, используемые в моих проектах:

    <scala.version>2.11.7</scala.version>
    <confluent.version>4.0.0</confluent.version>
    <kafka.version>1.0.0</kafka.version>
    <java.version>1.8</java.version>
    <spark.version>2.0.0</spark.version>

зависимости

 <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.0-RC1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
<dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <scope>${global.scope}</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-avro-converter</artifactId>
            <version>${confluent.version}</version>
            <scope>${global.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
        </dependency>

1 Ответ

0 голосов
/ 16 мая 2018

Мы не можем выполнять потоковые задания Spark-Kafka асинхронно. Но мы можем запустить их параллельно, как это делают потребители Kafka. Для этого нам нужно установить следующую конфигурацию в SparkConf():

sparkConf.set("spark.streaming.concurrentJobs","4")

По умолчанию его значение равно "1". Но мы можем переопределить его на более высокое значение.

Надеюсь, это поможет!

...