Spark: how to correctly transform a DataFrame with mapInPandas

This tutorial covers how to correctly transform a DataFrame with mapInPandas in Spark. It is based on a community Q&A; the original question and the accepted answer are reproduced below.


Problem description

I am trying to transform a 10k-row Spark DataFrame using mapInPandas, a function new in Spark 3.0.1.

Expected output: the mapped pandas_function() turns each input row into three rows, so the output transformed_df should have 30k rows.

Actual output: I get 3 rows with 1 core and 24 rows with 8 cores.

Input: respond_sdf has 10k rows.

+-----+-------------------------------------------------------------------+
|url  |content                                                            |
+-----+-------------------------------------------------------------------+
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
+-----+-------------------------------------------------------------------+
only showing top 20 rows
Input respond_sdf has 10000 rows

Output A) 3 rows with 1 core: .master('local[1]')

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } (0 + 1) / 1]
+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
+-----+---+---+

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
 Output transformed_df has 3 rows

Output B) 24 rows with 8 cores: .master('local[8]')

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } (0 + 1) / 1]
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } 
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
+-----+---+---+
only showing top 20 rows

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } (3 + 5) / 8]
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
 Output transformed_df has 24 rows 

Sample code:

#### IMPORT PYSPARK ###
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
spark = pyspark.sql.SparkSession.builder.appName("test") 
 .master('local[1]') 
 .getOrCreate()
sc = spark.sparkContext

####### INPUT DATAFRAME WITH LIST OF JSONS ########################

# Create list with 10k nested tuples(url,content)
rdd_list = [('api_1',"{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"),
            ('api_2', "{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }")]*5000

schema = StructType([
  StructField('url', StringType(), True),
  StructField('content', StringType(), True)
  ])

#Create input dataframe with 10k rows
jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
respond_sdf.show(truncate=False)

print(f'Input respond_sdf has {respond_sdf.count()} rows')

####### TRANSFORMATION DATAFRAME ########################

# Pandas transformation function returning a pandas dataframe per batch
def pandas_function(iter):
    for df in iter:
        # NOTE: df is a whole pandas batch, but only its first row is
        # parsed here -- this is the bug the answer below corrects.
        print(df['content'][0])
        yield pd.DataFrame(eval(df['content'][0]))

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()
print(f' Output transformed_df has {transformed_df.count()} rows')
print('Expected output dataframe should have 30k rows')
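
Why 3 and 24? The buggy pandas_function parses only the first row of each pandas batch, so the output row count tracks the number of batches, not the number of input rows. With the default spark.sql.execution.arrow.maxRecordsPerBatch of 10000, each partition here arrives as a single batch, and sc.parallelize() without numSlices creates one partition per core under local[N]. A quick diagnostic sketch (my addition, not part of the original post):

# 1 partition under local[1] -> 1 batch -> 3 output rows;
# 8 partitions under local[8] -> 8 batches -> 24 output rows.
print(f'respond_sdf has {respond_sdf.rdd.getNumPartitions()} partitions')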

Related discussion:
How to yield pandas dataframe rows to spark dataframe

Accepted answer

Sorry, the mapInPandas part of my answer to your previous question was incorrect. I think the function below is the correct way to write the pandas function. I made a mistake last time because I assumed iter was an iterator of rows, when it is actually an iterator of pandas DataFrames, one per batch.

def pandas_function(iter):
    for df in iter:
        # Parse every content string in the batch, not just the first row,
        # and concatenate the resulting DataFrames.
        yield pd.concat(pd.DataFrame(x) for x in df['content'].map(eval))

(PS: Thanks to the answer here.)
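
For completeness, here is a minimal end-to-end sketch (my addition, not part of the original answer) that applies the corrected function to the same respond_sdf and verifies the expected 30k rows. It swaps eval for ast.literal_eval, a drop-in replacement here since content holds plain Python dict literals, which avoids executing arbitrary code:

import ast

def pandas_function(batch_iter):
    for df in batch_iter:
        # Safely parse every dict-literal string in the batch and stack
        # the resulting 3-row DataFrames.
        yield pd.concat(pd.DataFrame(ast.literal_eval(s)) for s in df['content'])

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
print(f'Output transformed_df has {transformed_df.count()} rows')  # expect 30000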
