Невозможно выполнить insert или upsert данных из топика Kafka в таблицу Kudu с помощью Lenses - PullRequest
0 голосов
/ 26 марта 2020

Версия Kudu sink-коннектора Lenses: kafka-connect-kudu-1.2.3-2.1.0

Схема таблицы Kudu:

-- Kudu-backed Impala table that is the target of the Kudu sink connector
-- reading the Kafka topic mssql.dbo.table_name.
--
-- NOTE(review): records in the topic carry the fields su_created_date,
-- su_modified_date, su_activated_date, su_deactivated_date and
-- su_terms_agreed_date, while the corresponding columns here are named
-- *_timestamp. The connector's KCQL uses `select *`; if the sink maps
-- fields to columns by name, these five will not line up (and two of them
-- are NOT NULL). Confirm, and alias them in the KCQL if needed.
-- NOTE(review): in the topic those fields are int64 values tagged
-- io.debezium.time.Timestamp (e.g. 1526324248760, which looks like epoch
-- milliseconds), and su_dob/su_doj are int32 values tagged
-- io.debezium.time.Date; verify the sink converts them to Kudu TIMESTAMP.
CREATE TABLE IF NOT EXISTS table_name (
    su_id                     BIGINT         NOT NULL,
    su_tenant_id              INT            NULL,
    su_bu_id                  INT            NULL,
    su_user_type              STRING         NULL,
    su_acpd_id                INT            NULL,
    su_user_code              STRING         NULL,
    su_user_title             STRING         NULL,
    su_first_name             STRING         NOT NULL,
    su_middle_name            STRING         NULL,
    su_last_name              STRING         NULL,
    su_dob                    TIMESTAMP      NULL,
    su_doj                    TIMESTAMP      NULL,
    su_primary_position_id    BIGINT         NULL,
    su_role_id                INT            NULL,
    su_masterdataref          STRING         NULL,
    su_primary_address        BIGINT         NULL,
    su_mobile_no              STRING         NULL,
    su_email_id               STRING         NULL,
    su_photo                  STRING         NULL,
    su_isactive               BOOLEAN        NOT NULL,
    su_created_by             BIGINT         NOT NULL,
    su_created_timestamp      TIMESTAMP      NOT NULL,
    su_modified_by            BIGINT         NULL,
    su_modified_timestamp     TIMESTAMP      NULL,
    su_status                 STRING         NULL,
    flex_1                    STRING         NULL,
    flex_2                    STRING         NULL,
    flex_3                    STRING         NULL,
    flex_4                    STRING         NULL,
    flex_5                    STRING         NULL,
    flex_6                    STRING         NULL,
    flex_7                    STRING         NULL,
    flex_8                    STRING         NULL,
    flex_9                    STRING         NULL,
    su_gender                 STRING         NULL,
    su_theme_id               INT            NULL,
    su_activated_timestamp    TIMESTAMP      NOT NULL,
    su_deactivated_timestamp  TIMESTAMP      NULL,
    su_level_id               SMALLINT       NULL,
    su_hierarchy_type         STRING         NULL,
    su_user_type_id           INT            NULL,
    su_adh_id                 INT            NULL,
    su_user_classification    INT            NULL,
    su_credit_limit           DECIMAL(18, 4) NULL,
    su_culture_alov_id        INT            NULL,
    su_culture_al_id          SMALLINT       NULL,
    su_profile_image_file     STRING         NULL,
    su_terms_isagree          BOOLEAN        NOT NULL,
    su_terms_agreed_timestamp TIMESTAMP      NULL,
    -- su_id is the primary key and also the hash-partitioning column below.
    PRIMARY KEY (su_id)
)
PARTITION BY HASH (su_id) PARTITIONS 3
STORED AS KUDU;

Данные топика Kafka при key.converter.schemas.enable = false и value.converter.schemas.enable = false:

{
  "su_id": 1,
  "su_tenant_id": 0,
  "su_bu_id": 0,
  "su_user_type": "A",
  "su_acpd_id": null,
  "su_user_code": "sampletest",
  "su_user_title": null,
  "su_first_name": "test_data",
  "su_middle_name": null,
  "su_last_name": "",
  "su_dob": null,
  "su_doj": null,
  "su_primary_position_id": null,
  "su_role_id": 1,
  "su_masterdataref": "0",
  "su_primary_address": null,
  "su_mobile_no": null,
  "su_email_id": null,
  "su_photo": null,
  "su_isactive": true,
  "su_created_by": 1,
  "su_created_date": 1526324248760,
  "su_modified_by": 1,
  "su_modified_date": 1547137351267,
  "su_status": "I",
  "flex_1": null,
  "flex_2": null,
  "flex_3": null,
  "flex_4": null,
  "flex_5": null,
  "flex_6": null,
  "flex_7": null,
  "flex_8": null,
  "flex_9": null,
  "su_gender": null,
  "su_theme_id": 406,
  "su_activated_date": 1526324248760,
  "su_deactivated_date": null,
  "su_level_id": null,
  "su_hierarchy_type": null,
  "su_user_type_id": null,
  "su_adh_id": null,
  "su_user_classification": null,
  "su_credit_limit": null,
  "su_culture_alov_id": null,
  "su_culture_al_id": null,
  "su_profile_image_file": null,
  "su_terms_isagree": false,
  "su_terms_agreed_date": null
}

Конфигурация sink-коннектора Kudu:

config: 1

{
  "name": "snk_test",
    "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name AUTOCREATE DISTRIBUTEBY su_id INTO 3 BUCKETS AUTOEVOLVE"}
}

config: 2

{
  "name": "snk_test",
    "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name "}
}

С обеими этими конфигурациями я получаю следующую ошибку:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:348)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)\n\tat org.apache.kafka.connect.runtime.errors.[…]\n\t... 13 more\n

Данные топика Kafka при key.converter.schemas.enable = true и value.converter.schemas.enable = true:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "su_id"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_tenant_id"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_bu_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_user_type"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_acpd_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_user_code"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_user_title"
      },
      {
        "type": "string",
        "optional": false,
        "field": "su_first_name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_middle_name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_last_name"
      },
      {
        "type": "int32",
        "optional": true,
        "name": "io.debezium.time.Date",
        "version": 1,
        "field": "su_dob"
      },
      {
        "type": "int32",
        "optional": true,
        "name": "io.debezium.time.Date",
        "version": 1,
        "field": "su_doj"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "su_primary_position_id"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_role_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_masterdataref"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "su_primary_address"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_mobile_no"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_email_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_photo"
      },
      {
        "type": "boolean",
        "optional": false,
        "field": "su_isactive"
      },
      {
        "type": "int64",
        "optional": false,
        "field": "su_created_by"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_created_date"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "su_modified_by"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_modified_date"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_status"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_1"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_2"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_3"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_4"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_5"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_6"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_7"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_8"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flex_9"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_gender"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_theme_id"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_activated_date"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_deactivated_date"
      },
      {
        "type": "int16",
        "optional": true,
        "field": "su_level_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_hierarchy_type"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_user_type_id"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_adh_id"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_user_classification"
      },
      {
        "type": "bytes",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "4",
          "connect.decimal.precision": "18"
        },
        "field": "su_credit_limit"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "su_culture_alov_id"
      },
      {
        "type": "int16",
        "optional": true,
        "field": "su_culture_al_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "su_profile_image_file"
      },
      {
        "type": "boolean",
        "optional": false,
        "field": "su_terms_isagree"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_terms_agreed_date"
      }
    ],
    "optional": true,
    "name": "mssql.dbo.table_name.Value"
  },
  "payload": {
    "su_id": 1,
    "su_tenant_id": 0,
    "su_bu_id": 0,
    "su_user_type": "A",
    "su_acpd_id": null,
    "su_user_code": "sampletest1",
    "su_user_title": null,
    "su_first_name": "test_data",
    "su_middle_name": null,
    "su_last_name": "",
    "su_dob": null,
    "su_doj": null,
    "su_primary_position_id": null,
    "su_role_id": 1,
    "su_masterdataref": "0",
    "su_primary_address": null,
    "su_mobile_no": null,
    "su_email_id": null,
    "su_photo": null,
    "su_isactive": true,
    "su_created_by": 1,
    "su_created_date": 1526324248760,
    "su_modified_by": 1,
    "su_modified_date": 1547137351267,
    "su_status": "I",
    "flex_1": null,
    "flex_2": null,
    "flex_3": null,
    "flex_4": null,
    "flex_5": null,
    "flex_6": null,
    "flex_7": null,
    "flex_8": null,
    "flex_9": null,
    "su_gender": null,
    "su_theme_id": 406,
    "su_activated_date": 1526324248760,
    "su_deactivated_date": null,
    "su_level_id": null,
    "su_hierarchy_type": null,
    "su_user_type_id": null,
    "su_adh_id": null,
    "su_user_classification": null,
    "su_credit_limit": null,
    "su_culture_alov_id": null,
    "su_culture_al_id": null,
    "su_profile_image_file": null,
    "su_terms_isagree": false,
    "su_terms_agreed_date": null
  }
}

Конфигурация sink-коннектора Kudu:

config: 1

{
  "name": "snk_test",
    "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name AUTOCREATE DISTRIBUTEBY su_id INTO 3 BUCKETS AUTOEVOLVE"}
}

config: 2

{
  "name": "snk_test",
    "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name "}
}

С обеими этими конфигурациями я получаю следующую ошибку:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.RuntimeException: scala.MatchError: null\n\tat com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)\n\tat com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:83)\n\tat com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:64)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.handleTry(KuduWriter.scala:50)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.applyInsert(KuduWriter.scala:143)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.write(KuduWriter.scala:100)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$put$2.apply(KuduSinkTask.scala:68)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$put$2.apply(KuduSinkTask.scala:68)\n\tat scala.Option.foreach(Option.scala:257)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask.put(KuduSinkTask.scala:68)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: scala.MatchError: null\n\tat com.datamountaineer.streamreactor.connect.kudu.KuduConverter$class.com$datamountaineer$streamreactor$connect$kudu$KuduConverter$$addFieldToRow(KuduConverter.scala:106)\n\tat com.datamountaineer.streamreactor.connect.kudu.KuduConverter$$anonfun$convertToKuduUpsert$2.apply(KuduConverter.scala:48)\n\tat com.datamountaineer.streamreactor.connect.kudu.KuduConverter$$anonfun$convertToKuduUpsert$2.apply(KuduConverter.scala:48)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:234)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:104)\n\tat com.datamountaineer.streamreactor.connect.kudu.[…]\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.com$datamountaineer$streamreactor$connect$kudu$sink$KuduWriter$$handleSinkRecord$1(KuduWriter.scala:130)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KuduWriter.scala:138)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KuduWriter.scala:138)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:410)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:410)\n\tat scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)\n\tat scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)\n\tat scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)\n\tat scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)\n\tat scala.collection.Iterator$class.foreach(Iterator.scala:891)\n\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1334)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1.apply$mcV$sp(KuduWriter.scala:141)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1.apply(KuduWriter.scala:141)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1.apply(KuduWriter.scala:141)\n\tat scala.util.Try$.apply(Try.scala:192)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.applyInsert(KuduWriter.scala:136)\n\t... 16 more\n"

...