Developing Spark in Java is actually fairly convenient: Java 8 lambda expressions make the authoring experience not much worse than Scala's. But because Spark itself is implemented in Scala, you still run into quite a few type-matching problems when developing in Java.

This post lists some of the problems I have run into at work, for reference:
WrappedArray and Vector

The error message is:

```
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.ml.linalg.Vector
```
Printing the DataFrame's schema gives output like this:

```
 |-- tag_weights: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- word_sims: array (nullable = true)
 |    |-- element: double (containsNull = true)
```
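For context, any plain array<double> columns produce this shape of schema. A minimal sketch that reproduces it (the column names and literal values here are just illustrative; nullability flags may differ for literal columns):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("schema-demo").getOrCreate();

// Two array<double> columns; printSchema() shows the same structure as above.
Dataset<Row> df = spark.sql(
        "SELECT array(0.5D, 0.3D, 0.2D) AS tag_weights, "
      + "       array(0.9D, 0.1D, 0.4D) AS word_sims");
df.printSchema();
```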
If the Java UDF receives these columns as Vector, you get exactly this error. The Java code was:

```java
spark.udf().register("computeWeightSim",
        new UDF2<Vector, Vector, Double>() {
            @Override
            public Double call(Vector tag_weights, Vector word_sims) throws Exception {
                // never reached: the array<double> columns fail to deserialize
                // into Vector before call() is invoked
                return 0.0;
            }
        }, DataTypes.DoubleType);
```
The fix is to receive the column as a WrappedArray (a Scala type) instead; for example, an array<long> column arrives as WrappedArray<Long>, which can be traversed with its Iterator:

```java
scala.collection.Iterator<Long> it1 = view_qipuids.iterator();
scala.collection.Iterator<Long> it2 = view_cnts.iterator();
Map<Long, Long> viewMap = new HashMap<>();
while (it1.hasNext() && it2.hasNext()) {
    viewMap.put(it1.next(), it2.next());
}
```
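For completeness, here is a sketch of the enclosing UDF that the snippet above assumes (the variable name buildViewMap is made up; the parameter names come from the snippet). Since UDF2 is a functional interface, the Java 8 lambda style mentioned at the top works too; a Map-returning UDF would be registered with DataTypes.createMapType(DataTypes.LongType, DataTypes.LongType):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.api.java.UDF2;
import scala.collection.mutable.WrappedArray;

// Both array<long> columns arrive as WrappedArray<Long>; pair them up
// element by element into a Map.
UDF2<WrappedArray<Long>, WrappedArray<Long>, Map<Long, Long>> buildViewMap =
        (view_qipuids, view_cnts) -> {
            scala.collection.Iterator<Long> it1 = view_qipuids.iterator();
            scala.collection.Iterator<Long> it2 = view_cnts.iterator();
            Map<Long, Long> viewMap = new HashMap<>();
            while (it1.hasNext() && it2.hasNext()) {
                viewMap.put(it1.next(), it2.next());
            }
            return viewMap;
        };
```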
Alternatively, you can zip the two iterators and compute over the resulting pairs:

```java
import org.apache.spark.sql.api.java.UDF2;
import scala.Tuple2;
import scala.collection.mutable.WrappedArray;

new UDF2<WrappedArray<Double>, WrappedArray<Double>, Double>() {
    /**
     * Compute the weighted similarity.
     * @param tag_weights the weights
     * @param word_sims   the similarity values to be weighted
     * @return the weighted average similarity
     * @throws Exception
     */
    @Override
    public Double call(WrappedArray<Double> tag_weights,
                       WrappedArray<Double> word_sims) throws Exception {
        scala.collection.Iterator<Double> tag_weightsIter = tag_weights.iterator();
        scala.collection.Iterator<Double> word_simsIter = word_sims.iterator();
        scala.collection.Iterator<Tuple2<Double, Double>> zipIterator =
                tag_weightsIter.zip(word_simsIter);
        double totalWeight = 0.0;
        double fenziWeight = 0.0; // numerator: sum of weight * similarity
        while (zipIterator.hasNext()) {
            Tuple2<Double, Double> iterTuple = zipIterator.next();
            totalWeight += iterTuple._1;
            fenziWeight += iterTuple._1 * iterTuple._2;
        }
        if (totalWeight == 0.0) {
            return 0.0;
        } else {
            return fenziWeight / totalWeight;
        }
    }
}
```
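To wire this up, register the UDF with an explicit return type and invoke it on the two array columns. A minimal sketch, assuming the anonymous UDF2 above is assigned to a variable computeWeightSim and df has the tag_weights/word_sims columns from the schema above:

```java
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;

// Register under a SQL-visible name with an explicit return type...
spark.udf().register("computeWeightSim", computeWeightSim, DataTypes.DoubleType);

// ...then call it column-wise; weight_sim holds the weighted average.
Dataset<Row> withSim = df.withColumn(
        "weight_sim",
        callUDF("computeWeightSim", col("tag_weights"), col("word_sims")));
```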
For details, see the Scala documentation on iterators: https://docs.scala-lang.org/overviews/collections/iterators.html
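One more option: since WrappedArray is a Scala Seq, it can also be converted into a java.util.List via JavaConverters and traversed with a plain for-each loop, avoiding the Scala Iterator API entirely. A minimal sketch, assuming Scala 2.11/2.12 on the classpath (the sumWeights helper is hypothetical):

```java
import java.util.List;

import scala.collection.JavaConverters;
import scala.collection.mutable.WrappedArray;

// Hypothetical helper: view the WrappedArray as a read-only java.util.List
// and sum it with an ordinary for-each loop.
static double sumWeights(WrappedArray<Double> values) {
    List<Double> asList = JavaConverters.seqAsJavaListConverter(values).asJava();
    double total = 0.0;
    for (double v : asList) {
        total += v;
    }
    return total;
}
```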