te')); return $arr; } /* 遍历用户所有主题 * @param $uid 用户ID * @param int $page 页数 * @param int $pagesize 每页记录条数 * @param bool $desc 排序方式 TRUE降序 FALSE升序 * @param string $key 返回的数组用那一列的值作为 key * @param array $col 查询哪些列 */ function thread_tid_find_by_uid($uid, $page = 1, $pagesize = 1000, $desc = TRUE, $key = 'tid', $col = array()) { if (empty($uid)) return array(); $orderby = TRUE == $desc ? -1 : 1; $arr = thread_tid__find($cond = array('uid' => $uid), array('tid' => $orderby), $page, $pagesize, $key, $col); return $arr; } // 遍历栏目下tid 支持数组 $fid = array(1,2,3) function thread_tid_find_by_fid($fid, $page = 1, $pagesize = 1000, $desc = TRUE) { if (empty($fid)) return array(); $orderby = TRUE == $desc ? -1 : 1; $arr = thread_tid__find($cond = array('fid' => $fid), array('tid' => $orderby), $page, $pagesize, 'tid', array('tid', 'verify_date')); return $arr; } function thread_tid_delete($tid) { if (empty($tid)) return FALSE; $r = thread_tid__delete(array('tid' => $tid)); return $r; } function thread_tid_count() { $n = thread_tid__count(); return $n; } // 统计用户主题数 大数量下严谨使用非主键统计 function thread_uid_count($uid) { $n = thread_tid__count(array('uid' => $uid)); return $n; } // 统计栏目主题数 大数量下严谨使用非主键统计 function thread_fid_count($fid) { $n = thread_tid__count(array('fid' => $fid)); return $n; } ?>python - "Column is not iterable" when doing operations with dataframe as part of function - Stack Overflow
最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - "Column is not iterable" when doing operations with dataframe as part of function - Stack Overflow

programmeradmin4浏览0评论

Good day, everyone. I'm trying to perform a comparison with calculation between two dataframes and give an output in a form of a table with two columns: "Type of Change" and "Number of Occurences". To do this I've merged two dataframes, adding "_New" and "_Old" suffixes to names of columns to identify them and wrote a little function to compare them, sum and give an output in form of dataframe. Code looks like that:

# Loop to go through columns and count changes
# NOTE(review): `ColumnList` must contain plain string column *names*; the question's
# comments show it is built from f.col(...) Column objects instead, and the string
# operations below (x + "_Old", "Change to " + x) then misbehave — TODO confirm
# against the actual traceback.
# NOTE(review): the annotations `Comparison_DF` / `ChangeReport_DF` are DataFrame
# *instances*, not types; a type name (or no annotation) is presumably what was meant.
def Form_Change_Report(
    ColumnList,
    Comparison_Dataframe: Comparison_DF,
    ChangeReport_Dataframe: ChangeReport_DF,
):
    for x in ColumnList:
        # Per-row flag column: 0 when the _Old and _New values match, 1 when they differ.
        Comparison_Dataframe = Comparison_Dataframe.withColumn(
            x + "_Change", when(col(x + "_Old") == col(x + "_New"), 0).otherwise(1)
        )
        # One-row summary for this column: a label plus the total of changed rows.
        # NOTE(review): `sum` here resolves to Python's *builtin* sum, not
        # pyspark.sql.functions.sum; builtin sum() iterates its argument, and iterating
        # a Column raises "Column is not iterable". Import the Spark sum under an
        # alias (the reprex's `fsum`) instead.
        Change_DF = Comparison_Dataframe.select(
            lit("Change to " + x).alias("Type of Change"),
            sum(x + "_Change").alias("Number of Occurences"),
        )
        ChangeReport_Dataframe = ChangeReport_Dataframe.unionByName(Change_DF)

    return ChangeReport_Dataframe


# Forming blank dataframe for change report
# NOTE(review): ChangeReport_RDD is created but never used — the empty DataFrame
# below is built directly from an empty list.
ChangeReport_RDD = spark.sparkContext.emptyRDD()
# NOTE(review): StructField/StructType/StringType/IntegerType must be imported from
# pyspark.sql.types; the imports are not shown in this snippet.
ChangeReport_Columns = [
    StructField("Type of Change", StringType(), True),
    StructField("Number of Occurences", IntegerType(), True),
]
ChangeReport_DF = spark.createDataFrame([], schema=StructType(ChangeReport_Columns))

# NOTE(review): per the question's comments, Columns_To_Compare holds f.col(...)
# Column objects rather than string names — that is what trips the function above.
ChangeReport_DF = Form_Change_Report(Columns_To_Compare, Comparison_DF, ChangeReport_DF)

When trying to run it, it returns the error "Column is not iterable". I tried to browse the net in search of a solution, but all the examples I found were based on usage of the MAX function, which isn't the case in this scenario. The dataframes are formed in PySpark, and all operations are done in Databricks, if this is necessary information. Would someone be able to point out what the issue might be?

Update 1

I've been asked to add a reprex. The code I got looks like this:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.functions import sum as fsum
from pyspark.sql.types import StructType, StructField


# Sample "new" and "old" datasets; note the count values are strings, not ints.
data1 = [
    ("Postcode1", "100", "150"),
    ("Postcode2", "200", "250"),
    ("Postcode3", "300", "350"),
    ("Postcode4", "400", "450"),
]
data2 = [
    ("Postcode1", "150", "150"),
    ("Postcode2", "200", "200"),
    ("Postcode3", "350", "350"),
    ("Postcode4", "400", "450"),
]
Columns = ["Postcode", "Count1", "Count2"]

# NOTE(review): `spark` is never created in this reprex (SparkSession is imported but
# unused) — presumably it relies on the Databricks-provided `spark` session.
rdd1 = spark.sparkContext.parallelize(data1)
rdd2 = spark.sparkContext.parallelize(data2)

df1 = spark.createDataFrame(rdd1, schema=Columns)
df2 = spark.createDataFrame(rdd2, schema=Columns)

# Rename the value columns with a _New suffix, keeping Postcode as the join key.
ColumnNames_New = [
    f.col("Postcode"),
    f.col("Count1").alias("Count1_New"),
    f.col("Count2").alias("Count2_New"),
]

df1_NewData = df1.select(ColumnNames_New)

# Same value columns with an _Old suffix for the second dataset.
ColumnNames_Old = [
    f.col("Postcode"),
    f.col("Count1").alias("Count1_Old"),
    f.col("Count2").alias("Count2_Old"),
]

df2_OldData = df2.select(ColumnNames_Old)

# Joining two dataframes on postcode
Comparison_DF = df1_NewData.join(df2_OldData, "Postcode")

# NOTE(review): likely root cause of the error — these are Column objects, but
# Form_Change_Report concatenates each element with "_Old"/"_New" strings, so plain
# names ["Count1", "Count2"] are required (this is exactly the accepted fix below).
Columns_to_Compare = [f.col("Count1"), f.col("Count2")]

# Forming blank dataframe for change report
# NOTE(review): ChangeReport_RDD is unused, and StringType/IntegerType are not
# imported above (only StructType/StructField are) — the reprex fails here before
# the comparison is even reached.
ChangeReport_RDD = spark.sparkContext.emptyRDD()
ChangeReport_Columns = [
    StructField("Type of Change", StringType(), True),
    StructField("Number of Occurences", IntegerType(), True),
]
ChangeReport_DF = spark.createDataFrame([], schema=StructType(ChangeReport_Columns))


# NOTE(review): `ColumnList` is expected to hold string column names, but the reprex
# passes f.col(...) Column objects — the string concatenations below then operate on
# Columns instead of names. The annotations are DataFrame *instances*, not types.
def Form_Change_Report(
    ColumnList,
    Comparison_Dataframe: df1_NewData,
    ChangeReport_Dataframe: ChangeReport_DF,
):
    for x in ColumnList:
        # Per-row flag column: 0 when _Old equals _New, 1 otherwise.
        Comparison_Dataframe = Comparison_Dataframe.withColumn(
            x + "_Change", when(col(x + "_Old") == col(x + "_New"), 0).otherwise(1)
        )
        # One-row summary: label plus total changed rows for this column.
        # NOTE(review): `lit` is not imported in this reprex (use f.lit or add it to
        # the `from pyspark.sql.functions import ...` line).
        Change_DF = Comparison_Dataframe.select(
            lit("Change to " + x).alias("Type of Change"),
            fsum(x + "_Change").alias("Number of Occurences"),
        )
        ChangeReport_Dataframe = ChangeReport_Dataframe.unionByName(Change_DF)

    return ChangeReport_Dataframe


ChangeReport_DF = Form_Change_Report(Columns_to_Compare, Comparison_DF, ChangeReport_DF)

Good day, everyone. I'm trying to perform a comparison with calculation between two dataframes and give an output in a form of a table with two columns: "Type of Change" and "Number of Occurences". To do this I've merged two dataframes, adding "_New" and "_Old" suffixes to names of columns to identify them and wrote a little function to compare them, sum and give an output in form of dataframe. Code looks like that:

# Loop to go through columns and count changes
# NOTE(review): `ColumnList` must contain plain string column *names*; the question's
# comments show it is built from f.col(...) Column objects instead, and the string
# operations below (x + "_Old", "Change to " + x) then misbehave — TODO confirm
# against the actual traceback.
# NOTE(review): the annotations `Comparison_DF` / `ChangeReport_DF` are DataFrame
# *instances*, not types; a type name (or no annotation) is presumably what was meant.
def Form_Change_Report(
    ColumnList,
    Comparison_Dataframe: Comparison_DF,
    ChangeReport_Dataframe: ChangeReport_DF,
):
    for x in ColumnList:
        # Per-row flag column: 0 when the _Old and _New values match, 1 when they differ.
        Comparison_Dataframe = Comparison_Dataframe.withColumn(
            x + "_Change", when(col(x + "_Old") == col(x + "_New"), 0).otherwise(1)
        )
        # One-row summary for this column: a label plus the total of changed rows.
        # NOTE(review): `sum` here resolves to Python's *builtin* sum, not
        # pyspark.sql.functions.sum; builtin sum() iterates its argument, and iterating
        # a Column raises "Column is not iterable". Import the Spark sum under an
        # alias (the reprex's `fsum`) instead.
        Change_DF = Comparison_Dataframe.select(
            lit("Change to " + x).alias("Type of Change"),
            sum(x + "_Change").alias("Number of Occurences"),
        )
        ChangeReport_Dataframe = ChangeReport_Dataframe.unionByName(Change_DF)

    return ChangeReport_Dataframe


# Forming blank dataframe for change report
# NOTE(review): ChangeReport_RDD is created but never used — the empty DataFrame
# below is built directly from an empty list.
ChangeReport_RDD = spark.sparkContext.emptyRDD()
# NOTE(review): StructField/StructType/StringType/IntegerType must be imported from
# pyspark.sql.types; the imports are not shown in this snippet.
ChangeReport_Columns = [
    StructField("Type of Change", StringType(), True),
    StructField("Number of Occurences", IntegerType(), True),
]
ChangeReport_DF = spark.createDataFrame([], schema=StructType(ChangeReport_Columns))

# NOTE(review): per the question's comments, Columns_To_Compare holds f.col(...)
# Column objects rather than string names — that is what trips the function above.
ChangeReport_DF = Form_Change_Report(Columns_To_Compare, Comparison_DF, ChangeReport_DF)

When trying to run it, it returns the error "Column is not iterable". I tried to browse the net in search of a solution, but all the examples I found were based on usage of the MAX function, which isn't the case in this scenario. The dataframes are formed in PySpark, and all operations are done in Databricks, if this is necessary information. Would someone be able to point out what the issue might be?

Update 1

I've been asked to add a reprex. The code I got looks like this:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.functions import sum as fsum
from pyspark.sql.types import StructType, StructField


# Sample "new" and "old" datasets; note the count values are strings, not ints.
data1 = [
    ("Postcode1", "100", "150"),
    ("Postcode2", "200", "250"),
    ("Postcode3", "300", "350"),
    ("Postcode4", "400", "450"),
]
data2 = [
    ("Postcode1", "150", "150"),
    ("Postcode2", "200", "200"),
    ("Postcode3", "350", "350"),
    ("Postcode4", "400", "450"),
]
Columns = ["Postcode", "Count1", "Count2"]

# NOTE(review): `spark` is never created in this reprex (SparkSession is imported but
# unused) — presumably it relies on the Databricks-provided `spark` session.
rdd1 = spark.sparkContext.parallelize(data1)
rdd2 = spark.sparkContext.parallelize(data2)

df1 = spark.createDataFrame(rdd1, schema=Columns)
df2 = spark.createDataFrame(rdd2, schema=Columns)

# Rename the value columns with a _New suffix, keeping Postcode as the join key.
ColumnNames_New = [
    f.col("Postcode"),
    f.col("Count1").alias("Count1_New"),
    f.col("Count2").alias("Count2_New"),
]

df1_NewData = df1.select(ColumnNames_New)

# Same value columns with an _Old suffix for the second dataset.
ColumnNames_Old = [
    f.col("Postcode"),
    f.col("Count1").alias("Count1_Old"),
    f.col("Count2").alias("Count2_Old"),
]

df2_OldData = df2.select(ColumnNames_Old)

# Joining two dataframes on postcode
Comparison_DF = df1_NewData.join(df2_OldData, "Postcode")

# NOTE(review): likely root cause of the error — these are Column objects, but
# Form_Change_Report concatenates each element with "_Old"/"_New" strings, so plain
# names ["Count1", "Count2"] are required (this is exactly the accepted fix below).
Columns_to_Compare = [f.col("Count1"), f.col("Count2")]

# Forming blank dataframe for change report
# NOTE(review): ChangeReport_RDD is unused, and StringType/IntegerType are not
# imported above (only StructType/StructField are) — the reprex fails here before
# the comparison is even reached.
ChangeReport_RDD = spark.sparkContext.emptyRDD()
ChangeReport_Columns = [
    StructField("Type of Change", StringType(), True),
    StructField("Number of Occurences", IntegerType(), True),
]
ChangeReport_DF = spark.createDataFrame([], schema=StructType(ChangeReport_Columns))


# NOTE(review): `ColumnList` is expected to hold string column names, but the reprex
# passes f.col(...) Column objects — the string concatenations below then operate on
# Columns instead of names. The annotations are DataFrame *instances*, not types.
def Form_Change_Report(
    ColumnList,
    Comparison_Dataframe: df1_NewData,
    ChangeReport_Dataframe: ChangeReport_DF,
):
    for x in ColumnList:
        # Per-row flag column: 0 when _Old equals _New, 1 otherwise.
        Comparison_Dataframe = Comparison_Dataframe.withColumn(
            x + "_Change", when(col(x + "_Old") == col(x + "_New"), 0).otherwise(1)
        )
        # One-row summary: label plus total changed rows for this column.
        # NOTE(review): `lit` is not imported in this reprex (use f.lit or add it to
        # the `from pyspark.sql.functions import ...` line).
        Change_DF = Comparison_Dataframe.select(
            lit("Change to " + x).alias("Type of Change"),
            fsum(x + "_Change").alias("Number of Occurences"),
        )
        ChangeReport_Dataframe = ChangeReport_Dataframe.unionByName(Change_DF)

    return ChangeReport_Dataframe


ChangeReport_DF = Form_Change_Report(Columns_to_Compare, Comparison_DF, ChangeReport_DF)
Share Improve this question edited 2 days ago Steven 15.3k7 gold badges47 silver badges78 bronze badges asked Feb 17 at 15:44 Danylo KuznetsovDanylo Kuznetsov 133 bronze badges 5
  • Can you show your Columns_To_Compare? – Jonathan Commented Feb 18 at 0:55
  • That whould be great to have a reprex. – Steven Commented Feb 18 at 8:24
  • @Jonathan, my .Columns_To_Compare look like this: Columns_To_Compare=[ f.col("ColumnName1"), f.col("ColumnName2"), f.col("ColumnName3"), f.col("ColumnName4"), f.col("ColumnName5"), f.col("ColumnName6")] – Danylo Kuznetsov Commented Feb 18 at 9:17
  • @Steven, I've updated question with reprex, hope this helps – Danylo Kuznetsov Commented Feb 18 at 9:57
  • you probably just need DataFrame.summary and do it in one step: df1.join(df2,'Postcode').select([ when(~df1[c].eqNullSafe(df2[c]),1).alias(c+'_Change') for c in ['Count1', 'Count2']]).summary('count'). and then transpose the df and rename the column titles. – lihao Commented Feb 18 at 13:35
Add a comment  | 

1 Answer 1

Reset to default 0

Your reprex cannot run currently. I made some adjustments to make it work; I hope it will help you find the problem. I believe the error comes from this line:

# fix this line
# Columns_to_Compare = [f.col("Count1"), f.col("Count2")]
# Plain column *names* let the function build "<name>_Old"/"<name>_New" strings and
# the "Change to <name>" label; Column objects cannot take part in string handling.
Columns_to_Compare = ["Count1", "Count2"]
# import pyspark.sql.functions as f
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, when
# from pyspark.sql.functions import sum as fsum
# from pyspark.sql.types import StructType, StructField

# simplified your import and added reduce for later
# (a single `F` alias replaces the mix of `f`, bare `col`/`when`, and `fsum`)
from pyspark.sql import functions as F
from functools import reduce


# Sample "new" and "old" datasets; the count values are strings, as in the reprex.
data1 = [
    ("Postcode1", "100", "150"),
    ("Postcode2", "200", "250"),
    ("Postcode3", "300", "350"),
    ("Postcode4", "400", "450"),
]
data2 = [
    ("Postcode1", "150", "150"),
    ("Postcode2", "200", "200"),
    ("Postcode3", "350", "350"),
    ("Postcode4", "400", "450"),
]
Columns = ["Postcode", "Count1", "Count2"]

# rdd1 = spark.sparkContext.parallelize(data1)
# rdd2 = spark.sparkContext.parallelize(data2)

# df1 = spark.createDataFrame(rdd1, schema=Columns)
# df2 = spark.createDataFrame(rdd2, schema=Columns)

# Create directly the DF (no intermediate RDD needed)
df1 = spark.createDataFrame(data1, schema=Columns)
df2 = spark.createDataFrame(data2, schema=Columns)

# rename f to F
# _New-suffixed view of the first dataset, keeping Postcode as the join key.
ColumnNames_New = [
    F.col("Postcode"),
    F.col("Count1").alias("Count1_New"),
    F.col("Count2").alias("Count2_New"),
]

df1_NewData = df1.select(ColumnNames_New)

# rename f to F
# _Old-suffixed view of the second dataset.
ColumnNames_Old = [
    F.col("Postcode"),
    F.col("Count1").alias("Count1_Old"),
    F.col("Count2").alias("Count2_Old"),
]

df2_OldData = df2.select(ColumnNames_Old)

# Joining two dataframes on postcode
Comparison_DF = df1_NewData.join(df2_OldData, "Postcode")

# fix this line
# Columns_to_Compare = [f.col("Count1"), f.col("Count2")]
# String names instead of Column objects — this is the actual bug fix.
Columns_to_Compare = ["Count1", "Count2"]

# useless
# Forming blank dataframe for change report
# ChangeReport_RDD = spark.sparkContext.emptyRDD()
# ChangeReport_Columns = [
#     StructField("Type of Change", StringType(), True),
#     StructField("Number of Occurences", IntegerType(), True),
# ]
# ChangeReport_DF = spark.createDataFrame([], schema=StructType(ChangeReport_Columns))

# update the function with reduce and simplified imports
# Build the change report: for each compared column, count the rows whose
# <name>_Old and <name>_New values differ, then union the one-row summaries.
def Form_Change_Report(
    ColumnList,
    Comparison_Dataframe: df1_NewData,
):
    per_column_reports = []
    for name in ColumnList:
        old_value = F.col(name + "_Old")
        new_value = F.col(name + "_New")
        # 1 where the value changed, 0 where it is identical.
        flagged = Comparison_Dataframe.withColumn(
            name + "_Change",
            F.when((old_value == new_value), 0).otherwise(1),
        )
        per_column_reports.append(
            flagged.select(
                F.lit("Change to " + name).alias("Type of Change"),
                F.sum(name + "_Change").alias("Number of Occurences"),
            )
        )
        # Keep the accumulated _Change columns for subsequent iterations,
        # mirroring the original's reassignment.
        Comparison_Dataframe = flagged

    # Stack the per-column summary rows into a single report DataFrame.
    return reduce(lambda left, right: left.union(right), per_column_reports)

# The pre-built empty ChangeReport_DF is no longer needed as an argument.
ChangeReport_DF = Form_Change_Report(Columns_to_Compare, Comparison_DF)

Result is :

Type of Change Number of Occurences
Change to Count1 2
Change to Count2 1
发布评论

评论列表(0)

  1. 暂无评论