Не удается запросить данные из источника данных Druid на внешней таблице Hive - PullRequest
1 голос
/ 10 февраля 2020

Кластер друидов и кластер Hive / Had oop работают нормально по отдельности. Мы создаем таблицу в Hive, которая считывает данные из целей Druid (для ETL), однако в начальных тестах мы обнаружили, что мы не можем сделать из нее простое SELECT * с последующей ошибкой:

hive> select * from druid_hive_table;
OK
druid_hive_table.__time druid_hive_table.op_ts  druid_hive_table.op_type    druid_hive_table.pos    druid_hive_table.table
Failed with exception java.io.IOException:org.apache.hive.druid.com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.util.ArrayList out of START_OBJECT token
 at [Source: org.apache.hive.druid.com.metamx.http.client.io.AppendableByteArrayInputStream@656c5818; line: -1, column: 4]
Time taken: 0.449 seconds

Тем не менее, SELECT COUNT(*) отлично работает!

hive> select count(*) from druid_hive_table;
OK
$f0
21409
Time taken: 0.199 seconds, Fetched: 1 row(s)

ОСОБЕННОСТИ:
ВНЕШНИЙ СТОЛ DRUID

SET hive.druid.broker.address.default=<host>:8082;

CREATE EXTERNAL TABLE druid_hive_table
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "druid_datasource_name");

hive> DESCRIBE FORMATTED druid_hive_table;
OK
col_name    data_type   comment
# col_name              data_type               comment             

__time                  timestamp               from deserializer   
op_ts                   string                  from deserializer   
op_type                 string                  from deserializer   
pos                     string                  from deserializer   
table                   string                  from deserializer   

# Detailed Table Information         
Database:               tests                    
Owner:                  OWNER                   
CreateTime:             Mon Feb 10 13:52:13 UTC 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               <LOCATION>    
Table Type:             EXTERNAL_TABLE           
Table Parameters:        
    COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
    EXTERNAL                TRUE                
    druid.datasource        druid_datasource_name          
    numFiles                0                   
    numRows                 0                   
    rawDataSize             0                   
    storage_handler         org.apache.hadoop.hive.druid.DruidStorageHandler
    totalSize               0                   
    transient_lastDdlTime   1581342733          

# Storage Information        
SerDe Library:          org.apache.hadoop.hive.druid.serde.DruidSerDe    
InputFormat:            null                     
OutputFormat:           null                     
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:         
    serialization.format    1                   
Time taken: 0.144 seconds, Fetched: 37 row(s)

Для справки - Друид Супервизор Spe c:

{
  "dataSchema": {
    "dataSource": "druid_datasource_name",
    "timestampSpec": {
      "column": "current_ts",
      "format": "iso",
      "missingValue": null
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "current_ts"
      ]
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": false,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "ioConfig": {
    "topic": "<kafka_topic>",
    "inputFormat": {
      "type": "json",
      "flattenSpec": {
        "useFieldDiscovery": true,
        "fields": []
      },
      "featureSpec": {}
    },
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "<bootstrap_servers>",
      "group.id": "<group_name>",
      "security.protocol": "SASL_SSL",
      "ssl.truststore.location": "<location>",
      "ssl.truststore.password": "<pass>",
      "sasl.jaas.config": "<config>",
      "sasl.mechanism": "SCRAM-SHA-512"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": true,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "lateMessageRejectionStartDateTime": null,
    "stream": "<kafka_topic>",
    "useEarliestSequenceNumber": true,
    "type": "kafka"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/apache-druid-0.17.0/var/tmp/druid-realtime-persist7801461398656096281",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "indexSpecForIntermediatePersists": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false,
    "repartitionTransitionDuration": "PT120S"
  },
  "type": "kafka"
}

Спасибо за помощь в решении этой проблемы.

1 Ответ

0 голосов
/ 20 февраля 2020

Мне удалось решить эту проблему. Обновление Hive и Had oop до версии 3+ решило проблему.

Это было так же просто, как смазать кусок хлеба, используя этот код:

SET hive.druid.broker.address.default=<host>:8082;

CREATE EXTERNAL TABLE druid_hive_table
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "druid_datasource_name");
...