
python - PySpark connection to GeoMesa Cassandra DB - Stack Overflow


I'm trying to make a PySpark connection to a Cassandra DB indexed with GeoMesa. Searching about it, I noticed that it uses the GeoTools Spark runtime, since there is no optimized runtime for Cassandra. I'm struggling to find the keys and values I should use for this connection. My code returns an error saying it could not find a SpatialRDDProvider.

Code

import geomesa_pyspark

print("Configuring")
conf = geomesa_pyspark.configure(
    jars=['path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'],
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='path/to/spark',
).setAppName('MyTestApp')

conf.get('spark.master')

print(f"Configuration set: {conf.getAll()}")

from pyspark.sql import SparkSession

print("Imported Pyspark")

spark = SparkSession.builder.config(
    conf=conf
).config(
    "spark.driver.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).config(
    "spark.executor.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).enableHiveSupport().getOrCreate()

print("SparkSession UP")
params = {
    "geotools": "true",
    "dbtype": "cassandradb", # Also tried with "cassandra"
    "cassandra.contact.point": "contact_point",
    "cassandra.username": "user",
    "cassandra.password": "password",
    "cassandra.keyspace": "keyspace",
    "cassandra.catalog": "catalog",
    "geomesa.feature": "feature"
}

print("Spark loading")

df = spark.read.format("geomesa").options(**params).load()

print("Loaded DB")

df.createOrReplaceTempView("tbl")
spark.sql("show tables").show()

Output

Configuring
Configuration set: [('spark.master', 'yarn'), ('spark.yarn.dist.jars', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.yarn.dist.files', '/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip,/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip,/tmp/spark-python-3.9/geomesa_pyspark-5.2.0.zip,/tmp/spark-python-3.9/pytz-2025.1.zip'), ('spark.executorEnv.PYTHONPATH', 'py4j-0.10.9.7-src.zip:pyspark.zip:geomesa_pyspark-5.2.0.zip:pytz-2025.1.zip'), ('spark.executorEnv.PYSPARK_PYTHON', '/usr/bin/python'), ('spark.driver.extraClassPath', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.app.name', 'MyTestApp')]
Imported Pyspark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/04 21:13:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/04 21:13:25 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip added multiple times to distributed cache.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip added multiple times to distributed cache.
SparkSession UP
Spark loading
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.MergedViewConfigLoader): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.RouteSelector): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
Traceback (most recent call last):
  File "/home/rocky/test2.py", line 46, in <module>
    df = spark.read.format("geomesa").options(**params).load()
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 314, in load
    return self._df(self._jreader.load())
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o50.load.
: java.lang.RuntimeException: Could not find a SpatialRDDProvider
    at org.locationtech.geomesa.spark.GeoMesaSpark$.$anonfun$apply$2(GeoMesaSpark.scala:20)
    at scala.Option.getOrElse(Option.scala:189)
    at org.locationtech.geomesa.spark.GeoMesaSpark$.apply(GeoMesaSpark.scala:20)
    at org.locationtech.geomesa.spark.sql.GeoMesaRelation$.apply(GeoMesaRelation.scala:162)
    at org.locationtech.geomesa.spark.sql.GeoMesaDataSource.createRelation(GeoMesaDataSource.scala:45)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)

I have checked the documentation and tested different keys, but it still returns this error. I also worked through some other errors related to Hadoop and Java configuration before getting to this point.

Jars mentioned in the comments:

org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0, com.datastax.cassandra:cassandra-driver-core:4.0.0, org.locationtech.geomesa:geomesa-cassandra-datastore_2.12:5.2.0, com.datastax.spark:spark-cassandra-connector_2.12:3.5.1, org.locationtech.geomesa:geomesa-cassandra_2.12:5.2.0, org.locationtech.geomesa:geomesa-cassandra-gs-plugin_2.12:5.2.0, org.locationtech.geomesa:geomesa-cassandra-dist_2.12:5.2.0, org.locationtech.geomesa:geomesa-spark-jts_2.12:5.2.0, org.locationtech.geomesa:geomesa-spark-core_2.12:5.2.0, org.locationtech.geomesa:geomesa-cassandra-tools_2.12:5.2.0

Asked Feb 7 at 13:54 by Totopo; edited 2 days ago by Erick Ramirez.
  • did you already try .config("spark.jars.packages", "org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0") instead of spark.executor/driver.extraClassPath? – Kashyap Commented yesterday
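For reference, a minimal sketch of the approach suggested in that comment: letting Spark resolve the runtime from Maven via spark.jars.packages instead of pointing extraClassPath at a local jar. The Maven coordinate is the one named in the comment; whether it alone pulls in everything the Cassandra store needs is an assumption.

from pyspark.sql import SparkSession

# Sketch of the comment's suggestion: resolve the GeoMesa GeoTools runtime
# from Maven Central instead of shipping a local jar via extraClassPath.
spark = (
    SparkSession.builder
    .appName("MyTestApp")
    .config(
        "spark.jars.packages",
        "org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0",
    )
    .getOrCreate()
)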

1 Answer


You should just need to get the Cassandra data store jars onto your classpath. There isn't a cassandra-spark bundled jar that contains all the dependencies, but at a minimum you would need the geomesa-gt-spark-runtime, geomesa-cassandra-datastore and com.datastax.cassandra:cassandra-driver-core jars. At that point, if it still can't load the datastore, you should get some ClassNotFound errors which you can use to determine any missing jars. You can reference the GeoMesa-Cassandra CLI setup for the full set of jars (although many of those will already be present in the geomesa-gt-spark-runtime bundled jar).
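As a hedged sketch of that setup, the configure call from the question could list all three jars explicitly. The paths are placeholders, and the exact cassandra-driver-core version your GeoMesa build expects is an assumption to verify against the CLI classpath.

import geomesa_pyspark

# Sketch only: put the GeoTools Spark runtime plus the Cassandra data store
# and driver jars on the classpath so the Cassandra data store (and its
# SpatialRDDProvider) can be discovered. Paths are placeholders; check the
# versions against the GeoMesa-Cassandra CLI's lib directory.
cassandra_jars = [
    'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar',
    'path/to/geomesa-cassandra-datastore_2.12-5.2.0.jar',
    'path/to/cassandra-driver-core.jar',  # version must match your GeoMesa build
]

conf = geomesa_pyspark.configure(
    jars=cassandra_jars,
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='path/to/spark',
).setAppName('MyTestApp')

# If you also set extraClassPath, include every jar, not just the runtime:
#   "spark.driver.extraClassPath":   ":".join(cassandra_jars)
#   "spark.executor.extraClassPath": ":".join(cassandra_jars)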

If you do get it working, please contribute the instructions back to the GeoMesa documentation!

As a side note, Cassandra doesn't natively use Hadoop, so if you plan to use Spark heavily you may wish to use something like HBase instead, which uses HDFS for storage and may perform better when loading data through map-reduce/Spark.
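For comparison, reading from a GeoMesa HBase store with geomesa_pyspark follows the same pattern as the question's code; here is a sketch assuming the geomesa-hbase-spark-runtime jar is on the classpath (the catalog table and feature type names are placeholders):

# Sketch: the HBase data store only needs a catalog table and feature type.
# Assumes the geomesa-hbase-spark-runtime jar is on the classpath; the names
# below are placeholders.
params = {"hbase.catalog": "mycatalog"}

df = (
    spark.read.format("geomesa")
    .options(**params)
    .option("geomesa.feature", "myfeature")
    .load()
)
df.createOrReplaceTempView("myfeature")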
