Spark Cassandra записывает строки, не показанные в cqlsh - PullRequest
0 голосов
/ 27 января 2020

Я использую Sparks Cassandra Connector для записи потоковых сообщений в семейство Cassandra Column. Несмотря на то, что журнал отображается как Данные, записываемые в таблицу, при запросе данных он не отображается.

Вот программный стек в действии.

Java 1.8, Cassandra 3.11.4, Spark 2.4.3

Я использую следующий код для записи данных из набора данных,

dataset.write().format("org.apache.spark.sql.cassandra")
     .option("table", tableName)
     .option("keyspace", keySpace)
     .mode(SaveMode.Append).save();

сообщение журнала указывает, что записи пишутся успешно следующим образом:

[Executor task launch worker for task 58915] INFO  2020-01-27 08:49:25,608 com.datastax.spark.connector.writer.TableWriter  - Wrote 7 rows to keyspace.ssl_connections_1_2020 in 0.011 s.
[Executor task launch worker for task 58916] INFO  2020-01-27 08:49:25,612 com.datastax.spark.connector.writer.TableWriter  - Wrote 9 rows to keyspace.ssl_connections_1_2020 in 0.011 s.
[Executor task launch worker for task 58919] DEBUG 2020-01-27 08:49:25,619 com.datastax.spark.connector.writer.TableWriter  - Writing data partition to keyspace.ssl_connections_1_2020 in batches of BytesInBatch(1024)

Буду признателен за любую помощь в направлении отладки проблемы.

Редактировать структуру таблицы

connections_timeseries_1_2020 (
    srcip text,
    day int,
    hour int,
    id text,
    connectionstate text,
    connectiontimestamp timestamp,
    dayofweek int,
    destip text,
    destipbytes double,
    destmac text,
    destpackets double,
    destport int,
    duration double,
    history text,
    localdest boolean,
    localsrc boolean,
    minute int,
    missedbytes double,
    month int,
    protocol text,
    receivedbytes double,
    reportedsensor text,
    services list<text>,
    srcipbytes double,
    srcmac text,
    srcpackets double,
    srcport int,
    transmittedbytes double,
    vlan int,
    weekofyear int,
    year int,
    PRIMARY KEY ((srcip, day), hour, id)
) WITH CLUSTERING ORDER BY (hour ASC, id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': '500'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

Строки изначально сохраняются, но останавливаются при изменении даты, и тип сообщений JSON, который преобразуется в наборы данных в Spark перед сохранением в Cassandra.

...