На самом деле дело неясно, поэтому может быть иначе.Однако я могу дать некоторые предложения.Это не точное решение, но я думаю, что оно может помочь вам дать идею.
Сначала я читаю детали таблиц;
>>> rdd1 = sc.textFile('/home/ali/table1.txt')
>>> table1 = rdd1.map(lambda x: x.split(':')).map(lambda x: (x[0],x[1])).toDF(['col_name','data_type'])
>>> table1.show()
+--------+-------------+
|col_name| data_type|
+--------+-------------+
| ename| varchar(10)|
| eid| smallint(5)|
| esal|numeric(10,3)|
+--------+-------------+
>>> rdd2 = sc.textFile('/home/ali/table2.txt')
>>> table2 = rdd2.map(lambda x: x.split(':')).map(lambda x: (x[0],x[1])).toDF(['col_name','data_type'])
>>> table2.show()
+--------+-----------+
|col_name| data_type|
+--------+-----------+
| sid|smallint(5)|
| sname|varchar(10)|
| sclass|varchar(10)|
+--------+-----------+
И я читаю файлы данных, но перед этим вы должны определить схемы.Если вы этого не сделаете, тип данных всех столбцов будет назначен в виде строки по умолчанию
>>> from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
>>>
>>> schema1 = StructType([
... StructField("col1", StringType()),
... StructField("col2", IntegerType()),
... StructField("col3", DoubleType())
... ])
>>>
>>> schema2 = StructType([
... StructField("col1", IntegerType()),
... StructField("col2", StringType()),
... StructField("col3", StringType())
... ])
>>>
>>> data1 = spark.read.csv('/home/ali/file1.txt', schema=schema1)
>>> data1.show()
+----+----+---------+
|col1|col2| col3|
+----+----+---------+
| aa| 1|12222.009|
| bb| 2|12345.012|
+----+----+---------+
>>> data2 = spark.read.csv('/home/ali/file2.txt', schema=schema2)
>>> data2.show()
+----+----+---------+
|col1|col2| col3|
+----+----+---------+
| 1| s1|1st_class|
| 2| s2|2nd_class|
+----+----+---------+
Я определяю функцию для проверки соответствия типов данных или нет.Но когда вы определяете функцию, вы должны конвертировать некоторые типы данных базы данных (например, varchar -> string, numeric -> double ..) Я конвертирую только для строковых, int и double типов данных.Если вы будете работать с несколькими типами данных, вы должны определить все их
>>> def matchTableData(t,d):
... matched = []
... for k1,table in t.items():
... table_dtypes = []
... a = True
... for i in [i.data_type for i in table.select('data_type').collect()]:
... if 'char' in i:
... table_dtypes.append('string')
... elif 'int' in i:
... table_dtypes.append('int')
... elif 'numeric' in i:
... table_dtypes.append('double')
... for k2,data in d.items():
... data_dtypes = [i[1] for i in data.dtypes]
... if table_dtypes == data_dtypes:
... matched.append([k1,k2])
... return matched
Теперь мы готовы сравнить типы данных.Я создаю два кода для таблиц и данных.
>>> tables = {'table1':table1, 'table2':table2}
>>> data = {'data1':data1, 'data2':data2}
>>> print(matchTableData(tables,data))
[['table1', 'data1'], ['table2', 'data2']]
Как видите, он возвращает совпадающие.Как я уже говорил, это не может быть точным решением, но я думаю, что вы можете использовать некоторую часть этого