
pyspark - spark connect udf fails with "SparkContext or SparkSession should be created first" - Stack Overflow


I have a Spark Connect server running. Things are fine when I don't use UDFs (df.show() always works). But as soon as I use a UDF, it fails with RuntimeError: SparkContext or SparkSession should be created first. Obviously a SparkSession exists, because it created the DataFrame and printed it. It's only when a UDF is applied to it that it fails.

Is this (UDFs/anything that needs a SparkContext) simply not yet supported in Spark Connect?

Here is a reproducible example:

  • start a Spark Connect server (or on Windows); an example command is sketched below this list.
  • run pytest -k test_spark_connect to run a simple UDF test against a Spark Connect remote session.
  • run pytest -k test_spark to run the same UDF test against a new local Spark session.

PS: running both tests in the same pytest invocation causes conflicts, so they must be run separately.
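
For reference, on Linux/macOS a Spark Connect server can be started from the root of a Spark distribution with the bundled script; the exact --packages coordinate below is an assumption and has to match your Spark/Scala versions (here 3.5.x built for Scala 2.12):

$ # start a Spark Connect server listening on the default port 15002
$ ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5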

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

@udf
def to_upper(s):
    if s is not None:
        return s.upper()

def test_spark_connect():
    simple_spark_test(SparkSession.builder.remote("sc://localhost:15002").getOrCreate())

def test_spark():
    simple_spark_test(SparkSession.builder.getOrCreate())

def simple_spark_test(session_to_test: SparkSession):
    print(f'\nss: {session_to_test}')
    print(f'ss.conf.get("spark.app.name"): {session_to_test.conf.get("spark.app.name")}')

    df = session_to_test.createDataFrame([(1, "John Doe")], ("id", "name"))
    df.show()
    df.select(col("name"), to_upper("name")).show()

test_spark_connect

$ pytest -k test_spark_connect
... snip ...
conftest.py::test_spark_connect FAILED                                   [100%]
ss: <pyspark.sql.connect.session.SparkSession object at 0x0000022C1D8C37F0>
ss.conf.get("spark.app.name"): Spark Connect server
+---+--------+
| id|    name|
+---+--------+
|  1|John Doe|
+---+--------+

conftest.py:9 (test_spark_connect)
def test_spark_connect():
>       simple_spark_test(SparkSession.builder.remote("sc://localhost:15002").getOrCreate())

conftest.py:11: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
conftest.py:22: in simple_spark_test
    df.select(col("name"), to_upper("name")).show()
.venv\lib\site-packages\pyspark\sql\udf.py:425: in wrapper
    return self(*args)
.venv\lib\site-packages\pyspark\sql\udf.py:340: in __call__
    sc = get_active_spark_context()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    def get_active_spark_context() -> SparkContext:
        """Raise RuntimeError if SparkContext is not initialized,
        otherwise, returns the active SparkContext."""
        sc = SparkContext._active_spark_context
        if sc is None or sc._jvm is None:
>           raise RuntimeError("SparkContext or SparkSession should be created first.")
E           RuntimeError: SparkContext or SparkSession should be created first.

.venv\lib\site-packages\pyspark\sql\utils.py:248: RuntimeError
$

test_spark

$ pytest -k test_spark
conftest.py::test_spark
... snip ...
25/03/03 12:25:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
PASSED                                           [100%]
ss: <pyspark.sql.session.SparkSession object at 0x000001EA41099540>
ss.conf.get("spark.app.name"): pyspark-shell
+---+--------+
| id|    name|
+---+--------+
|  1|John Doe|
+---+--------+

+--------+--------------+
|    name|to_upper(name)|
+--------+--------------+
|John Doe|      JOHN DOE|
+--------+--------------+

$

asked Mar 3, 2025 at 18:48 by Kashyap

1 Answer


I hit the same error on Spark 3.5.5. Spark SQL runs fine, but UDFs fail:

>>> spark.sql("select 1").show()
+---+
|  1|
+---+
|  1|
+---+

>>> df.select(to_upper("name"), add_one("age")).show()
Traceback (most recent call last):
  File "<python-input-13>", line 1, in <module>
    df.select(to_upper("name"), add_one("age")).show()
              ~~~~~~~~^^^^^^^^
  File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/udf.py", line 423, in wrapper
    return self(*args)
  File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/udf.py", line 339, in __call__
    sc = get_active_spark_context()
  File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/utils.py", line 248, in get_active_spark_context
    raise RuntimeError("SparkContext or SparkSession should be created first.")
RuntimeError: SparkContext or SparkSession should be created first.

I finally solved it by creating the Spark Connect session before defining the UDF function. Only then does PySpark know to use the Spark Connect code path for registering the UDF. Also, the Python version on the client side must match the one on the executors; for instance, you can only use a Python 3.8 client with Spark 3.5.5.

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.remote("sc://localhost:15002/").create()
>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)")).show()
+----------+
|slen(name)|
+----------+
|         8|
+----------+
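
Applied to the repro in the question, that ordering means the @udf decorator has to run after the remote session has been created, for example by moving the UDF definition inside the test. A minimal sketch of that rearrangement (same names as the question's conftest.py; not verified against every PySpark version):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

def test_spark_connect():
    # Create the Spark Connect session first ...
    session = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    # ... and only then define the UDF, so it is set up through the
    # Spark Connect code path rather than the classic SparkContext one.
    @udf
    def to_upper(s):
        if s is not None:
            return s.upper()

    df = session.createDataFrame([(1, "John Doe")], ("id", "name"))
    df.select(col("name"), to_upper("name")).show()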
