I'm trying to make a PySpark connection to a Cassandra DB indexed with GeoMesa. Searching about it, I noticed that it uses the GeoTools Spark runtime, since there is no optimized runtime for Cassandra. I'm struggling to find the keys and values I should use for this connection. My code reports that it cannot find a SpatialRDDProvider.
Code
import geomesa_pyspark
print("Configuring")
conf = geomesa_pyspark.configure(
    jars=['path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'],
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='path/to/spark'). \
    setAppName('MyTestApp')
conf.get('spark.master')
print(f"Configuration set: {conf.getAll()}")
from pyspark.sql import SparkSession
print("Imported Pyspark")
spark = SparkSession.builder.config(
    conf=conf
).config(
    "spark.driver.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).config(
    "spark.executor.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).enableHiveSupport().getOrCreate()
print("SparkSession UP")
params = {
    "geotools": "true",
    "dbtype": "cassandradb",  # Also tried with "cassandra"
    "cassandra.contact.point": "contact_point",
    "cassandra.username": "user",
    "cassandra.password": "password",
    "cassandra.keyspace": "keyspace",
    "cassandra.catalog": "catalog",
    "geomesa.feature": "feature"
}
print("Spark loading")
df = spark.read.format("geomesa").options(**params).load()
print("Loaded DB")
df.createOrReplaceTempView("tbl")
spark.sql("show tables").show()
Response
Configuring
Configuration set: [('spark.master', 'yarn'), ('spark.yarn.dist.jars', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.yarn.dist.files', '/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip,/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip,/tmp/spark-python-3.9/geomesa_pyspark-5.2.0.zip,/tmp/spark-python-3.9/pytz-2025.1.zip'), ('spark.executorEnv.PYTHONPATH', 'py4j-0.10.9.7-src.zip:pyspark.zip:geomesa_pyspark-5.2.0.zip:pytz-2025.1.zip'), ('spark.executorEnv.PYSPARK_PYTHON', '/usr/bin/python'), ('spark.driver.extraClassPath', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.app.name', 'MyTestApp')]
Imported Pyspark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/04 21:13:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/04 21:13:25 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip added multiple times to distributed cache.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip added multiple times to distributed cache.
SparkSession UP
Spark loading
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.MergedViewConfigLoader): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.RouteSelector): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
Traceback (most recent call last):
File "/home/rocky/test2.py", line 46, in <module>
df = spark.read.format("geomesa").options(**params).load()
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 314, in load
return self._df(self._jreader.load())
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o50.load.
: java.lang.RuntimeException: Could not find a SpatialRDDProvider
at org.locationtech.geomesa.spark.GeoMesaSpark$.$anonfun$apply$2(GeoMesaSpark.scala:20)
at scala.Option.getOrElse(Option.scala:189)
at org.locationtech.geomesa.spark.GeoMesaSpark$.apply(GeoMesaSpark.scala:20)
at org.locationtech.geomesa.spark.sql.GeoMesaRelation$.apply(GeoMesaRelation.scala:162)
at org.locationtech.geomesa.spark.sql.GeoMesaDataSource.createRelation(GeoMesaDataSource.scala:45)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
I have checked the documentation and tested different keys, but it still returns this error. I also worked through some other errors related to the Hadoop and Java config along the way.
Jars mentioned in the comment:
"org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0,"
"com.datastax.cassandra:cassandra-driver-core:4.0.0,"
"org.locationtech.geomesa:geomesa-cassandra-datastore_2.12:5.2.0,"
"com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
"org.locationtech.geomesa:geomesa-cassandra_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-cassandra-gs-plugin_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-cassandra-dist_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-spark-jts_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-spark-core_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-cassandra-tools_2.12:5.2.0"
1 Answer
You should just need to get the Cassandra data store jars onto your classpath. There isn't a Cassandra-Spark bundled jar that contains all the dependencies, but at a minimum you would need the geomesa-gt-spark-runtime, geomesa-cassandra-datastore, and com.datastax.cassandra:cassandra-driver-core jars. At that point, if it still can't load the data store, you should get some ClassNotFound errors which you can use to determine any missing jars. You can reference the GeoMesa-Cassandra CLI setup for the full set of jars (although many of those will already be present in the geomesa-gt-spark-runtime bundled jar).
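As an illustration (not from the original answer), here is a minimal sketch of how those extra jars might be supplied to the session. The jar paths are placeholders, and the jar names simply follow the coordinates listed in the question:

import geomesa_pyspark
from pyspark.sql import SparkSession

# Hypothetical local paths -- replace with the actual jar locations.
jars = [
    'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar',
    'path/to/geomesa-cassandra-datastore_2.12-5.2.0.jar',
    'path/to/cassandra-driver-core-4.0.0.jar',
]

# Pass every jar to configure() so they are shipped to the executors as well.
conf = geomesa_pyspark.configure(
    jars=jars,
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='path/to/spark',
).setAppName('MyTestApp')

spark = (
    SparkSession.builder
    .config(conf=conf)
    # extraClassPath takes a classpath string, so join multiple jars with ':'
    .config('spark.driver.extraClassPath', ':'.join(jars))
    .config('spark.executor.extraClassPath', ':'.join(jars))
    .getOrCreate()
)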
If you do get it working, please contribute the instructions back to the GeoMesa documentation!
As a side note, Cassandra doesn't natively use Hadoop, so if you plan to use Spark heavily you may wish to use something like HBase instead, which uses HDFS for storage and may perform better when loading data through map-reduce/Spark.
.config("spark.jars.packages", "org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0") instead of spark.executor/driver.extraClassPath? – Kashyap Commented yesterday
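A sketch of the approach this comment suggests, assuming the cluster has network access to Maven Central so Spark can resolve the coordinates at startup (the coordinates mirror those listed in the question):

from pyspark.sql import SparkSession

# Maven coordinates taken from the question; Spark downloads them and
# distributes them to the driver and executors when the session starts.
packages = ",".join([
    "org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0",
    "org.locationtech.geomesa:geomesa-cassandra-datastore_2.12:5.2.0",
    "com.datastax.cassandra:cassandra-driver-core:4.0.0",
])

spark = (
    SparkSession.builder
    .appName("MyTestApp")
    .config("spark.jars.packages", packages)
    .getOrCreate()
)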