Приведите числовые поля с помощью kafka connect и table.whitelist - PullRequest
0 голосов
/ 22 ноября 2018

Я создал источник и коннектор приемника для kafka connect Confluent 5.0, чтобы отправить две таблицы sqlserver в мой набор данных

Вот моя схема таблицы SQLServer:

CREATE TABLE MYBASE.dbo.TABLE1 (
id_field int IDENTITY(1,1) NOT NULL,
my_numericfield numeric(24,6) NULL,
time_field smalldatetime NULL,
CONSTRAINT PK_CBMARQ_F_COMPTEGA PRIMARY KEY (id_field)
) GO

Моя Кассандрасхема:

create table TEST-TABLE1(my_numericfield decimal, id_field int, time_field timestamp, PRIMARY KEY (id_field));

Вот исходная конфигурация с параметром белого списка:

{
"config":
{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://localhost:1433;database=MYBASE",
    "connection.user": "admin",
    "connection.password": "password",
    "table.whitelist": "TABLE1, TABLE2",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "time_field",
    "incrementing.column.name": "id_field",
    "validate.non.null": "false",
    "topic.prefix": "TEST-",
    "tasks.max": "8",
    "numeric.mapping":"best_fit"
},
"name": "sqlserver-MYBASE-test"
}

Вот мой разъем приемника:

{
"name": "s3-sink-MYBASE",
"config":
{
    "topics": "TEST-TABLE1, TEST_TABLE2",
    "topics.dir": "DATABASE_FULL",
    "s3.part.size": 5242880,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "tasks.max": 8,
    "schema.compatibility": "NONE",
    "s3.region": "eu-central-1",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "s3.bucket.name": "mydatalake",
    "flush.size": 1,
    "transforms":"InsertSourceDetails",  
    "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSourceDetails.static.field":"DATABASE",
    "transforms.InsertSourceDetails.static.value":"MYBASE"
}
}

Проблема в том, чтонекоторые поля имеют тип NUMERIC в sqlserver, и kafka преобразует их в BINARY при поступлении в datalake

Вот результат schema_registry:

{"type": "record",
"name": "TEST-TABLE1",
"fields": [
{
  "name": "my_numericfield",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 6,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "6"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
},
{
  "name": "id_field",
  "type": "int"
},
{
  "name": "cbCreateur",
  "type": [
    "null",
    "string"
  ],
  "default": null
},
{
  "name": "time_field",
  "type": [
    "null",
    {
      "type": "long",
      "connect.version": 1,
      "connect.name": "org.apache.kafka.connect.data.Timestamp",
      "logicalType": "timestamp-millis"
    }
  ],
  "default": null
},
],
"connect.name": "TEST-TABLE1"}

Вот сценарий и результат spark:

...: from pyspark.sql.functions import col 
...: AWS_ID='xxxxxxxxxxxxxxxxx'
...: AWS_KEY='xxxxxxxxxxxxxxxxx/'
...: sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ID)
...: sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_KEY)
...: sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
...: spark.conf.set('spark.cassandra.connection.host', 'localhost')
...: spark.conf.set('spark.cassandra.connection.port', 9042)
...: spark.conf.set('spark.cassandra.auth.username', 'cassandra')
...: spark.conf.set('spark.cassandra.auth.password', 'cassandra')
...: 
...: 

   :    F_TEST-TABLE1 = spark.read.format('com.databricks.spark.avro').load('s3a://mydatalake/DATABASE_FULL/TEST-TABLE1').drop('partition')
...:    DF_TEST-TABLE1 = F_TEST-TABLE1.toDF(*[c.lower() for c in TEST-TABLE1.columns])
...: 
...: 

: DF_TEST-TABLE1.printSchema()
root
 |-- my_numericfield: binary (nullable = true)
 |-- id_field: integer (nullable = true)
 |-- time_field: long (nullable = true)


: DF_TEST-TABLE1.createTempView("event")

: spark.sql("select * from event").show(1, False)
+----------------+--------+--------------+
||my_numericfield|id_field|time_field    |
+----------------+-----------+-----------+
|[00]            | 5      |1542733800000 |
+----------------+--------+--------------+
only showing top 1 row


: DF_TEST-TABLE1.write.format('org.apache.spark.sql.cassandra').options(keyspace='sage_full', table='f_test-table1').option('confirm.truncate', True).save(mode='overwrite')
18/11/22 08:29:05 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 3)
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [B@6d0d5743 of type class [B to java.lang.BigDecimal.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:190)

Я пытаюсь разыграть поля на лету, чтобы они соответствовали числовому типу (то есть с плавающей точкой), но я не могу найти способ сделать это, не зная заранее названия полей

С помощью параметра белого списка соединитель обрабатывает две таблицы без описания полей в конфигурации соединителя

Можно ли выполнить приведение всех числовых полей на лету?

ThaNKS за вашу помощь

...