提问者:小点点

Pyspark,prophet,pandas UDF-[8906行x 3列]类型<class'pandas。果心框架数据帧'>。对于列文字,请使用'


我很抱歉,如果这是一个愚蠢的问题,但我卡住了,并尝试了每一个解决类似问题的建议。

我正在尝试使用pyspark和PandasUDF来扩展facebook先知模型,类似于使用Pandasudf在火花中预测facebook先知。我得到的最终结果是熊猫数据帧有一些错误,我看不到结果。我已经尝试了熊猫和火花数据帧,但它不起作用。

我想这是一个简单的解决办法,但我已经为此损失了几天。提前感谢!

import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline


df = pd.read_excel("All Mylan products value&units 2015-2020.xlsx")

df.drop(['Region', 'Channel', 'Product - Level 0', 'Product - Level 1', 'Time - Level 0', 'Time - Level 1', 'Sales RET [BGN]'],axis=1, inplace =True)

df

df.columns ='prod','ds','y'
df['ds'] = pd.to_datetime(df['ds'])

df.fillna(0,inplace=True)

df1 = df.groupby(by='prod')

df1.head()

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import *

import pyspark
from pyspark.sql.types import *

from pyspark.sql import SparkSession
# Create a spark session
spark = SparkSession.builder.getOrCreate()
#create schema using sparkDF
schema = StructType([
        StructField("prod", StringType(), True),
        StructField("ds", DateType(), True),
        StructField("y", DoubleType(), True)
    ])

import os
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "0"
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

sparkdf = spark.createDataFrame(df,schema)

from fbprophet import Prophet

sparkschema = sparkdf.schema

mySchema = StructType([ StructField("prod", StringType(), True)\
                       ,StructField("ds", DateType(), True)\
                       ,StructField("trend", DoubleType(), True)\
                       ,StructField("yhat_lower", DoubleType(), True)\
                       ,StructField("yhat_upper", DoubleType(), True)\
                       ,StructField("trend_lower", DoubleType(), True)\
                       ,StructField("trend_upper", DoubleType(), True)\
                       ,StructField("monthly", DoubleType(), True)\
                       ,StructField("monthly_lower", DoubleType(), True)\
                       ,StructField("monthly_upper'", DoubleType(), True)\
                       ,StructField("multiplicative_terms", DoubleType(), True)\
                       ,StructField("multiplicative_terms_lower", DoubleType(), True)\
                       ,StructField('multiplicative_terms_upper', DoubleType(), True)\
                       ,StructField('additive_terms', DoubleType(), True)\
                       ,StructField('additive_terms_lower', DoubleType(), True)\
                       ,StructField('additive_terms_upper', DoubleType(), True)\
                       ,StructField('yhat', DoubleType(), True)])

@pandas_udf(sparkschema, PandasUDFType.GROUPED_MAP)
def forecast(df):

    model = Prophet(
        growth="linear",
        interval_width=0.10,
        seasonality_mode="multiplicative",
        yearly_seasonality=False,
        weekly_seasonality=False,
        daily_seasonality=False,
    ).add_seasonality(name="monthly", period=12 * 30.5, fourier_order=12)

    model.fit(df.loc[:, ["ds", "y"]])

    futper = model.make_future_dataframe(periods=12, freq="M")
    results_pd = model.predict(futper)
    results_pd = pd.concat([results_pd, df["prod"]], axis=1)

    return pd.DataFrame(results_pd, columns=mySchema.fieldNames())

results = sparkdf.groupby('prod').apply(forecast)
results.show()

pdres = results.toPandas()


我在最后两行中得到的错误是:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-113-d072fc517970> in <module>
     20 
     21 results = sparkdf.groupby('prod').apply(forecast)
---> 22 results.show()

C:\SPARK\spark\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
    405         """
    406         if isinstance(truncate, bool) and truncate:
--> 407             print(self._jdf.showString(n, 20, vertical))
    408         else:
    409             print(self._jdf.showString(n, int(truncate), vertical))

~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

C:\SPARK\spark\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     96     def deco(*a, **kw):
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:
    100             converted = convert_exception(e.java_exception)

~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1193.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 20.0 failed 1 times, most recent failure: Lost task 1.0 in stage 20.0 (TID 75, DESKTOP-LE3RNA6, executor driver): java.io.FileNotFoundException: C:\Users\PZashev\AppData\Local\Temp\blockmgr-d86ed11f-022a-4dfa-8616-a9b2e2f9784b\01\temp_shuffle_39ce5516-b787-48c4-af99-a2d32b2fb3a3 (The system cannot find the path specified)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:105)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:118)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:431)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: C:\Users\PZashev\AppData\Local\Temp\blockmgr-d86ed11f-022a-4dfa-8616-a9b2e2f9784b\01\temp_shuffle_39ce5516-b787-48c4-af99-a2d32b2fb3a3 (The system cannot find the path specified)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:105)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:118)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-114-fea8c4aadf2a> in <module>
----> 1 pdres = results.toPandas()

C:\SPARK\spark\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\dataframe.py in toPandas(self)
   2224 
   2225         # Below is toPandas without Arrow optimization.
-> 2226         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   2227 
   2228         dtype = {}

C:\SPARK\spark\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\dataframe.py in collect(self)
    561         """
    562         with SCCallSiteSync(self._sc) as css:
--> 563             sock_info = self._jdf.collectToPython()
    564         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    565 

~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

C:\SPARK\spark\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     96     def deco(*a, **kw):
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:
    100             converted = convert_exception(e.java_exception)

~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1193.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 22.0 failed 1 times, most recent failure: Lost task 7.0 in stage 22.0 (TID 89, DESKTOP-LE3RNA6, executor driver): java.io.FileNotFoundException: C:\Users\PZashev\AppData\Local\Temp\blockmgr-d86ed11f-022a-4dfa-8616-a9b2e2f9784b\31\temp_shuffle_e9340b3c-7c60-4fd9-b8ae-5eff51d457bd (The system cannot find the path specified)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:105)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:118)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:365)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3310)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3307)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: C:\Users\PZashev\AppData\Local\Temp\blockmgr-d86ed11f-022a-4dfa-8616-a9b2e2f9784b\31\temp_shuffle_e9340b3c-7c60-4fd9-b8ae-5eff51d457bd (The system cannot find the path specified)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:105)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:118)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

共1个答案

匿名用户

分组映射的输出存储在结果中,而不是UDF本身。删除内部UDF函数并从结果中获取结果:

@pandas_udf(mySchema, PandasUDFType.GROUPED_MAP)
def forecast(df):

    model = Prophet(
        growth="linear",
        interval_width=0.10,
        seasonality_mode="multiplicative",
        yearly_seasonality=False,
        weekly_seasonality=False,
        daily_seasonality=False,
    ).add_seasonality(name="monthly", period=12 * 30.5, fourier_order=12)

    model.fit(df.loc[:, ["ds", "y"]])

    futper = model.make_future_dataframe(periods=12, freq="M")
    results_pd = model.predict(futper)
    results_pd = pd.concat([results_pd, df["prod"]], axis=1)

    return pd.DataFrame(results_pd, columns=mySchema.fieldNames())

results = sparkdf.groupby('prod').apply(forecast)
results.columns