Spark RDD în DataFrame python (Programare, Python, Apache Spark, Pyspark, Spark Dataframe)

Jack Daniel a intrebat.

Încerc să convertesc Spark RDD într-un DataFrame. Am văzut documentația și exemplul în care schema este trecută la sqlContext.CreateDataFrame(rdd,schema) funcție.

Dar am 38 de coloane sau câmpuri și acest lucru va crește în continuare. Dacă dau manual schema specificând informațiile fiecărui câmp, va fi o muncă foarte plictisitoare.

Există vreo altă modalitate de a specifica schema fără a cunoaște în prealabil informațiile despre coloane?

Comentarii

  • Dacă aveți 38 de coloane, de ce lucrați cu RDD în primul rând? De ce nu începeți cu DataFrame? –  > Por Yaron.
  • Încarc date din Neo4j Graph. În cazul în care datele sunt preluate ca RDD și au unele dependențe de acestea. –  > Por Jack Daniel.
3 răspunsuri
Thiago Baldim

Vedeți,

Există două modalități de a converti un RDD în DF în Spark.

toDF() și createDataFrame(rdd, schema)

Vă voi arăta cum puteți face acest lucru în mod dinamic.

toDF()

toDF() comandă vă oferă modalitatea de a converti un RDD[Row] într-un Dataframe. Ideea este că obiectul Row() poate primi un **kwargs argument. Deci, există o modalitate ușoară de a face acest lucru.

from pyspark.sql.types import Row

#here you are going to create a function
def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

#Now populate that
df = rdd.map(lambda x: Row(**f(x))).toDF()

În acest fel, veți putea crea un cadru de date în mod dinamic.

createDataFrame(rdd, schema)

O altă modalitate de a face acest lucru este crearea unei scheme dinamice. Cum?

În acest mod:

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

df = sqlContext.createDataFrame(rdd, schema)

Acest al doilea mod este mai curat pentru a face acest lucru…

Deci, iată cum puteți crea cadre de date în mod dinamic.

Comentarii

  • Puteți să menționați care este cea mai puțin costisitoare abordare în ceea ce privește timpul de execuție? –  > Por Ishtiaque Khan.
  • În acest caz va fi opțiunea dataframe. Datorită utilizării funcțiilor PySpark RDD, funcțiile RDD vor utiliza conducta dintre JVM și Python pentru a rula logica din f(x), iar utilizând DataFrame nu veți comunica cu Python pentru a realiza schema după ce schema este construită cu For. –  > Por Thiago Baldim.
  • Îmi cer scuze. Vrei să spui că opțiunea sqlContext.createDataFrame(rdd, schema) este mai bună, nu? –  > Por Ishtiaque Khan.
  • Da, aceea, pentru PySpark cea mai bună opțiune pentru performanță este întotdeauna utilizarea DataFrame –  > Por Thiago Baldim.
  • Am votat în minus pentru că acest lucru transformă toate câmpurile în șiruri de caractere. OP spune că nu știe care sunt coloanele din timp. De unde ar putea ști că sunt șiruri de caractere? –  > Por kingledion.
pegah

Mi-a plăcut mai mult răspunsul lui Arun, dar există o mică problemă și nu am putut comenta sau edita răspunsul. sparkContext nu are createDeataFrame, sqlContext are (așa cum a menționat Thiago). Deci:

from pyspark.sql import SQLContext

# assuming the spark environemnt is set and sc is spark.sparkContext 
sqlContext = SQLContext(sc)
schemaPeople = sqlContext.createDataFrame(RDDName)
schemaPeople.createOrReplaceTempView("RDDName")

Arun Sharma

Încercați dacă funcționează

sc = spark.sparkContext

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(RddName)
schemaPeople.createOrReplaceTempView("RddName")