万字长文详解Java lambda表达式

发表于 1年以前  | 总阅读数:418 次

本文的脉络

Lambda介绍

何为lambda

咱们首先来说说 Lambda 这个名字,Lambda 并不是一个什么的缩写,它是希腊第十一个字母 λ 的读音,同时它也是微积分函数中的一个概念,所表达的意思是一个函数入参和出参定义,在编程语言中其实是借用了数学中的 λ,并且多了一点含义,在编程语言中功能代表它具体功能的叫法是匿名函数(Anonymous Function),根据百科的解释:> 匿名函数(英语:Anonymous Function)在计算机编程中是指一类无需定义标识符(函数名)的函数或子程序。

接着再来说说Lambda 的历史,虽然它在 JDK8 发布之后才正式出现,但是在编程语言界,它是一个具有悠久历史的东西,最早在 1958 年在Lisp 语言中首先采用,而且虽然Java脱胎于C++,但是C++在2011年已经发布了Lambda 了,但是 JDK8 的 LTS 在2014年才发布,所以 Java 被人叫做老土不是没有原因的,现代编程语言则是全部一出生就自带 Lambda 支持,所以Lambda 其实是越来越火的一个节奏~

Lambda 在编程语言中往往是一个匿名函数,也就是说Lambda 是一个抽象概念,而编程语言提供了配套支持,比如在 Java 中其实为Lambda 进行配套的就是函数式接口,通过函数式接口生成匿名类和方法进行Lambda 式的处理。

那么,既然是这一套规则我们明白了,那么Lambda 所提供的好处在Java中就是函数式接口所提供的能力了,函数式接口往往则是提供了一些通用能力,这些函数式接口在JDK中也有一套完整的实践,那就是 Stream。

不同语言中的Lambda

Python

lambda [arg1[,arg2,arg3....argN]]:expression

例子:


add2 = lambda x,y:x+y
print add2(1,2)     #3

sum2 = lambda x,y=10:x+y
print sum2(1)       #11
print sum2(1,100)   #101

C++

C++11中增加了对lambda表达式的支持


[ capture clause ] (parameters) -> return-type  
{   
   definition of method   
}

具体语法:

[1]:Lambda表达式的引入标志,在‘[]’里面可以填入‘=’或‘&’表示该lambda表达式“捕获”(lambda表达式在一定的scope可以访问的数据)的数据时以什么方式捕获的,‘&’表示一引用的方式;‘=’表明以值传递的方式捕获,除非专门指出。

[2]:Lambda表达式的参数列表

[3]:Mutable 标识

[4]:异常标识

[5]:返回值

[6]:“函数”体,也就是lambda表达式需要进行的实际操作。

例子:


void func(std::vector<int>& v) {
  std::for_each(v.begin(), v.end(), [](int i) {
      cout << i << endl;
  });
}

Javascript

(p1 [,p2,p3,....pn]) => { code block  }

例子:


let func  = x => x * x;
func(2) #4

Java Lambda 表达式

Lambda 表达式在 Java 8 中添加的。lambda 表达式是一小段代码,它接受参数并返回一个值。Lambda 表达式类似于方法,但它们不需要名称,并且可以直接在方法体中实现。

句法

最简单的 lambda 表达式包含一个参数和一个表达式:

零参数:

() -> System.out.println("零参数 lambda");

一个参数:

p -> System.out.println("一个参数:" + p);

多个参数:

(p1 [,p2,p3,....pn]) -> System.out.println("多个参数:" + p1 + ", " + p2 + ... + pn);

上面的表达式有一定的限制。它们要么返回一个值要么执行一段方法,并且它们不能包含变量、赋值或语句,例如if or for 。为了进行更复杂的操作,可以使用带有花括号的代码块。如果 lambda 表达式需要返回一个值,那么代码块应该有一个return语句。

(parameter1, parameter2) -> { code block [return] }

方法引用

  • 类 :: 静态方法
Consumer<String> c = [ (s) -> System.out.println(s);  <=>  System.out::println; ]
  • 对象 :: 实例方法

List<String> list = Lists.newArrayList();
Consumer<String> c = [ (e) => list.add(e);  <=>  list::add; ]
  • 构造器 :: new
Supplier<List<String>> s = [ () -> new ArrayList<>(); <=> ArrayList::new; ]

原生函数式接口

@FunctionalInterface注解

有且只有一个抽象方法的接口被称为函数式接口,函数式接口适用于函数式编程的场景,Lambda就是Java中函数式编程的体现,可以使用Lambda表达式创建一个函数式接口的对象,一定要确保接口中有且只有一个抽象方法,这样Lambda才能顺利的进行推导。

与@Override 注解的作用类似,Java 8中专门为函数式接口引入了一个新的注解:@FunctionalInterface 。该注解可用于一个接口的定义上,一旦使用该注解来定义接口,编译器将会强制检查该接口是否确实有且仅有一个抽象方法(equal和hashcode方法不算),否则将会报错。但是这个注解不是必须的,只要符合函数式接口的定义,那么这个接口就是函数式接口。

Consumer: 消费性接口

Consumer通过名字可以看出它是一个消费函数式接口,主要针对的是消费(1..n 入参, 无返回)这个场景,它的代码定义如下:


@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

通过泛型 T 定义了一个入参,但是没有返回值,它代表你可以针对这个入参做一些自定义逻辑,比较典型的例子是 forEach 方法。

例子:


List<String> list = Lists.newArrayList("1", "2", "3", "4", "5", "6");
list.foreach(System.out::println); //打印数组

Supplier: 供给型接口

Supplier通过名字比较难看出来它是一个场景的函数式接口,它主要针对的是说获取(无入参,有返回)这个场景,它的代码定义如下:


@FunctionalInterface
public interface Supplier<T> {
    T get();
}

通过泛型 T 定义了一个返回值类型,但是没有入参,它代表你可以针对调用方获取某个值,比较典型的例子是 Stream 中的 collect 方法,通过自定义传入我们想要取得的某种对象进行对象收集。

例子:


List<String> list = Lists.newArrayList("1", "2", "3", "4", "5", "6");
List<String> newList = list.stream().filter(x -> x >= 2).collect(Collectors.toList()); 
// 将大于等于2的数重新收集成一个集合,其中Collectors.toList()的函数原型为 
// new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,(left, right) -> { left.addAll(right); return left; },CH_ID)
// 原型中的ArrayList::new即为Supplier类型

Function: 函数型接口

Function 接口的名字不太能轻易看出来它的场景,它主要针对的则是 转换(有入参,有返回,其中T是入参,R是返回)这个场景,其实说转换可能也不太正确,它是一个覆盖范围比较广的场景,你也可以理解为扩展版的Consumer,接口定义如下:


@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

通过一个入参 T 进行自定义逻辑处理,最终得到一个出参 R,比较典型的例子是 Stream 中的 map 系列方法和 reduce 系列方法。

例子:


List<String> list = Lists.newArrayList("1", "2", "3", "4", "5", "6");
List<Integet> newList = list.stream().map(Integer::parseInt).collect(Collectors.toList());
// map将list中所有的元素的类型由 String 通过 Integer.parseInt的方式转换为Intger。简单来说就是A => B;

Predicate: 断言型接口

Predicate主要针对的是判断(有入参,有返回,凡是返回的类型固定为Boolean。可以说Function 是包含Predicate的 )这个场景,它的代码定义如下:


@FunctionalInterface
public interface Predicate<T> {
    boolean test(T t);
}

通过泛型 T 定义了一个入参,返回了一个布尔值,它代表你可以传入一段判断逻辑的函数,比较典型的例子是 Stream 中的 filter方法。


List<String> list = Lists.newArrayList("1", "2", "3", "4", "5", "6");
List<String> newList = list.stream().filter(x -> x >= 2).collect(Collectors.toList()); 
// 将大于等于2的数重新收集成一个集合,filter中的 x -> x >= 2就是Predicate接口

Stream表达式

Stream,就是JDK8又依托于函数式编程特性为集合类库做的一个类库,它其实就是jdk提供的函数式接口的最佳实践。它能让我们通过lambda表达式更简明扼要的以流水线的方式去处理集合内的数据,可以很轻松的完成诸如:过滤、分组、收集、归约这类操作。其中Stream的操作大致分为两类

  • 中间型操作
  • 终结型操作

其中转换型操作又分为有状态和无状态两类。有状态是本次的结果需要依赖于前面的处理结果,而无状态则是不依赖。简单来讲就是无状态方法可以互相调换位置,而有状态方法不能调换位置。中间型操作

中间型操作就是返回值依旧是stream类型的方法。api如下:

终结型操作

终结型操作与中间型相反,返回值是非Stream类型的。api如下:

探究lambda运行的底层原理

源码分析

接下来通过一个例子Debug来探究下lambda运行的底层原理,实验代码如下:


Set<Integer> collect = list.stream()
                .filter(e -> e > 2)
                .sorted()
                .map(e -> e * 2)
                .collect(Collectors.toSet());
上诉例子可拆解成下面5部分:
  Stream<Integer> stream = list.stream();
  Stream<Integer> filterStream = stream.filter(e -> e > 2);
  Stream<Integer> sortedStream = filterStream.sorted();
  Stream<Integer> mapStream = sortedStream.map(e -> e * 2);
  Set<Integer> integers = mapStream.collect(Collectors.toSet());
  • list.stream()

@Override
    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}>

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

list.stream()最终调用了ReferencePipeline.Head<>,返回一个Head对象。Head是ReferencePipeline的内部类。官方注释说此类是ReferencePipeline的源阶段。也是stream调用的起始阶段。

运行完这一方法返回ReferencePipeline.Head对象,对象的所有元素保存在sourceSpliterator中

  • stream.filter(e -> e > 2)

filter的方法原型如下:


public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

StatelessOp是stream 无状态的基类,与之相对的是StatefulOp,stream有状态的基类。元素原型如下E_IN是上游元素的类型,E_OUT是当前阶段返回的类型。

abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

abstract static class StatefulOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                   StreamShape inputShape,
                   int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return true;
        }
}

需要注意的是 filter等方法的构造方法:

new StatelessOp<P_OUT,

P_OUT>(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED)

会将this传入。StatulessOp的构造方法,会一直super到AbstractPipeline方法。注意到AbstractPipeline类的构造方法中打注释的地方。


AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);

        previousStage.linkedOrConsumed = true; // 
        previousStage.nextStage = this; // 注意打注释的语句
        this.previousStage = previousStage; //

        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }

简化就是双向链表加入节点的操作。


p.next = this;
this.pre = p;

运行完返回如下:注意看上一步对象的地址保存在当前对象的perviousStage中,而且当前对象增加predicate对象

如图所示:

  • filterStream.sorted()

@Override
    public final Stream<P_OUT> sorted() {
        return SortedOps.makeRef(this);
    }

OfRef(AbstractPipeline<?, T, ?> upstream) {
    super(upstream, StreamShape.REFERENCE,
        StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    this.isNaturalSort = true;
    // Will throw CCE when we try to sort if T is not Comparable
    @SuppressWarnings("unchecked")
        Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
    this.comparator = comp;
}

和filter操作一样,将Sorted节点加入链表中同时设置标志位:

StreamOpFlag.IS_ORDERED|StreamOpFlag.IS_SORTED

运行完这一步结果如图

  • sortedStream.map(e -> e * 2)

map()方法跟filter()方法的执行逻辑很像,分析方法跟分析filter()方法一样


public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

不过与filter不同时的参数中增加了mapper参数,类型为function

执行完如图所示:

  • mapStream.collect(Collectors.toSet());

collect方法原型和Collectors.toSet()方法原型如下:


public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
        && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
        && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
        ? (R) container
        : collector.finisher().apply(container);
}
new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                    (left, right) -> {
                        if (left.size() < right.size()) {
                            right.addAll(left); return right;
                        } else {
                            left.addAll(right); return left;
                        }
                    },
                    CH_UNORDERED_ID)

在collect方法中会判断是否为并行流,不是的话会执行evaluate(ReduceOps.makeRef(collector)); ReduceOps.makeRef(collector)会返回类型为TerminalOp的参数,在evaluate方法中会将链表的节点都包装为Sink。

 public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
    Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
    BiConsumer<I, ? super T> accumulator = collector.accumulator();
    BinaryOperator<I> combiner = collector.combiner();
    class ReducingSink extends Box<I>
        implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                ? StreamOpFlag.NOT_ORDERED
                : 0;
        }
    };
}

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

执行完如图:

关键在于上面的copyInfo方法,此方法是stream的启动方法。遍历元素调用第一节点的逻辑(filter)。然后在end方法中调用第二个节点的begin方法,begin方法又会调用第二个节点的逻辑,之后和第一个节点一样,调用end方法,触发第三个节点的begin方法..... 最后调用到最后一个节点将处理好的元素收集起来。

下面是最后一步map的节点的栈帧和运行数据和对应的方法。这一步结束后会将此次运行的stream元素都add到hashSet中。

downstream.accept - > Set::add;

mapper.apply -> e -> e * 2;

最后调用:

(Supplier)(helper.wrapAndCopyInto(makeSink(), spliterator)).get()方法将保存元素的容器获取出来。

并发流源码分析

修改代码增加并发流:


public static void main(String[] args) {
    List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    //        Set<Integer> collect = list.stream()
    //                .filter(e -> e > 2)
    //                .filter(e -> e < 8)
    //                .sorted()
    //                .map(e -> e * 2)
    //                .peek(System.out::println)
    //                .collect(Collectors.toSet());
    Stream<Integer> stream = list.stream().parallel(); // list.parallelStream()
    Stream<Integer> filterStream = stream.filter(e -> e > 2);
    Stream<Integer> sortedStream = filterStream.sorted();
    Stream<Integer> mapStream = sortedStream.map(e -> e * 2);
    Set<Integer> integers = mapStream.collect(Collectors.toSet());
    System.out.println(integers);
}

根据非并发流的分析直接来到最后一步collect。分歧在evaluate方法中,之前调用terminalOp.evaluateSequential,并发流则会调用terminalOp.evaluateParallel。


public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

在evaluateParallel返回会执行ReduceTask累的构造方法,查看ReduceTask类发现继承AbstractTask类


private static final class ReduceTask<P_IN, P_OUT, R,
    S extends AccumulatingSink<P_OUT, R, S>>
    extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
        private final ReduceOp<P_OUT, R, S> op;

        ReduceTask(ReduceOp<P_OUT, R, S> op,
                   PipelineHelper<P_OUT> helper,
                   Spliterator<P_IN> spliterator) {
            super(helper, spliterator);
            this.op = op;
        }

        ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
                   Spliterator<P_IN> spliterator) {
            super(parent, spliterator);
            this.op = parent.op;
        }

        @Override
        protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, spliterator);
        }

        @Override
        protected S doLeaf() {
            return helper.wrapAndCopyInto(op.makeSink(), spliterator);
        }

        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (!isLeaf()) {
                S leftResult = leftChild.getLocalResult();
                leftResult.combine(rightChild.getLocalResult());
                setLocalResult(leftResult);
            }
            // GC spliterator, left and right child
            super.onCompletion(caller);
        }
    }

继续往上查看


//AbstractTask extends CountedCompleter
abstract class AbstractTask<P_IN, P_OUT, R,
    K extends AbstractTask<P_IN, P_OUT, R, K>>
    extends CountedCompleter<R> {
    }
//AbstractTask extends ForkJoinTask
public abstract class CountedCompleter<T> extends ForkJoinTask<T> {}

通过idea工具可以更直观的查看继承关系,ReduceTask最终继承ForkJoinTask。ForkJoinTask与ForkJoinPool线程有关系。

程序继续运行会调用new ReduceTask<>(this, helper, spliterator).invoke(),invoke方法ForkJoinTask的启动方法。


public final V invoke() {
    int s;
    if (((s = doInvoke()) & ABNORMAL) != 0)
        reportException(s);
    return getRawResult();
}

private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}


final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            completed = false;
            s = setExceptionalCompletion(rex);
        }
        if (completed)
            s = setDone();
        }
    return s;
}

protected final boolean exec() {
    compute();
    return false;
}

最后跟着调用链回来到AbstractTask类中的compute方法

public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}

在调用taskToFork.fork()前查看下当前变量表:

taskToFork的具体内容如下:op属性是Collectors.toSet(),而之前对元素的处理方法都在helper字段中

执行fork会把所有当前的task(this)放在ForkJoinPool这个线程池中。


public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

执行完查看Debug线程堆栈信息,看到除了main线程在运行外,同时还多了ForkJoinPool这个线程组

同时这个又会运行到之前调用过的doExec()方法中,以此形成将大任务分解成小任务的循环。

最后主线程再执行task.doLeaf()运行到定义好的方法lambda方法中,并将此结果收集到一起

最后简单介绍下ForkJoinPool的运行原理:

ForkJoinPool核心思想:分治

总结一下并行流的和串行流的区别:串行流是一个一个处理的,而并行流是把元素先分成n部分,然后给将这n部分放到一个线程池中执行,等各个线程把这n部分执行完毕后在将结果汇总起来在输出最终结果。并行流可以极大的加速stream的处理速度,不过需要注意的是,程序中的是各个并行流公用一个线程池。

JVM分析

先写一段简单的包含lambda的代码,编译后查看编译文件和字节码。


public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5)
                .filter(x -> x > 2)
                .forEach(System.out::println);
    }

main方法字节码如下:

0 iconst_5
 1 anewarray #2 <java/lang/Integer>
 4 dup
 5 iconst_0
 6 iconst_1
 7 invokestatic #3 <java/lang/Integer.valueOf : (I)Ljava/lang/Integer;>
10 aastore
11 dup
12 iconst_1
13 iconst_2
14 invokestatic #3 <java/lang/Integer.valueOf : (I)Ljava/lang/Integer;>
17 aastore
18 dup
19 iconst_2
20 iconst_3
21 invokestatic #3 <java/lang/Integer.valueOf : (I)Ljava/lang/Integer;>
24 aastore
25 dup
26 iconst_3
27 iconst_4
28 invokestatic #3 <java/lang/Integer.valueOf : (I)Ljava/lang/Integer;>
31 aastore
32 dup
33 iconst_4
34 iconst_5
35 invokestatic #3 <java/lang/Integer.valueOf : (I)Ljava/lang/Integer;>
38 aastore
39 invokestatic #4 <java/util/stream/Stream.of : ([Ljava/lang/Object;)Ljava/util/stream/Stream;>
42 invokedynamic #5 <test, BootstrapMethods #0>
47 invokeinterface #6 <java/util/stream/Stream.filter : (Ljava/util/function/Predicate;)Ljava/util/stream/Stream;> count 2
52 getstatic #7 <java/lang/System.out : Ljava/io/PrintStream;>
55 dup
56 invokestatic #8 <java/util/Objects.requireNonNull : (Ljava/lang/Object;)Ljava/lang/Object;>
59 pop
60 invokedynamic #9 <accept, BootstrapMethods #1>
65 invokeinterface #10 <java/util/stream/Stream.forEach : (Ljava/util/function/Consumer;)V> count 2
70 return

同时在生成的字节码中有一个名为lambda$main$0的方法,字节码如下:


0 aload_0
 1 invokevirtual #11 <java/lang/Integer.intValue : ()I>
 4 iconst_2
 5 if_icmple 12 (+7)
 8 iconst_1
 9 goto 13 (+4)
12 iconst_0
13 ireturn

先看main方法的字节码的第29行与第35行分别是invokedynamic #5 <test, BootstrapMethods #0> 和 invokedynamic #9 <accept, BootstrapMethods #1>

这两个字节码分别对应class文件的BootstrapMethods中。查看编译出来class文件的BootstrapMethods,有几个关键的地方,第一个是innerClasser。第二个是在BootstrapMethods出现:

java/lang/invoke/LambdaMetafactory.metafactory

以及对应的argument分别是#39, #40 和#41。通过描述符可知#39的入参为Object,返回为boolean,#40的入参为MethodHandle 具体的类型为com.yousheng.lambda.test.LambdaTest.lambda$main$0(Integer)boolean,#41的入参为Integer,返回为boolean

debug断点调试,会运行到BootstrapMethodInvoker的127,会执行MethodHandler的invokeExact方法,此方法是native方法。

最后通过jvm的解析转发调用会来到LambdaMetafactory的metafactory方法中

这方法的最后3个入参类型就是从class文件中看到那三个入参的类型。

同时jvm也通过调用者和方法名称以及方法描述符找到了最后需要调用的方法。

查看ImplMethod参数

member对象为就是#40具体含义为:com.yousheng.lambda.test.LambdaTest.lambda$main$0(Integer)boolean/invokeStatic。

  • com.yousheng.lambda.test.LambdaTest -- > 类名
  • lambda$main$0 -- >类中的方法名称
  • (Integer)boolean -- > 方法的描述符, (括号内的为入参类型,返回值为boolean)
  • invokeStatic --> 调用字节码。在jvm中有5中invoke字节码指令,分别为

继续查看栈帧发现此方法是由Jvm调用而来,metafactory的上一个方法是invokeStatic当时行号是-1所以说明是jvm内部方法

可以理一下整个流程。

首先jvm启动,运行方法, 发现字节码是中存在invokeDynamic,通过invokedynamic字节码对应的BootstrapMethods调用MethodHandles.lookup方法寻找调用类中与当前lambda对应的静态内部类方法,最后生成CallSite 调用点,最后调用真正的lambda方法。

在jvm启动的时候增加参数-Djdk.internal.lambda.dumpProxyClasses=可将生成出来的静态内部dump到指定的目录下

通过javap -v -p [生成的文件] 可查看相应字节码,下图就是生成的两个静态内部类

filter(x -> x > 2) 中的 x-> x > 2的字节码:

forEach(System.out::println) 中的 System.out::println 字节码:

最后因为System.out::println属于类 :: 静态方法的形式,所以在生成的字节码中存在“适配器”,即先将System.out通过静态方法赋值给对应的静态内部类,在通过调用lambda方法使用。

而正常的方法会直接调用Lambda中的lambda$main$0方法。

Lambda的序列化原理

lambda的本质在上面的探究中我们也能看到是静态内部类。序列化lambda跟序列化其他对象一样必须要实现Serializable接口, 为什么必须实现Serializable才能进行序列化呢,可以从源码中找到答案,ObjectOutputStream中的1178行。程序判断当前对象obj是否为Serializable子类,如果是的话进行序列化,否则抛出异常。注:如果需要实例化对象,那么这个对象里面的所有属性必须都是可实例化(即所有的属性包括自身都必须实现Serializable接口)。

知道序列化原理后,可以使用下列代码进行测试,并将运行时的class文件dump到本地。

 Function<Child, String> function = (Function<Child, String> & Serializable) Child::getName;

利用查看dump下的class文件,发现类实现Serializable接口,在类中又增加了writeReplace方法。且方法返回值为SerializedLambda。


final class LambdaTest$$Lambda$15 implements Function, Serializable {
    private LambdaTest$$Lambda$15() {
    }

    @Hidden
    public Object apply(Object var1) {
        return ((Child)var1).getName();
    }

    private final Object writeReplace() {
        return new SerializedLambda(LambdaTest.class, "java/util/function/Function", "apply", "(Ljava/lang/Object;)Ljava/lang/Object;", 5, "com/yousheng/lambda/entity/Child", "getName", "()Ljava/lang/String;", "(Lcom/yousheng/lambda/entity/Child;)Ljava/lang/String;", new Object[0]);
    }
}

先看一下writeReplace方法是什么,熟悉序列化的人知道如果类在进行序列化的时候会先查询类中是否有writeReplace方法。这一点同样可以在ObjectOutputStream类中1126行找到对应的处理逻辑。调用writeReplace方法返回的对象会替换原有的obj对象。

就是说实现了Serializable接口的lambda对象最后会被实例化成SerializedLambda类型,从SerializedLambda类上面的注释中可以看出来,类中的readResolve方法会去"capturing class"中寻找$deserializeLambda$(SerializedLambda)方法并会将"this"对象当做第一个参数传入。实现$deserializeLambda$的 Lambda 类负责验证SerializedLambda的属性是否与该类实际捕获的 lambda 一致。关键字:验证


/**
 * Serialized form of a lambda expression.  The properties of this class
 * represent the information that is present at the lambda factory site, including
 * static metafactory arguments such as the identity of the primary functional
 * interface method and the identity of the implementation method, as well as
 * dynamic metafactory arguments such as values captured from the lexical scope
 * at the time of lambda capture.
 *
 * <p>Implementors of serializable lambdas, such as compilers or language
 * runtime libraries, are expected to ensure that instances deserialize properly.
 * One means to do so is to ensure that the {@code writeReplace} method returns
 * an instance of {@code SerializedLambda}, rather than allowing default
 * serialization to proceed.
 *
 * <p>{@code SerializedLambda} has a {@code readResolve} method that looks for
 * a (possibly private) static method called
 * {@code $deserializeLambda$(SerializedLambda)} in the capturing class, invokes
 * that with itself as the first argument, and returns the result.  Lambda classes
 * implementing {@code $deserializeLambda$} are responsible for validating
 * that the properties of the {@code SerializedLambda} are consistent with a
 * lambda actually captured by that class.
 *
 * <p>The identity of a function object produced by deserializing the serialized
 * form is unpredictable, and therefore identity-sensitive operations (such as
 * reference equality, object locking, and {@code System.identityHashCode()} may
 * produce different results in different implementations, or even upon
 * different deserializations in the same implementation.
 *
 * @see LambdaMetafactory
 * @since 1.8
 */

通过查看字节码发现原方法中增加$deserializeLambda$方法,字节码如下,注意是字节码第50行出现了调用lambda的invokedynamic字节码,而50行之前的字节码通过不断调用invokevirtual获取SerializedLambda的各种属性,并使用equals方法对获取到的属性做校验。根据SerializedLambda类注释的关键字在结合字节码可知$deserializeLambda$主要做校验使用。

 0 aload_0
  1 invokevirtual #28 <java/lang/invoke/SerializedLambda.getImplMethodName : ()Ljava/lang/String;>
  4 astore_1
  5 iconst_m1
  6 istore_2
  7 aload_1
  8 invokevirtual #29 <java/lang/String.hashCode : ()I>
 11 lookupswitch 1
  -75308287:  28 (+17)
  default:  39 (+28)
 28 aload_1
 29 ldc #14 <getName>
 31 invokevirtual #30 <java/lang/String.equals : (Ljava/lang/Object;)Z>
 34 ifeq 39 (+5)
 37 iconst_0
 38 istore_2
 39 iload_2
 40 lookupswitch 1
  0:  60 (+20)
  default:  134 (+94)
 60 aload_0
 61 invokevirtual #31 <java/lang/invoke/SerializedLambda.getImplMethodKind : ()I>
 64 iconst_5
 65 if_icmpne 134 (+69)
 68 aload_0
 69 invokevirtual #32 <java/lang/invoke/SerializedLambda.getFunctionalInterfaceClass : ()Ljava/lang/String;>
 72 ldc #33 <com/yousheng/lambda/test/Func>
 74 invokevirtual #34 <java/lang/Object.equals : (Ljava/lang/Object;)Z>
 77 ifeq 134 (+57)
 80 aload_0
 81 invokevirtual #35 <java/lang/invoke/SerializedLambda.getFunctionalInterfaceMethodName : ()Ljava/lang/String;>
 84 ldc #11 <apply>
 86 invokevirtual #34 <java/lang/Object.equals : (Ljava/lang/Object;)Z>
 89 ifeq 134 (+45)
 92 aload_0
 93 invokevirtual #36 <java/lang/invoke/SerializedLambda.getFunctionalInterfaceMethodSignature : ()Ljava/lang/String;>
 96 ldc #12 <(Ljava/lang/Object;)Ljava/lang/Object;>
 98 invokevirtual #34 <java/lang/Object.equals : (Ljava/lang/Object;)Z>
101 ifeq 134 (+33)
104 aload_0
105 invokevirtual #37 <java/lang/invoke/SerializedLambda.getImplClass : ()Ljava/lang/String;>
108 ldc #13 <com/yousheng/lambda/entity/Child>
110 invokevirtual #34 <java/lang/Object.equals : (Ljava/lang/Object;)Z>
113 ifeq 134 (+21)
116 aload_0
117 invokevirtual #38 <java/lang/invoke/SerializedLambda.getImplMethodSignature : ()Ljava/lang/String;>
120 ldc #15 <()Ljava/lang/String;>
122 invokevirtual #34 <java/lang/Object.equals : (Ljava/lang/Object;)Z>
125 ifeq 134 (+9)
128 invokedynamic #2 <apply, BootstrapMethods #0>   // 注意
133 areturn
134 new #39 <java/lang/IllegalArgumentException>
137 dup
138 ldc #40 <Invalid lambda deserialization>
140 invokespecial #41 <java/lang/IllegalArgumentException.<init> : (Ljava/lang/String;)V>
143 athrow

再看一下SerializedLambda中的readResolve方法,通过capturingClass获取$deserializeLambda$方法,最后在进行调用。


private Object readResolve() throws ReflectiveOperationException {
    try {
    Method deserialize = AccessController.doPrivileged(new PrivilegedExceptionAction<>() {
        @Override
        public Method run() throws Exception {
            Method m = capturingClass.getDeclaredMethod("$deserializeLambda$", SerializedLambda.class);
            m.setAccessible(true);
            return m;
        }
    });

    return deserialize.invoke(null, this);
}
catch (PrivilegedActionException e) {
    Exception cause = e.getException();
    if (cause instanceof ReflectiveOperationException)
        throw (ReflectiveOperationException) cause;
    else if (cause instanceof RuntimeException)
        throw (RuntimeException) cause;
    else
        throw new RuntimeException("Exception in SerializedLambda.readResolve", e);
}
}

最后在整理下整个lambda序列化流程,首先是对应的lambda表达式必须实现Serializable接口,在实现Serializable接口后,jvm运行时候会在lambda生成的静态内部类中增加writeReplace方法,并在调用的类中增加$deserializeLambda$方法校验使用,在序列化过程中ObjectOutputStream会调用writeReplace方法,将整个lambda表达式转换成SerializedLambda,最后将SerializedLambda类序列化保存。

最后明白了lambda的序列化过程后可以用一个例子模拟lambda的反序列化的过程,首先序列化的对象并非是原来的lambda表达式,而是SerializedLambda对象,通过调用$deserializeLambda$方法生成校验SerializedLambda方队,校验通过则调用对应的Booststrap方法进行一系列转化继续来到LambdaMetaFactory的altMetafactory方法并最终在此方法生成最后的调用CallSite


class LambdaSerialized {
    void serializable() {
        Function<Child, String> function = (Function<Child, String> & Serializable) (Child child) -> {
            System.out.println("test");
            return child.getName();
        };
        System.out.println(Arrays.toString(function.getClass().getDeclaredMethods()));
    }

    public static void main(String[] args) throws ClassNotFoundException, InvocationTargetException, NoSuchMethodException, IllegalAccessException {
        Child child = new Child("yousheng", 18);
         // 下方输出结果 yousheng 
        System.out.println(new LambdaSerialized().<Function<Child, String>>test().apply(child));

    }
    static <T> T test() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        SerializedLambda serializedLambda = new SerializedLambda(LambdaSerialized.class, "java/util/function/Function", "apply", "(Ljava/lang/Object;)Ljava/lang/Object;", 5, "com/yousheng/lambda/entity/Child", "getName", "()Ljava/lang/String;", "(Lcom/yousheng/lambda/entity/Child;)Ljava/lang/String;", new Object[0]);
        Method m = Class.forName(serializedLambda.getCapturingClass().replace('/', '.')).getDeclaredMethod("$deserializeLambda$", SerializedLambda.class);
        m.setAccessible(true);
        return (T)m.invoke(null, serializedLambda);
    }
}
// Child类定义如下
class Child {
    private String name;
    private int age;
}

查看栈帧方法由$deserializeLambda$调用至 ,

最后调用到事先定义好的方法。

图计算及其应用

近年来,基于图数据的计算(图计算)得到了学术界和工业界越来越多的关注。本专场围绕图计算系统、应用及前沿学术研究问题,首先介绍阿里巴巴开源的一站式图计算系统 GraphScope的设计思想、基础能力以及未来发展方向;另外,邀请来自学术界和工业界的大咖,分享图计算最前沿的学术和技术热点;同时,邀请在业务中应用图计算技术的客户,分享图计算在真实业务场景中的应用案例。

本文由哈喽比特于1年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/ud5TckFLXWrVpilmobynhQ

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:8月以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:8月以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:8月以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:8月以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:8月以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:8月以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:8月以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:8月以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:8月以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:8月以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:8月以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:8月以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:8月以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:8月以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:8月以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:8月以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:8月以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:8月以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:8月以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:8月以前  |  398次阅读  |  详细内容 »
 相关文章
Java 中验证时间格式的 4 种方法 1年以前  |  3361次阅读
Java经典面试题答案解析(1-80题) 4年以前  |  2687次阅读
IDEA依赖冲突分析神器—Maven Helper 4年以前  |  2467次阅读
CentOS 配置java应用开机自动启动 3年以前  |  2465次阅读
SpringBoot 控制并发登录的人数教程 4年以前  |  2176次阅读
 目录