Невозможно проверить подлинность кластера Кассандры с помощью программы spark scala - PullRequest
0 голосов
/ 06 сентября 2018

Пожалуйста, предложите мне решить нижеприведенную проблему или предложите другой подход для постановки задачи. Я получаю данные откуда-то и вставляю их в кассандру ежедневно, затем мне нужно извлечь данные из кассандры за целую неделю и выполнить некоторую обработку и вставить результат обратно в кассандру.

У меня много записей, каждая из которых выполняет большинство из перечисленных ниже операций. Согласно моему предыдущему сообщению Предотвращение предупреждения о подготовленном заявлении , чтобы избежать представления подготовленного оператора, попытался сохранить карту строки запроса и подготовленных операторов.

Я пытался написать следующую программу spark scala, я проверил информацию о хосте cassandra из cqlsh, я могу подключиться к ней. Но через программу, когда я пытаюсь, я получаю ошибку.

class StatementCache {
  val acluster = CassandraUtils.initialize(nodes,user,pass, cassport,racdc)

  val session = acluster.connect("keyspacename");

      val statementCache: ConcurrentHashMap[String,PreparedStatement] = new ConcurrentHashMap


      def getStatement(cql : String): BoundStatement = {
    var ps : PreparedStatement = statementCache.get(cql);
     if (ps == null) {
                ps = session.prepare(cql);
                statementCache.put(cql, ps);
            }
            return ps.bind();
        }
    }


object CassandraUtils {
  println("##########entered cassandrutils")
   //val st=new STMT();
 private val psCache  : StatementCache = new StatementCache();
 val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
  val selectTripQuery = "select * from k1.tale1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"

  val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa,h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur,st_addr,en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?);"
  val updateQuery = "update k1.table1 set tr_dur=?,id_tm=?,max_sp=?,c_dist=?,h_dist=?,dist=?,c_gal=?,c_mil=?,h_gal=?,h_mil=?,c_epa=?,h_epa=?,epa=?,gal=?,rec_crt_dt=?,mil=?,avg_sp=?,tr_dis=?,en_lat=?,en_long=? where s_id= ? and a_id= ? and summ_typ= ? and  summ_dt= ? and t_summ_id=?; "

  def insert(session: Session, data: TripHistoryData, batch: BatchStatement) {
   batch.add(psCache.getStatement(insertQuery));
  }

  def update(session: Session, data: TripHistoryData, batch: BatchStatement) {
    batch.add(psCache.getStatement(updateQuery));
    }

     def initialize(clusterNodes: String, uid: String, pwd: String, port: Int, racdc:String): Cluster = {

    val builder = Cluster.builder().addContactPoints(InetAddress.getByName(clusterNodes))
      .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(
          DCAwareRoundRobinPolicy.builder() //You can directly use the DCaware without TokenAware as well
            .withLocalDc(racdc) //This is case sensitive as defined in rac-dc properties file
            //.withUsedHostsPerRemoteDc(2) //Try at most 2 remote DC nodes in case all local nodes are dead in the current DC
            //.allowRemoteDCsForLocalConsistencyLevel()
            .build()))

    if (StringUtils.isNotEmpty(uid)) {
      builder.withCredentials(uid, pwd)
    }

    val cluster: Cluster = builder.build()
    cluster
  }
}

-----------------------------------------------------------------------------------------------------------------

Я получаю следующую ошибку:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ExceptionInInitializerError
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:91)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:45)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host hostname1: Host hostname1 requires authentication, but no authenticator found in Cluster configuration
    at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:40)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:261)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:243)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutorService.execute(MoreExecutors.java:299)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1.run(Futures.java:632)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185)
    at com.datastax.driver.core.Connection$Future.onSet(Connection.java:1288)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1070)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:993)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more

Ответы [ 3 ]

0 голосов
/ 06 сентября 2018

я решил проблему. Поместив строки ниже в метод getStatement вместо метода outside.

val acluster = CassandraUtils.initialize(nodes,user,pass, cassport,racdc)

val session = acluster.connect("keyspacename");
0 голосов
/ 06 сентября 2018

Ваша проблема в том, что вы пытаетесь выполнить «ручное» управление соединением - это не работает с Spark - экземпляры Cluster / Session должны быть отправлены исполнителям, но это не будет выполнить правильно, так как эти экземпляры были созданы в драйвере. Конечно, вы можете использовать «типичный» способ выполнения foreachPartition и т. Д., Как описано в этом вопросе .

Лучший способ работать с Cassandra от Spark - это использовать Cassandra Spark Connector - он автоматически распределяет нагрузку между узлами и выполняет правильную вставку и обновление данных. В этом случае вы настраиваете параметры подключения, включая аутентификацию через свойства Spark (spark.cassandra.auth.username & spark.cassandra.auth.password). Более подробная информация о подключении в документации .

0 голосов
/ 06 сентября 2018

вам необходимо обеспечить аутентификацию в вашем экземпляре кластера .withCredentials (username.trim (), password.trim ())

Или вам нужно отключить аутентификацию на уровне кассандры, изменив значение ключа аутентификатора на AllowAllAuthenticator в cassandra.yaml ..

Примечание: изменение в yaml требует перезагрузки кассандры

...