Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] spark 3.5.3 is not supported. API incompatibility #12099

Open
melin opened this issue Oct 15, 2024 · 11 comments
Open

[BUG] spark 3.5.3 is not supported. API incompatibility #12099

melin opened this issue Oct 15, 2024 · 11 comments
Labels
priority:critical production down; pipelines stalled; Need help asap. release-1.0.0-beta2 spark Issues related to spark version-compatibility

Comments

@melin
Copy link

melin commented Oct 15, 2024

spark 3.5.1 is supported, but not since 3.5.2

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.catalog.SessionCatalog.invalidateCachedTable(Lorg/apache/spark/sql/catalyst/QualifiedTableName;)V
	at org.apache.spark.sql.hudi.command.DropHoodieTableCommand.run(DropHoodieTableCommand.scala:52)
	at org.apache.spark.sql.hudi.catalog.HoodieCatalog.dropTable(HoodieCatalog.scala:159)
	at org.apache.spark.sql.execution.datasources.v2.DropTableExec.run(DropTableExec.scala:38)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)

@rangareddy
Copy link

Hi @melin

Could you please let me know which Hudi version you have used for testing?

@melin
Copy link
Author

melin commented Oct 15, 2024

Hi @melin

Could you please let me know which Hudi version you have used for testing?

hudi 0.15.0, This pr changes the api: https://issues.apache.org/jira/browse/SPARK-49211

@rangareddy
Copy link

As per the Hudi Git repository, the supported Spark version is 3.5.1, however I am also able to run the Hudi examples using Spark 3.5.2, which is not officially supported.

<spark3.version>3.5.1</spark3.version>

spark_3_5_2_hudi_0_15.log

Reference:

@ad1happy2go
Copy link
Collaborator

@melin Thanks for raising this. This may need a fix. Can you please tell us more about what operation you are doing? I see its failing in DropHoodieTableCommand.

Did you tried to run same code with spark version 3.5.1 ?

@ad1happy2go ad1happy2go added priority:critical production down; pipelines stalled; Need help asap. version-compatibility spark Issues related to spark labels Oct 15, 2024
@melin
Copy link
Author

melin commented Oct 15, 2024

@melin Thanks for raising this. This may need a fix. Can you please tell us more about what operation you are doing? I see its failing in DropHoodieTableCommand.

Did you tried to run same code with spark version 3.5.1 ?

The drop table sql fails to be executed. The same code is correct when spark 3.5.1 is executed.

@rangareddy
Copy link

Hi @melin

I successfully ran the sample code using Spark 3.5.3, and it worked without any issues when writing and reading the Hudi table data.

It is possible to share the sample reproducible code, so that i can test it.

spark_3_5_3_hudi_0_15.log

@melin
Copy link
Author

melin commented Oct 16, 2024

Try sql: drop table if exists bigdata.hudi_users_kafka
image

@rangareddy
Copy link

Hi @melin

I have re-tested with Spark 3.5.3 and Hudi 0.15, and I did not encounter any issues. I am sharing a sample pom.xml file and Scala code for your reference. Please test and let me know the results.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>Spark_Hudi_100</artifactId>
    <version>1.0.0</version>
    <groupId>com.ranga</groupId>

    <properties>
        <java.version>8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>3.5.3</spark.version>
        <spark.major.version>3.5</spark.major.version>
        <scala.version>2.12.18</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hudi.version>0.15.0</hudi.version>
        <maven-shade-plugin.version>3.5.0</maven-shade-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark${spark.major.version}-bundle_${scala.binary.version}</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-hadoop-mr-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>module-info.class</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/*.MF</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Test12099.scala

package com.ranga

import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object Test12099 extends App {
  val name = this.getClass.getSimpleName.replace("$", "")
  val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

  val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  val tableName = name

  spark.sql(
    f"""
      |CREATE TABLE IF NOT EXISTS ${tableName} (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '/tmp/warehouse/t_test'
    """.stripMargin)

  val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
  ))

  val input_data = Seq(
    Row(1L, "hello", 42,  1695159649087L),
    Row(2L, "world", 13, 1695091554788L),
    Row(3L, "spark", 7, 1695115999911L),
    Row(1L, "hello", 43,  1695159649087L),
  )

  val basePath = f"file:///tmp/$tableName"
  val hoodieConf = scala.collection.mutable.Map[String, String]()
  hoodieConf.put("hoodie.datasource.write.recordkey.field", "id,age")
  hoodieConf.put("hoodie.table.precombine.field", "ts")
  hoodieConf.put("hoodie.table.name", tableName)

  val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)
  input_df.write.format("hudi").
    options(hoodieConf).
    mode("append").
    save(basePath)

  spark.read.format("hudi").load(basePath).show(false)
  println("Displaying the tables")
  spark.sql("SHOW tables").show(truncate = false)
  println("Drop the table")
  spark.sql(f"DROP TABLE ${tableName}")
  println("Displaying the tables")
  spark.sql("SHOW tables").show(truncate = false)
  spark.stop()
}

Output:

Displaying the tables
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 60.211 ms
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 31.001125 ms
24/10/16 15:52:28 INFO CodeGenerator: Code generated in 18.569542 ms
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|default  |test12099|false      |
+---------+---------+-----------+

Drop the table
Displaying the tables
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

@melin
Copy link
Author

melin commented Oct 16, 2024

The DropHoodieTableCommand class calls invalidateCachedTable with the QualifiedTableName object passed as the parameter. The FullQualifiedTableName class was added in spark 3.5.3. The invalidateCachedTable method supports only FullQualifiedTableName and TableIdentifier.

The spark version in the hudi project is 3.5.3. idea displays an error calling invalidateCachedTable
image

@rangareddy
Copy link

Hi @melin

Thanks for sharing detailed analysis. Could you please try my example code and see you can reproduce the issue. If we can reproduce the issue we can raise HUDI jira.

@melin
Copy link
Author

melin commented Oct 17, 2024

Your code will work. idea breakpoint debugging, DropHoodieTableCommand run method is not entered, I guess the hive catalog is not used, the execution is different.
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
priority:critical production down; pipelines stalled; Need help asap. release-1.0.0-beta2 spark Issues related to spark version-compatibility
Projects
Status: 🚧 Needs Repro
Development

No branches or pull requests

3 participants