Конвейер не полностью параллелен в облачном потоке данных с использованием Go SDK - PullRequest
0 голосов
/ 15 марта 2019

У меня есть реализация кода Apache Beam на Go SDK, как описано ниже. Трубопровод имеет 3 ступени. Один - textio.Read, другой - CountLines, а последний шаг - ProcessLines. ProcessLines шаг занимает около 10 секунд.

После того, как я реализовал приведенный ниже код, я обнаружил, что во время выполнения конвейера каждый шаг ожидает завершения выполнения предыдущего шага и начинает работать параллельно после завершения предыдущего шага. Каждый шаг выполняется параллельно, но в конвейере одновременно выполняется только один шаг, где конвейер сейчас не полностью параллелен.

Этот вопрос является продолжением нижеследующего вопроса. Хотя ответ частично решает проблему, конвейер в настоящее время не работает полностью параллельно. Проблема параллелизма в облачном потоке данных с использованием Go SDK

package main

import (
    "context"
    "flag"
    "time"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

// metrics to be monitored
var (
    input         = flag.String("input", "", "Input file (required).")
    numberOfLines = beam.NewCounter("extract", "numberOfLines")
    lineLen       = beam.NewDistribution("extract", "lineLenDistro")
)

func AddRandomKey(s beam.Scope, col beam.PCollection) beam.PCollection {
    return beam.ParDo(s, addRandomKeyFn, col)
}

func addRandomKeyFn(elm beam.T) (int, beam.T) {
    return rand.Int(), elm
}

func countLines(ctx context.Context, _ int, lines func(*string) bool, emit func(string)) {
    var line string
    for lines(&line) {
        lineLen.Update(ctx, int64(len(line)))
        numberOfLines.Inc(ctx, 1)
        emit(line)
    }
}
func processLines(ctx context.Context, _ int, lines func(*string) bool) {
    var line string
    for lines(&line) {
        time.Sleep(10 * time.Second)
        numberOfLinesProcess.Inc(ctx, 1)
    }
}

func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("Count Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    return beam.ParDo(s, countLines, grouped)
}

func ProcessLines(s beam.Scope, lines beam.PCollection) {
    s = s.Scope("Process Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    beam.ParDo0(s, processLines, grouped)
}

func main() {
    // If beamx or Go flags are used, flags must be parsed first.
    flag.Parse()
    // beam.Init() is an initialization hook that must be called on startup. On
    // distributed runners, it is used to intercept control.
    beam.Init()

    // Input validation is done as usual. Note that it must be after Init().
    if *input == "" {
        log.Fatal(context.Background(), "No input file provided")
    }

    p := beam.NewPipeline()
    s := p.Root()

    l := textio.Read(s, *input)
    lines := CountLines(s, l)
    ProcessLines(s, lines)

    // Concept #1: The beamx.Run convenience wrapper allows a number of
    // pre-defined runners to be used via the --runner flag.
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf(context.Background(), "Failed to execute job: %v", err.Error())
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...