import unittest
import warnings
from datetime import datetime
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, TimestampType, FloatType
from ohlcv_service.ohlc_gwa import datetime_col
class ReusedPySparkTestCase(unittest.TestCase):
sc_values = {}
@classmethod
def setUpClass(cls):
conf = (SparkConf().setMaster('local[2]')
.setAppName(cls.__name__)
.set('deploy.authenticate.secret', '111111'))
cls.sc = SparkContext(conf=conf)
cls.sc_values[cls.__name__] = cls.sc
cls.spark = (SparkSession.builder
.master('local[2]')
.appName('local-testing-pyspark-context')
.getOrCreate())
@classmethod
def tearDownClass(cls):
print('....calling stop tearDownClass, the content of sc_values=', cls.sc_values, '\n')
for key, sc in cls.sc_values.items():
print('....closing=', key, '\n')
sc.stop()
cls.sc_values.clear()
class TestDateTimeCol(ReusedPySparkTestCase):
def setUp(self):
# Ignore ResourceWarning: unclosed socket.socket!
warnings.simplefilter("ignore", ResourceWarning)
def test_datetime_col(self):
test_data_frame = self.create_data_frame(rows=[['GWA',
'2b600c2a-782f-4ccc-a675-bbbd7d91fde4',
'02fb81fa-91cf-4eab-a07e-0df3c107fbf8',
'2019-06-01T00:00:00.000Z',
0.001243179008694,
0.001243179008694,
0.001243179008694,
0.001243179008694,
0.001243179008694]],
columns=[StructField('indexType', StringType(), False),
StructField('id', StringType(), False),
StructField('indexId', StringType(), False),
StructField('timestamp', StringType(), False),
StructField('price', FloatType(), False),
StructField('open', FloatType(), False),
StructField('high', FloatType(), False),
StructField('low', FloatType(), False),
StructField('close', FloatType(), False)])
expected = self.create_data_frame(rows=[['GWA',
'2b600c2a-782f-4ccc-a675-bbbd7d91fde4',
'02fb81fa-91cf-4eab-a07e-0df3c107fbf8',
'2019-06-01T00:00:00.000Z',
'1559347200',
0.001243179008694,
0.001243179008694,
0.001243179008694,
0.001243179008694,
0.001243179008694]],
columns=[StructField('indexType', StringType(), False),
StructField('id', StringType(), False),
StructField('indexId', StringType(), False),
StructField('timestamp', StringType(), False),
StructField('datetime', TimestampType(), True),
StructField('price', FloatType(), False),
StructField('open', FloatType(), False),
StructField('high', FloatType(), False),
StructField('low', FloatType(), False),
StructField('close', FloatType(), False)])
print(expected)
convert_to_datetime = datetime_col(test_data_frame)
self.assertEqual(expected, convert_to_datetime)
def create_data_frame(self, rows, columns):
rdd = self.sc.parallelize(rows)
df = self.spark.createDataFrame(rdd.collect(), test_schema(columns=columns))
return df
def test_schema(columns):
return StructType(columns)
if __name__ == '__main__':
unittest.main()
Ошибка
TimestampType can not accept object '1559347200' in type <class 'str'>
Функция datetime_col
def datetime_col(df):
return df.select("indexType", "id", "indexId", "timestamp",
(F.col("timestamp").cast(TimestampType)).alias("datetime"),
"price", "open", "high", "low", "close")
Функции col типа datetime преобразуют метку времени из строки в метку времени формат. Это работает должным образом в ноутбуке EMR-Zeppelin, но когда я пытаюсь выполнить модульное тестирование, выдает вышеуказанную ошибку. Версия spark и pyspark в моей локальной версии 2.3.1. Как устранить эту ошибку. Когда я пытаюсь конвертировать spark df в pandas df, он конвертирует метку времени как + 12.