идти каналы с потребителем кафки - PullRequest
0 голосов
/ 12 мая 2018

Я новичок и начинаю изучать каналы.Я использую слитного потребителя кафки для создания функционального потребителя.Что я хочу сделать, это отправить сообщения в буферизованный канал (2000) ... и затем записать сообщения в канале для повторного отображения с использованием конвейера.Я приступил к работе с потребителем, просто выполняя println сообщения по одному, пока оно не достигнет конца смещений, но когда я пытаюсь добавить канал, кажется, что в случае default:switch, а затем просто заморозить.

также не похоже, что я правильно заполняю канал?Это fmt.Println("count is: ", len(redisChnl)) всегда печатает 0

вот что у меня есть:

// Example function-based high-level Apache Kafka consumer
package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "os/signal"
    "syscall"
    "time"
    "encoding/json"
    "regexp"
    "github.com/go-redis/redis"
    "encoding/binary"
)

var client *redis.Client

func init() {
    client = redis.NewClient(&redis.Options{
        Addr:         ":6379",
        DialTimeout:  10 * time.Second,
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        PoolSize:     10,
        PoolTimeout:  30 * time.Second,
    })
    client.FlushDB()
}

type MessageFormat struct {
    MetricValueNumber float64     `json:"metric_value_number"`
    Path              string      `json:"path"`
    Cluster           string      `json:"cluster"`
    Timestamp         time.Time   `json:"@timestamp"`
    Version           string      `json:"@version"`
    Host              string      `json:"host"`
    MetricPath        string      `json:"metric_path"`
    Type              string      `json:"string"`
    Region            string      `json:"region"`
}

//func redis_pipeline(ky string, vl string) {
//  pipe := client.Pipeline()
//
//  exec := pipe.Set(ky, vl, time.Hour)
//
//  incr := pipe.Incr("pipeline_counter")
//  pipe.Expire("pipeline_counter", time.Hour)
//
//  // Execute
//  //
//  //     INCR pipeline_counter
//  //     EXPIRE pipeline_counts 3600
//  //
//  // using one client-server roundtrip.
//  _, err := pipe.Exec()
//  fmt.Println(incr.Val(), err)
//  // Output: 1 <nil>
//}

func main() {


    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":               "kafka.com:9093",
        "group.id":                        "testehb",
        "security.protocol":               "ssl",
        "ssl.key.location":                "/Users/key.key",
        "ssl.certificate.location":        "/Users/cert.cert",
        "ssl.ca.location":                 "/Users/ca.pem",
    })

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }

    fmt.Printf("Created Consumer %v\n", c)

    err = c.SubscribeTopics([]string{"jmx"}, nil)

    redisMap := make(map[string]string)

    redisChnl := make(chan []byte, 2000)

    run := true

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:

                //fmt.Printf("%% Message on %s:\n%s\n",
                //  e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v\n", e.Headers)
                }

                str := e.Value
                res := MessageFormat{}
                json.Unmarshal([]byte(str), &res)


                fmt.Println("size", binary.Size([]byte(str)))

                host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

                redisMap[host] = string(str)
                fmt.Println("count is: ", len(redisChnl)) //this always prints "count is:  0"

                redisChnl <- e.Value //I think this is the write way to put the messages in the channel?

            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            default:
                fmt.Printf("Ignored %v\n", e)
            }

            <- redisChnl // I thought I could just empty the channel like this once the buffer is full?


        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}

------- РЕДАКТИРОВАТЬ -------

Ладно, думаю, я заставил его работать, переместив <- redisChnl внутрь default, но теперь я вижу, что count before read и count after read внутри default всегда печатают 2,000 ... Iподумал бы, что сначала count before read = 2,000, а затем count after read = 0, так как канал будет пуст, то ??

    select {
    case sig := <-sigchan:
        fmt.Printf("Caught signal %v: terminating\n", sig)
        run = false
    default:
        ev := c.Poll(100)
        if ev == nil {
            continue
        }

        switch e := ev.(type) {
        case *kafka.Message:

            //fmt.Printf("%% Message on %s:\n%s\n",
            //  e.TopicPartition, string(e.Value))
            if e.Headers != nil {
                fmt.Printf("%% Headers: %v\n", e.Headers)
            }

            str := e.Value
            res := MessageFormat{}
            json.Unmarshal([]byte(str), &res)


            //fmt.Println("size", binary.Size([]byte(str)))

            host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

            redisMap[host] = string(str)

            go func() {
                redisChnl <- e.Value
            }()


        case kafka.PartitionEOF:
            fmt.Printf("%% Reached %v\n", e)
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
            run = false
        default:
            fmt.Println("count before read: ", len(redisChnl))

            fmt.Printf("Ignored %v\n", e)

            <-redisChnl

            fmt.Println("count after read: ", len(redisChnl)) //would've expected this to be 0

        }


    }

1 Ответ

0 голосов
/ 12 мая 2018

Я думаю, что более простой способ упростить этот код - разделить конвейер на несколько подпрограмм.

Преимущество каналов в том, что на них могут одновременно писать и читать несколько человек. В этом примере это может означать, что одна процедура go ставится в очередь на канал, а другая - отсоединяется от канала и помещает вещи в redis.

Примерно так:

c := make(chan Message, bufferLen)
go pollKafka(c)
go pushToRedis(c)

Если вы хотите добавить пакетную обработку, вы можете добавить среднюю стадию, которая опрашивает канал kafka и добавляет к срезу до тех пор, пока срез не заполнится, а затем помещает этот срез в канал для повторного просмотра.

Если такой параллелизм не является целью, возможно, будет проще заменить канал в вашем коде фрагментом. Если на объекте действует всего 1 процедура, не стоит пытаться использовать канал.

...