Как разобрать несколько структур JSON в программе Spark - PullRequest
0 голосов
/ 03 мая 2018

Я работаю над анализом логов (в формате Json) в Scala. Я не знаю, как поступить. Я могу получить различные типы журналов для обработки.

как мне написать / спроектировать мой код для обработки различных типов структур Json? могу ли я дать своей программе Scala схему и дать ей разобраться?

Я написал некоторый код, используя Object mapper и прочитал узлы, но мне нужен более структурно-независимый подход.

Я не уверен, с чего начать. пожалуйста, укажите мне на некоторые чтения или примеры. я пытался зайти в Google или выполнить поиск в Stackoverflow, что привело к слишком большому количеству примеров, и это сбивает с толку, поскольку я также изучаю Scala.

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;
import org.apache.spark.sql.hive.HiveContext
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException; 
import org.apache.spark.rdd.RDD;

sc.setLogLevel("OFF");

val args = sc.getConf.get("spark.driver.args").split("\\s+")

args.foreach(println);

var envStr = "dev";
var srcStr = "appm"

val RootFolderStr = "/source_folder/";
val DestFolderStr = "/dest_folder/";

val dateformatter = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS'Z'");
val formatter = new SimpleDateFormat("yyyy-MM-dd");

val theMonthFormatter = new SimpleDateFormat("yyyy-MM");

var fromDay: Date = formatter.parse("2018-04-29");
var toDay: Date = formatter.parse("2018-05-01");

if (args.length < 2) {
  printf("usage: need at least 2 parameters in spark.driver.args");
  sys.exit(2);
}

envStr = args(0).toLowerCase();

srcStr = args(1).toLowerCase();

if (args.length == 4) {
  fromDay = formatter.parse(args(2));
  toDay = formatter.parse(args(3));
}

if (args.length == 2) {
  // default to be yesterday to today
  toDay = formatter.parse(formatter.format(Calendar.getInstance().getTime()));

  val previousDay = Calendar.getInstance();
  previousDay.add(Calendar.DATE, -1);

  fromDay = formatter.parse(formatter.format(previousDay.getTime()));
}

// get the sub-folder for the monthly partition
val monthFolder = theMonthFormatter.format(fromDay);

var rootFolder = RootFolderStr.replaceAll("ENV", envStr) + monthFolder;
rootFolder = rootFolder.replaceAll("SRC", srcStr);

val destFolder = DestFolderStr.replaceAll("ENV", envStr);

var toCalendar = Calendar.getInstance();

toCalendar.setTime(toDay);
toCalendar.add(Calendar.DATE, 1);

// need to consider the case across the month boundary
val toDay2 = formatter.parse(formatter.format(toCalendar.getTime()));

// filter out .tmp files and 0-size files
// .tmp files are not safe to read from, it's possible that the files are under updating by Flume job and the message data is incomplete
// when the Spark job starts to read from it.

val pathInfos = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path(rootFolder));

// filter out the 0-length files, .tmp files which is of today
val allfiles = pathInfos.filter(fileStatus => {
  if (fileStatus.getLen == 0)
    false
  else {
    val aPath = fileStatus.getPath().getName();

    // use the modification time is more accurate.
    val lastTime = fileStatus.getModificationTime();
    val aDate = new Date(lastTime);

    // all files between fromDay and toDay2
    aDate.after(fromDay) && aDate.before(toDay2);
  }
}
).map(_.getPath.toString);


case class event_log(
                      time_stp: Long,
                      msg_sze: Int,
                      msg_src: String,
                      action_path: String,

                      s_code: Int,
                      s_desc: String,

                      p_code: String,
                      c_id: String,
                      m_id: String,

                      c_ip: String,
                      c_gp: String,

                      gip: String,
                      ggip: String,

                      rbody: String
                    );

def readEvents(fileList: Array[String], msgSrc: String, fromTS: Long, toTS: Long): RDD[(event_log)] = {
  val records =
    sc.sequenceFile[Long, String](fileList.mkString(","))
      .filter((message) => {
        (message._1 >= fromTS && message._1 < toTS);
      }
      )

  val eventLogs = records.map((message) => {
    val time_stp = message._1;
    var msg_sze = message._2.length();

    var c_id = ""
    var m_id = "";
    var p_code = "";

    var c_ip = "";
    var c_gp = "";

    var gip = "";
    var ggip = "";

    var rbody = "";

    var action_path = "";
    var s_code: Int = 200;
    var s_desc = "";

    try {
      // parse the message
      val mapper = new ObjectMapper();
      val aBuff = message._2.getBytes();

      val root = mapper.readTree(aBuff);

      var aNode = root.path("rbody");
      rbody = aNode.textValue();

      if (rbody != null && rbody.length() > 0) {
        val mapper_2 = new ObjectMapper();
        val aBuff_2 = rbody.getBytes();

        var root2 = mapper_2.readTree(aBuff_2);

        aNode = root2.path("p_code");
        if (aNode != null && aNode.isValueNode())
          p_code = String.valueOf(aNode.intValue());

        aNode = root2.path("mkay");
        if (aNode != null && aNode.isObject()) {

          root2 = aNode;
        }

        {
          aNode = root2.get("c_id");
          if (aNode != null && aNode.isValueNode())
            c_id = aNode.textValue();

          aNode = root2.get("m_id");
          if (aNode != null && aNode.isValueNode()) {
            m_id = String.valueOf(aNode.intValue());
          }
        }
      }

      aNode = root.path("c_ip");
      c_ip = aNode.textValue();

      aNode = root.path("c_gp");
      c_gp = aNode.textValue();

      aNode = root.path("gip");
      gip = aNode.textValue();

      aNode = root.path("ggip");
      ggip = aNode.textValue();

      aNode = root.path("action_path");
      action_path = aNode.textValue();

      aNode = root.path("s_code");

      val statusNodeValue = aNode.textValue().trim();
      s_code = Integer.valueOf(statusNodeValue.substring(0, 3));
      s_desc = statusNodeValue.substring(3).trim();
    }
    catch {
      // return empty string as indicator that it's not a well-formatted JSON message
      case jex: JsonParseException => {
        msg_sze = 0
      };

      case ioEx: java.io.IOException => {
        msg_sze = 0
      };
      case rtEx: JsonMappingException => {
        msg_sze = 0
      };
    }

    event_log(time_stp, msg_sze, msgSrc, action_path, s_code, s_desc,
      p_code, c_id, m_id,
      c_ip, c_gp, gip, ggip,
      rbody);

  });

  eventLogs;
}

val hiveContext = new HiveContext(sc)

if (allfiles.length == 0)
  sys.exit(3);

val fromTime = fromDay.getTime();
val toTime = toDay.getTime();

val events = readEvents(allfiles, srcStr, fromTime, toTime);

val df = hiveContext.createDataFrame(events).coalesce(1);

df.write.parquet(destFolder);

sys.exit(0);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...