Я пишу ведомую программу Go, которая прослушивает очередь сообщений AQMP для выполнения задач.Эти задачи содержат входные и выходные данные в кодировке json.Разные задачи могут иметь совершенно разные структуры ввода / вывода.
Я пытаюсь найти способ написать свою функцию registerTask, чтобы я мог повторно использовать эту функцию для нескольких задач и кодировать JSON в требуемую структуру ввода.Однако синтаксический анализатор JSON должен выдать ошибку, если ввод не соответствует вводу конкретной задачи.Я пытался принять интерфейс {}, но это разрешает что угодно и заставляет "foo" проверять любую пользовательскую структуру.Идеи?
Вся эта композиция для меня нова.В PHP я решил бы это с помощью абстрактного базового класса.
Мой код для регистрации новых задач:
func sendResult(chann *amqp.Channel, task tasks.TaskImpl, output tasks.TaskOutput) error {
log.Printf("Sending response for %s: %v", task.GetName(), output)
jsonResponse, err := json.Marshal(output)
if err != nil {
log.Printf("Could not encode task %s to JSON: %v", task.GetName(), output)
return err
}
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: jsonResponse,
}
return chann.Publish(QUEUE_RESULT, task.GetName(), false, false, msg)
}
func registerTask(chann *amqp.Channel, task tasks.TaskImpl) {
log.Println("Registering task: " + task.GetName())
deliverChann, err := chann.Consume(QUEUE_TODO, task.GetName(), false, false, false, false, nil)
if err != nil {
log.Fatalf("Could not consume: %v", err)
}
go func() {
for taskMsg := range deliverChann {
log.Printf("Task received: %v", taskMsg)
// check if it's in UTF-8
if strings.ToLower(taskMsg.ContentEncoding) != "utf-8" {
log.Printf("Warning: task %s is not encoded in utf-8 but: %s", task.GetName(), taskMsg.ContentEncoding)
taskMsg.Reject(false)
}
// check if it's JSON
if strings.ToLower(taskMsg.ContentType) != "application/json" {
log.Printf("Warning: task %s is not json but: %s", task.GetName(), taskMsg.ContentType)
taskMsg.Reject(false)
}
log.Printf("Decoding %v", taskMsg.Body)
if err := json.Unmarshal(taskMsg.Body, &task; err != nil {
log.Printf("Could not decode task %s body: %v", task.GetName(), err)
taskMsg.Reject(false)
continue
}
log.Printf("Executing %s with args %v (from %v)", task.GetName(), task, taskMsg.Body)
output, err := task.Process(input)
if err != nil {
log.Printf("Task '%s' error: %s", task.GetName(), err)
taskMsg.Reject(true)
continue
}
if err := sendResult(chann, task, output); err != nil {
log.Printf("Could not send result for task: " + task.GetName())
taskMsg.Reject(true)
continue
}
if err := taskMsg.Ack(false); err != nil {
log.Println("Could not acknowledge: " + err.Error())
}
}
}()
}
И код моей задачи:
type TaskInput struct{}
type TaskOutput struct{}
type TaskData interface {
Input TaskInput `json:"input"`
Output TaskOutput `json:"output"`
}
type TaskImpl interface {
GetName() string
Process(msg *TaskData) (*TaskData, error)
}
type SampleTaskInput struct {
TaskInput
Arg string
}
type SampleTaskOutput struct {
TaskOutput
Result string
}
type SampleTask struct {
TaskImpl
}
func (SampleTask) GetName() string {
return "sample"
}
func (SampleTask) Process(msg *TaskData) (*TaskData, error){
log.Printf("Executing sampletask with arg: %v", msg.Input)
msg.Output = SampleTaskOutput{}
return SampleTaskOutput{
Result: "String was " + data.Arg,
}, nil
}