Почему моя база данных MySQL отключается при выполнении задания cron? - PullRequest
0 голосов
/ 10 июля 2020

Я выполняю задание на записной книжке Databricks, которая подключается к моей базе данных MySQL на AWS RDS и вставляет данные. Когда я запустил записную книжку вручную, я смог подключиться к URL-адресу конечной точки и вставить свои данные. Теперь у меня ноутбук работает на кукурузе каждые 30 минут. Первое задание было успешным, но все последующие задания завершались с ошибкой:

MySQLInterfaceError: MySQL server has gone away

Затем я снова попытался запустить задание вручную и получил ту же ошибку на tweets_pdf.to_sql(name='tweets', con=engine, if_exists = 'replace', index=False). Это код, который выполняется в записной книжке Databricks:

from __future__ import print_function
import sys
import pymysql
import os
import re
import mysql.connector
from sqlalchemy import create_engine
from operator import add
import pandas as pd
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import json
import boto
import boto3
from boto.s3.key import Key
import boto.s3.connection
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import *

# Get AWS credentials
aws_key_id = os.environ.get("accesskeyid")
aws_key = os.environ.get("secretaccesskey")

# Start spark instance
conf = SparkConf().setAppName("first") 
sc = SparkContext.getOrCreate(conf=conf)

# Allow spark to access my S3 bucket
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",aws_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",aws_key)
config_dict = {"fs.s3n.awsAccessKeyId":aws_key_id,
               "fs.s3n.awsSecretAccessKey":aws_key}
bucket = "diego-twitter-stream-sink"
prefix = "/2020/*/*/*/*"
filename = "s3n://{}/{}".format(bucket, prefix)

# Convert file from S3 bucket to an RDD
rdd = sc.hadoopFile(filename,
                'org.apache.hadoop.mapred.TextInputFormat',
                'org.apache.hadoop.io.Text',
                'org.apache.hadoop.io.LongWritable',
                conf=config_dict)
spark = SparkSession.builder.appName("PythonWordCount").config("spark.files.overwrite","true").getOrCreate()

# Map RDD to specific columns
df = spark.read.json(rdd.map(lambda x: x[1]))
features_of_interest = ["ts", "text", "sentiment"]
df_reduce = df.select(features_of_interest)



# Convert RDD to Pandas Dataframe
tweets_pdf = df_reduce.toPandas()

engine = create_engine(f'mysql+mysqlconnector://admin:{os.environ.get("databasepassword")}@{os.environ.get("databasehost")}/twitter-data')

tweets_pdf.to_sql(name='tweets', con=engine, if_exists = 'replace', index=False)

Кто-нибудь знает, в чем может быть проблема? Все переменные конфигурации базы данных верны, сегмент S3, из которого выполняется потоковая передача PySpark, содержит данные, а AWS RDS не имеет никаких ограничений по емкости или вычислению.

1 Ответ

1 голос
/ 10 июля 2020

По умолчанию max_allowed_packets (4M) может вызвать эту проблему

...