Как создать кадр данных из строкового ключа \ tvalue, разделенного символом '\ t' - PullRequest
0 голосов
/ 26 июня 2019

У меня есть файл журнала со структурой:

log_type    time_stamp  kvs
p   2019-06-05 18:53:20 us\tc\td\tus-xx-bb\th\ti-0b1\tvl\t20190605.1833\tvt\t20190605.1833\tvs\t20190508
p   2019-06-05 18:53:20 us\tc\td\tus-xx-bb\th\ti-03a\tvl\t20190605.1833\tvt\t20190605.1833
p   2019-06-05 18:53:20 us\tc\td\tus-xx-bb\th\ti-030

Мне нужно прочитать поле kvs, взломать ключи и превратить в отдельные столбцы, итоговый DataFrame должен выглядеть так:

log_type    time_stamp us   d   h   vl  vt  vs
p   2019-06-05 18:53:20 c   us-xx-bb    0b1 20190605.1833   20190605.1833   20190508
p   2019-06-05 18:53:20 c   us-xx-bb    03a 20190605.1833   20190605.1833
p   2019-06-05 18:53:20 c   us-xx-bb    030

Очень важно, что количество ключей в kvs является динамическим, а имя ключей также является динамическим

Столбец kvs отделен \ t. Если мы разделяем столбец kvs, то элемент четного числа является заголовком, а элемент нечетного числа является значением.

Попытка состоит в том, чтобы прочитать файл журнала, создать кадр данных со схемой, основанной на всех строках, и использовать функцию write () для преобразования кадра данных в файл HDFS, но не знаете, как это сделать

val logSchema = new StructType().add("log_type",StringType).add("time_stamp",StringType).add("kvs",StringType)
val logDF = spark.read.option("delimiter", "\t").format("com.databricks.spark.csv").schema(logSchema).load("/tmp/log.tsv")

I have also tried 
logDF.withColumn("pairkv", split($"kvs", "\t")).select(col("pairkv")(1) as "us" ,col("pairkv")(3) as "d" ,col("pairkv")(5) as "h" ,col("pairkv")(7) as "vl" ,col("pairkv")(9) as "vt" ,col("pairkv")(11) as "vs").show() 
But no luck 

Есть предложения?

Ответы [ 3 ]

0 голосов
/ 26 июня 2019

В scala это можно сделать следующим образом:

object DataFrames {

    case class Person(ID:Int, name:String, age:Int, numFriends:Int)
    def mapper(line:String): Person = {
      val fields = line.split(',')  

      val person:Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
      return person
    }

    def main(args: Array[String]) {
        ....
        import spark.implicits._
        val lines = spark.sparkContext.textFile("../myfile.csv")
        val people = lines.map(mapper).toDS().cache()
        ....
        //here people will be the dataframe...and you can execute your sql queries on this
    }
}
0 голосов
/ 03 июля 2019

Я нашел решение

logDF
.withColumn("us", regexp_extract(col("kvs") ,"(^|\\\\t)us\\\\t([\\w]+)",2))
.withColumn("d", regexp_extract(col("kvs") ,"(\\\\t)d\\\\t([\\w-]+)",2))
.withColumn("h", regexp_extract(col("kvs") ,"(\\\\t)h\\\\t([\\w-]+)",2))
.withColumn("vl", regexp_extract(col("kvs") ,"(\\\\t)vl\\\\t([\\w.]+)",2))
.withColumn("vt", regexp_extract(col("kvs") ,"(\\\\t)vt\\\\t([\\w.]+)",2))
.withColumn("vs", regexp_extract(col("kvs") ,"(\\\\t)vs\\\\t([\\w]+)",2))
.show()

Таким образом, у нас есть отдельные столбцы в DF

0 голосов
/ 26 июня 2019

Проблема в том, что у вас есть 2 символа-разделителя, ' ' и '\ t'.

Я вижу одно прямое решение, которое состоит в том, чтобы переформатировать входной файл, чтобы иметь только один символ-разделитель.

with open('original_log_file.txt', 'r') as f:
    with open('new_lof_file.txt','w') as out:
        for line in f:
            out.write(line.replace(' ','\t')) #all separators are '\t'

df = pd.read_csv('new_lof_file.txt', delimiter ='\t)
#then fix the header and you are done.

Альтернативным подходом было бы проанализировать каждую строку файла, сделать из него DataFrame и затем добавить его к исходному DataFrame.

Вот пример:

file = '''
p   2019-06-05 18:53:20 us\tc\td\tus-xx-bb\th\ti-0b1\tvl\t20190605.1833\tvt\t20190605.1833\tvs\t20190508
p   2019-06-05 18:53:20 us\tc\td\tus-xx-bb\th\ti-03a\tvl\t20190605.1833\tvt\t20190605.1833
p   2019-06-05 18:53:20 us\tc\td\tus-xx-bb\th\ti-030
'''

columns=['log_type', 'date', 'time', 'us', 'd', 'h', 'vl', 'vt', 'vs']
df = pd.DataFrame({k:[] for k in columns}) #initial df

for line in file.split('\n'):
    if len(line):
        clean_line = line.strip().replace('   ','\t').replace(' ','\t').split('\t') #fix the line
        #remove redundant header
        for c in columns:
            if c in clean_line:
                clean_line.remove(c)
        clean_line = [[x] for x in clean_line]
        df = df.append(pd.DataFrame(dict(zip(columns,clean_line))),'sort=True')

df = df[columns]
df.head()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...