
[Bug report] Can't load filesystem 'gs' when use spark to access Gravitino GCS bundles #5609

Closed
yuqi1129 opened this issue Nov 19, 2024 · 1 comment
Assignees
Labels
0.8.0 Release v0.8.0 bug Something isn't working

Comments

@yuqi1129
Contributor

yuqi1129 commented Nov 19, 2024

Version

main branch

Describe what's wrong

When I run Spark to access GCS, the following error occurs:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 955, in csv
    self._jwrite.csv(path)
  File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1309, in __call__
    return_value = get_return_value(
  File "/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o104.csv.
: org.apache.gravitino.exceptions.GravitinoRuntimeException: Exception occurs when create new FileSystem for actual uri: gs://example_qazwsx/example/people, msg: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
	at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.lambda$getFilesetContext$2(GravitinoVirtualFileSystem.java:431)
	at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
	at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFilesetContext(GravitinoVirtualFileSystem.java:386)
	at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFileStatus(GravitinoVirtualFileSystem.java:547)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1862)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:839)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
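For context, Hadoop resolves a filesystem scheme by first checking the `fs.<scheme>.impl` configuration key and then falling back to the ServiceLoader-populated `SERVICE_FILE_SYSTEMS` registry; the `UnsupportedFileSystemException` above means neither yielded a class for `gs`. A minimal Python sketch of that lookup order (function and variable names here are illustrative, not Hadoop's actual code):

```python
def resolve_scheme(scheme, conf, service_file_systems):
    """Illustrative sketch of Hadoop's FileSystem.getFileSystemClass lookup order."""
    # 1. An explicit configuration entry wins: fs.<scheme>.impl
    clazz = conf.get(f"fs.{scheme}.impl")
    if clazz is not None:
        return clazz
    # 2. Otherwise fall back to the ServiceLoader-discovered registry.
    clazz = service_file_systems.get(scheme)
    if clazz is None:
        raise RuntimeError(f'No FileSystem for scheme "{scheme}"')
    return clazz
```

In this bug, no `fs.gs.impl` is configured and the registry was frozen before the GCS provider jar was visible, so the lookup fails at step 2.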

Error message and/or stacktrace

Please see above.

How to reproduce

  1. Create a metalake named test, a GCS catalog named gcs_catalog, a schema named schema, and a fileset named example.
  2. Install pyspark==3.2.0 and the Gravitino Python client:
pip install apache-gravitino==0.7.0
pip install pyspark==3.2.0
  3. Use the following code to access GCS:
import logging
logging.basicConfig(level=logging.INFO)

from gravitino import NameIdentifier, GravitinoClient, Catalog, Fileset, GravitinoAdminClient

gravitino_url = "http://localhost:8090"
metalake_name = "test"

catalog_name = "gcs_catalog"
schema_name = "schema"
fileset_name = "example"

fileset_ident = NameIdentifier.of(schema_name, fileset_name)

gravitino_admin_client = GravitinoAdminClient(uri=gravitino_url)
gravitino_client = GravitinoClient(uri=gravitino_url, metalake_name=metalake_name)
from pyspark.sql import SparkSession
import os


os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars /Users/yuqi/project/gravitino/bundles/gcp-bundle/build/libs/gravitino-gcp-bundle-0.8.0-incubating-SNAPSHOT.jar,/Users/yuqi/project/gravitino/clients/filesystem-hadoop3-runtime/build/libs/gravitino-filesystem-hadoop3-runtime-0.8.0-incubating-SNAPSHOT.jar --master local[1] pyspark-shell"

spark = SparkSession.builder \
    .appName("gcs_fileset_test") \
    .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs") \
    .config("spark.hadoop.fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem") \
    .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090") \
    .config("spark.hadoop.fs.gravitino.client.metalake", "test") \
    .config("spark.hadoop.gcs-service-account-file", "/Users/yuqi/Downloads/silken-physics-431108-g3-30ab3d97bb60.json") \
    .config("spark.hadoop.fs.gvfs.filesystem.providers", "gcs") \
    .config("spark.driver.memory", "2g") \
    .config("spark.driver.port", "2048") \
    .config("spark.driver.extraClassPath", "/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar") \
    .config("spark.executor.extraClassPath", "/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("DEBUG")  

data = [("Alice", 25), ("Bob", 30), ("Cathy", 45)]
columns = ["Name", "Age"]
spark_df = spark.createDataFrame(data, schema=columns)
gvfs_path = f"gvfs://fileset/{catalog_name}/{schema_name}/{fileset_name}/people"

spark_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(gvfs_path)
spark.stop()   

Additional context


Reason: The value of FILE_SYSTEMS_LOADED in Hadoop's FileSystem is already true before provider.getFileSystem(filePath, maps) is called in GravitinoVirtualFileSystem, so the GCS filesystem is never loaded. Compare the contents of SERVICE_FILE_SYSTEMS before and after the call:

Before:

{viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, file=class org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem, har=class org.apache.hadoop.fs.HarFileSystem, http=class org.apache.hadoop.fs.http.HttpFileSystem, hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, nullscan=class org.apache.hadoop.hive.ql.io.NullScanFileSystem, https=class org.apache.hadoop.fs.http.HttpsFileSystem}

After:

{viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, file=class org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem, har=class org.apache.hadoop.fs.HarFileSystem, http=class org.apache.hadoop.fs.http.HttpFileSystem, hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, nullscan=class org.apache.hadoop.hive.ql.io.NullScanFileSystem, https=class org.apache.hadoop.fs.http.HttpsFileSystem}

We need to invoke the ServiceLoader again so that all FileSystem implementations (including gs) are loaded.
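The failure mode can be illustrated in plain Python: a one-shot "loaded" flag guards service discovery, so providers that become visible after the first load are never picked up unless discovery is forced to run again. This is a toy analogue of the caching pattern, not Gravitino or Hadoop code:

```python
class FileSystemRegistry:
    """Toy analogue of a SERVICE_FILE_SYSTEMS cache guarded by a
    one-shot FILE_SYSTEMS_LOADED flag (illustrative names)."""

    def __init__(self, discover):
        self._discover = discover  # callable returning {scheme: impl_name}
        self._loaded = False
        self._schemes = {}

    def get(self, scheme, force_reload=False):
        # Bug pattern: once _loaded is True, providers added to the
        # search path later are invisible unless a reload is forced.
        if not self._loaded or force_reload:
            self._schemes.update(self._discover())
            self._loaded = True
        return self._schemes.get(scheme)
```

Forcing a re-run of discovery (the analogue of calling the ServiceLoader again) makes the late-arriving `gs` provider visible.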

@yuqi1129 yuqi1129 added the bug Something isn't working label Nov 19, 2024
@yuqi1129 yuqi1129 changed the title [Bug report] Can't load filesystem 'gs' will use spark to acess Gravitino GCS bundles [Bug report] Can't load filesystem 'gs' when use spark to access Gravitino GCS bundles Nov 19, 2024
@jerryshao jerryshao added the 0.8.0 Release v0.8.0 label Dec 30, 2024
@yuqi1129
Copy link
Contributor Author

yuqi1129 commented Jan 2, 2025

This has been fixed.

@yuqi1129 yuqi1129 closed this as completed Jan 2, 2025