Hello, I use Spark 3.4.1 (Hadoop 3) on Windows 11, and I am struggling to generate the schema of CSV data with the schema_of_csv function. Below is my Java code.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.not;
import static org.apache.spark.sql.functions.schema_of_csv;

// Kafka source options
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("kafka.bootstrap.servers", "localhost:9092");
kafkaParams.put("subscribe", "topic_unempl_ann");
kafkaParams.put("startingOffsets", "earliest");
SparkSession spark = SparkSession.builder().master("local[*]").appName("kafka_test").getOrCreate();
// Read the Kafka values as strings and drop the header line
Dataset<Row> df = spark.read().format("kafka")
            .options(kafkaParams)
            .load()
            .selectExpr("CAST(value AS STRING) as column").filter(not(col("column").startsWith("date")));
// CSV parsing options
Map<String, String> options = new HashMap<String, String>();
options.put("delimiter", ",");
options.put("header", "true");
options.put("inferSchema", "true");
// Derive the schema from the header string with schema_of_csv
String schemaString = "date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short";
Column schema = schema_of_csv(schemaString);
Dataset<Row> parsed = df.select(from_csv(col("column"), schema, options).as("entityStoredPojo"))
                 .selectExpr("entityStoredPojo.date", "entityStoredPojo.value", "entityStoredPojo.id", 
                        "entityStoredPojo.title", "entityStoredPojo.state", "entityStoredPojo.frequency_short", 
                        "entityStoredPojo.units_short", "entityStoredPojo.seasonal_adjustment_short").toDF();

But this code throws the error below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: [FIELD_NOT_FOUND] No such struct field `date` in `_c0`, `_c1`, `_c2`, `_c3`, `_c4`, `_c5`, `_c6`, `_c7`.; line 1 pos 0
        at org.apache.spark.sql.errors.QueryCompilationErrors$.noSuchStructFieldInGivenFieldsError(QueryCompilationErrors.scala:2115)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:83)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:56)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.$anonfun$resolve$3(package.scala:363)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:362)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:115)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpressionByPlanChildren$2(ColumnResolutionHelper.scala:378)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$3(ColumnResolutionHelper.scala:158)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$1(ColumnResolutionHelper.scala:165)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.innerResolve$1(ColumnResolutionHelper.scala:136)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpression(ColumnResolutionHelper.scala:195)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:385)      
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:358)     
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.resolveExpressionByPlanChildren(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.$anonfun$applyOrElse$110(Analyzer.scala:1638)    
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1638)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
        at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4196)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1578)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1612)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1611)
        at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:53)

I think the schema is not generated with my column names; instead it falls back to the default column names _c0, _c1, _c2, and so on.

Kindly inform me how to generate the schema correctly with the schema_of_csv function, the overload whose parameter type is String.
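For what it is worth, here is a minimal sketch of what I believe is happening (my assumption, not confirmed): schema_of_csv seems to treat its argument as one sample data row rather than as a header line, so the inferred struct only gets the positional field names _c0 ... _c7.

// Minimal sketch (assumption): schema_of_csv infers the schema from a sample row,
// so the names in my header string are treated as data values, not as field names.
spark.range(1)
     .select(schema_of_csv("date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short"))
     .show(false);
// Prints something like:
// STRUCT<_c0: STRING, _c1: STRING, _c2: STRING, _c3: STRING, _c4: STRING, _c5: STRING, _c6: STRING, _c7: STRING>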


@Kaniz wrote:

Hi @aupres, to generate a schema with org.apache.spark.sql.functions.schema_of_csv, you can use the following syntax:

import org.apache.spark.sql.functions.schema_of_csv

val csvData = "1,abc"
val schema = schema_of_csv(csvData)

This will generate a schema for the CSV data in DDL format. You can then use this schema to define the schema of a DataFrame or a table.

Sources: https://docs.databricks.com/sql/language-manual/functions/schema_of_csv.html


Thanks for your reply. I am afraid your code is a Scala source. Kindly show me a Java-written example; the schema_of_csv function still throws the exception with the above schema string.
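In case it clarifies what I am after, the Java shape I am aiming for would be something like the sketch below, with the schema spelled out explicitly instead of inferred (treating every column as STRING is just my assumption):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical workaround sketch: build the schema explicitly so the parsed struct
// keeps the real column names instead of _c0 ... _c7 (all types assumed STRING here).
StructType explicitSchema = new StructType()
        .add("date", DataTypes.StringType)
        .add("value", DataTypes.StringType)
        .add("id", DataTypes.StringType)
        .add("title", DataTypes.StringType)
        .add("state", DataTypes.StringType)
        .add("frequency_short", DataTypes.StringType)
        .add("units_short", DataTypes.StringType)
        .add("seasonal_adjustment_short", DataTypes.StringType);

Dataset<Row> parsed = df.select(from_csv(col("column"), explicitSchema, options).as("entityStoredPojo"));

But my question still stands: can schema_of_csv itself be made to produce those field names from a String argument?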

