Как установить смещение / порядковый номер для события с помощью spark-eventhubs?Использование scala - PullRequest
0 голосов
/ 02 июля 2019

1) У меня 32 раздела в EventHUB. 2) я хочу начать чтение с определенного seqNO для всех этих 32 разделов. 3) Итак, я загрузил seqNO и 4) теперь карта позиций передается в EventHubsConf, как показано ниже.

5) использование этого ehConf для создания DirectStream, как показано ниже

Вместо чтения с позиции, которая упоминается на карте позиций. Моя работа использует свой собственный seqNO

Пробовал с документацией Azure / DataBricks

var позиции = collection.immutable.MapNameAndPartition, EventPosition

     for (partitionId <- 0 until 32 )
        {
        positions += (new NameAndPartition("myHUBName", partitionId)) -> EventPosition.fromSequenceNumber(untilOffset.toLong)
     }

// до тех пор, пока значение OffOffset не будет равно значению seqNO (например, 234 456 и т. Д.)

Карта (myHUBName-12 -> 56, myHUBName-18 -> 69, myHUBName-13 -> 160, myHUBName-17 -> 287, myHUBName-15 -> 440, myHUBName-1 -> 671, myHUBName-19 -> 141, myHUBName-31 -> 66, myHUBName-0 -> 196, myHUBName-14 -> 88, myHUBName-24 -> 1006, myHUBName-30 -> 106, myHUBName-28 -> 72, myHUBName-4 - > 91, myHUBName-8 -> 172, myHUBName-20 -> 692, myHUBName-10 -> 92, myHUBName-25 -> 104, myHUBName-9 -> 447, myHUBName-3 -> 238, myHUBName-5 -> 383, myHUBName-7 -> 52, myHUBName-11 -> 147, myHUBName-23 -> 157, myHUBName-27 -> 205, myHUBName-29 -> 117, myHUBName-21 -> 30, myHUBName-26 -> 60 , myHUBName-22 -> 107, myHUBName-6 -> 222, myHUBName-2 -> 58, myHUBName-16 -> 59)

val ehConf = EventHubsConf (ConnectionString) .setStartingPositions (позиция) .setConsumerGroup ( "$ По умолчанию") .setMaxEventsPerTrigger (80)

val directStream = EventHubsUtils.createDirectStream (ssc, ehConf)

  val eventStream = directStream.map(messageStr => {
    var event = new String(messageStr.getBytes)
    event
  })

19/07/02 07:42:07 INFO EventHubsDirectDStream: запуск пакета в 1562053320000 мс для EH: XXXXX с OffsetRange (ID_раздела: 0 | от SeqNo: 195 | до SeqNo: 196) OffsetRange (ID раздела: 1 | от SeqNo: 670 | до SeqNo: 671) OffsetRange (ID_раздела: 2 | от SeqNo: 57 | до SeqNo: 58) OffsetRange (ID раздела: 3 | от SeqNo: 237 | до SeqNo: 238) OffsetRange (ID раздела: 4 | от SeqNo: 91 | до SeqNo: 91) OffsetRange (ID раздела: 5 | от SeqNo: 382 | до SeqNo: 383) OffsetRange (ID раздела: 6 | от SeqNo: 221 | до SeqNo: 222) OffsetRange (ID раздела: 7 | от SeqNo: 52 | до SeqNo: 52) OffsetRange (ID раздела: 8 | от SeqNo: 171 | до SeqNo: 172) OffsetRange (ID раздела: 9 | от SeqNo: 446 | до SeqNo: 447) OffsetRange (ID раздела: 10 | от SeqNo: 91 | до SeqNo: 92) OffsetRange (ID секции: 11 | от SeqNo: 146 | до SeqNo: 147) OffsetRange (ID раздела: 12 | от SeqNo: 56 | до SeqNo: 56) OffsetRange (ID раздела: 13 | от SeqNo: 159 | до SeqNo: 160) OffsetRange (ID раздела: 14 | от SeqNo: 87 | до SeqNo: 88) OffsetRange (ID раздела: 15 | от SeqNo: 439 | до SeqNo: 440) OffsetRange (ID раздела: 16 | от SeqNo: 58 | до SeqNo: 59) OffsetRange (ID раздела: 17 | от SeqNo: 286 | до SeqNo: 287) OffsetRange (ID раздела: 18 | от SeqNo: 69 | до SeqNo: 69) OffsetRange (ID раздела: 19 | от SeqNo: 141 | до SeqNo: 141) OffsetRange (ID раздела: 20 | от SeqNo: 691 | до SeqNo: 692) OffsetRange (ID раздела: 21 | от SeqNo: 30 | до SeqNo: 30) OffsetRange (ID раздела: 22 | от SeqNo: 106 | до SeqNo: 107) OffsetRange (ID раздела: 23 | от SeqNo: 156 | до SeqNo: 157) OffsetRange (ID раздела: 24 | от SeqNo: 1005 | до SeqNo: 1006) OffsetRange (ID раздела: 25 | от SeqNo: 103 | до SeqNo: 104) OffsetRange (ID раздела: 26 | от SeqNo: 59 | до SeqNo: 60) OffsetRange (ID раздела: 27 | от SeqNo: 204 | до SeqNo: 205) OffsetRange (ID раздела: 28 | от SeqNo: 71 | до SeqNo: 72) OffsetRange (ID секции: 29 | от SeqNo: 117 | до SeqNo: 117) OffsetRange (ID секции: 30 | от SeqNo: 105 | до SeqNo: 106) OffsetRange (ID раздела: 31 | от SeqNo: 65 | до SeqNo: 66)

...