AWS Glue Job - невозможно передать преобразованные данные клея лямбда-функции - PullRequest
2 голосов
/ 11 февраля 2020

Я новичок в службе AWS -Клей. У меня проблема с ETL-заданием Glue.


Мой пример использования:
1) Обработка файлов S3 JSON с помощью Glue-Crawler для каталогизации
Примечание: Файлы S3 будут создаваться часто.

2) Создан Glue-Job для обработки данных каталога, которые будут преобразованы и сохранены в целевом местоположении S3 .
В дополнение к этому я вызываю функцию Lambda, чтобы уведомить об этом свободный канал, который работает. Но я не смог отправить преобразованные данные в лямбда-функцию.


Склейка (Python) Сценарий: -

Используемый блок значений в коде [bucket_name] необходимо обновить

import sys
import boto3

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "[bucket_name]", table_name = "data_source", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("doc_id", "string", "doc_id", "string"), ("skill_campaign", "string", "skill_campaign", "string"), ("disposition", "string", "disposition", "string"), ("customer_num", "string", "customer_num", "string"), ("call_date", "string", "call_date", "string"), ("segstart", "string", "segstart", "string"), ("segstop", "string", "segstop", "string"), ("duration", "string", "duration", "string"), ("vendor", "string", "vendor", "string"), ("user_id", "string", "user_id", "string"), ("supervisor_user_id", "string", "supervisor_user_id", "string"), ("evaluation_status_id", "string", "evaluation_status_id", "string"), ("assigned_to", "string", "assigned_to", "string"), ("evaluation_form_id", "string", "evaluation_form_id", "string")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://[bucket_name]/data-target"}, format = "json", transformation_ctx = "datasink2")

job.commit()


##Call Lambda function: How to send data that above ETL job processed to this lambda function?
client = boto3.client('lambda' , region_name='ap-south-1')
r_lambda = client.invoke(FunctionName='lambda-slack-notification', InvocationType='Event') 



Лямбда-функция (NodeJS) Сценарий: -

Блоки значений, используемые в коде [токен], [канал], [имя пользователя] & [текст] необходимо обновить

const https = require('https');

exports.handler = (event, context, callback) => {
    const options = {
        hostname: "slack.com",
        method: "POST",
        path: "/api/chat.postMessage?token=[token]&channel=[channel]&username=[username]&text=[text]",
    };

    const req = https.request(options, (res) => {
        res.setEncoding('utf8');
        res.on('data', (chunk) => {
          // code to execute
        });
        res.on('end', () => {
          // code to execute      
        });
    });

    req.on('error', (e) => {
        callback(null, "Error has occured");
    });
    req.end();
}

Заранее спасибо ...

...