PROGRAMMAZIONE AVANZATA JAVA E C Massimiliano Redolfi Lezione 9: multithreads, continue… Stream in Java 8 Functional Programming https://docs.oracle.com/javase/8/docs/api/java/util/ stream/package-summary.html import java.util.stream; Stream in Java 8 Functional Programming Da NON confodere con InputStream e simili!!! Stream Stream Stream gli stream sono monads: strutture che definiscono unità computazionali composte da un insieme di step Stream: operation pipepline List<String> myList = Arrays.asList("a1", "a2", "b1", "c1", "c2"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); Le operazioni sugli stream possono essere intermedie o terminali: - intermedie: restituiscono uno stream come risultato - terminali non restituiscono nulla (es. forEach) Stream: operation pipepline Le operazioni sugli stream possono essere intermedie o terminali: - intermedie: restituiscono uno stream come risultato - terminali non restituiscono nulla (es. forEach) Le operazioni sono non-interfering se non alterano lo stream sottostante (modificandone i dati o aggiungendo/togliendo elementi) Le operazioni sono stateless, dipendono solo dai valori specificati dai rispettivi parametri al momento della valutazione delle stesse, non dal contesto myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); Stream: possono essere creati in vari modi… Stream.of("alfa", "beta", "gamma") .findFirst() .ifPresent(System.out::println); IntStream.range(1, 10) .filter(v -> v%2 == 0) .forEach(System.out::println); Gli stream possono essere generici (Stream) oppure legati a tipi primitivi (IntStream, LongStream, DoubleStream) Arrays.stream(new int[] {1, 2, 3}) .map(n -> n * n) .average() .ifPresent(System.out::println); I primitive streams supportano alcuni operatori specifici (average, sum, …) Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); Inoltre utilizzano lambda exp. specializzate (IntFunction/ Function, …) IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println); ordine degli operatori Stream: ordine di elaborazione Stream.of("alfa", "beta", "gamma", "delta") .filter(s -> { System.out.println("Filtro: " + s); return true; }); se non c’è un’operazione terminale gli operatori intermedi non vengono elaborati… lazines Stream.of("alfa", "beta", "gamma", "delta") .filter(s -> { System.out.println("Filtro: " + s); return true; }) .forEach(s -> System.out.println("For: " + s)); Filtro: alfa For: alfa Filtro: beta For: beta Filtro: gamma For: gamma Filtro: delta For: delta elaborazione ‘verticale’ degli operatori Stream: ordine di elaborazione Stream.of("alfa", "beta", "gamma", "delta") .map(s -> { System.out.println("Map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("Match: " + s); return s.startsWith("B"); }); Si elabora solo ciò che serve… Map: alfa Match: ALFA Map: beta Match: BETA Stream: ordine di elaborazione Stream.of("alfa", "beta", "gamma", "delta") .map(s -> { System.out.println("Map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("Filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); Stream.of("alfa", "beta", "gamma", “delta") .filter(s -> { System.out.println("Filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("Map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); Map: alfa Filter: ALFA forEach: ALFA Map: beta Filter: BETA Map: gamma Filter: GAMMA Map: delta Filter: DELTA Filter: alfa Map: alfa forEach: ALFA Filter: beta Filter: gamma Filter: delta l’ordine conta! Stream: ordine di elaborazione Stream.of("gamma", "alfa", "beta", "epsilon", "delta") .sorted((s1, s2) -> { System.out.printf("Sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("Filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("Map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("ForEach: " + s)); Sort: alfa; gamma Sort: beta; alfa Sort: beta; gamma Sort: beta; alfa Sort: epsilon; beta Sort: epsilon; gamma Sort: delta; epsilon Sort: delta; beta Filter: alfa Map: alfa ForEach: ALFA Filter: beta Filter: delta Filter: epsilon Filter: gamma sorted: operatore intermedio stateful, conserva lo stato durante l’esecuzione… Nota: viene elaborato in ‘orizzontale’ per tutti gli elementi della sequenza. Stream: ordine di elaborazione Stream.of("gamma", "alfa", "beta", "epsilon", “delta") .filter(s -> { System.out.println("Filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("Sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("Map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("ForEach: " + s)); Filter: gamma Filter: alfa Filter: beta Filter: epsilon Filter: delta Map: alfa ForEach: ALFA l’ordine conta! Riutilizzo degli Stream Stream: riutilizzo & Supplier IntStream.iterate(2, f -> 2 * f) .anyMatch(f -> { System.out.println("Match: " + f); return f > 15; }); IntStream s = IntStream.iterate(2, f -> 2 * f); s.anyMatch(f -> { System.out.println("Match: " + f); return f > 15; }); s.anyMatch(f -> { System.out.println("Match: " + f); return f > 32; }); Match: Match: Match: Match: 2 4 8 16 Match: 2 Match: 4 Match: 8 Match: 16 Exception in thread "main" java.lang.IllegalStateExc eption: stream has already been operated upon or closed at java.util.stream.Abstract Pipeline.evaluate(Abstrac tPipeline.java:229) at java.util.stream.IntPipel ine.anyMatch(IntPipeline. java:477) at jExample_Threads.Main.mai n(Main.java:17) Stream: riutilizzo & Supplier Supplier<IntStream> supplier = () -> IntStream.iterate(2, f -> 2 * f); supplier.get().anyMatch(f -> { System.out.println("Match: " + f); return f > 15; }); supplier.get().anyMatch(f -> { System.out.println("Match: " + f); return f > 32; }); Match: Match: Match: Match: Match: Match: Match: Match: Match: Match: 2 4 8 16 2 4 8 16 32 64 public interface Supplier<T> { T get(); } Sulla serie di fibonacci… Fibonacci jEx004_Fibonacci # ciclico # ricorsivo # traite iteratori oppure tramite Stream! LongStream fibs = Stream .iterate(new long[]{1, 1}, f -> new long[] {f[1], f[0] + f[1]}) .mapToLong(f -> f[0]); fibs.limit(10).forEach(System.out::println); 1 1 2 3 5 8 13 21 34 55 Stream & Thread invokeAll ExecutorService executor = Executors.newWorkStealingPool(); List<Callable<String>> callables = Arrays.asList( () -> "alfa", () -> "beta", () -> "gamma"); executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch(Exception e) { ; } return null; }) .forEach(System.out::println); invokeAll: prende una collection di Callable e restituisce una lista di Future alfa beta gamma invokeAny ForkJoinPool ExecutorService executor = Executors.newWorkStealingPool(); List<Callable<String>> callables = Arrays.asList( () -> "alfa", () -> "beta", () -> "gamma"); System.out.println(executor.invokeAny(callables)); viene restituito il valore del primo dei Callable che termina invokeAny: prende una collection di Callable e blocca l’esecuzione sino a che uno dei callable termina, il valore di tale elemento viene quindi restituito Scheduled Thread Pools Task con esecuzione ritardata ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("\nScheduling: " + System.nanoTime()); ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS); TimeUnit.MILLISECONDS.sleep(1400); long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS); System.out.printf("Remaining Delay: %sms", remainingDelay); Remaining Delay: 1595ms Scheduling: 365475420654374 Task periodici: fixedRate ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + new Date()); int initialDelay = 0; int period = 1; executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS); Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: … Attenzione: che succede se il task dura 3 secondi? Thu Thu Thu Thu Thu Thu Thu Thu Thu Nov Nov Nov Nov Nov Nov Nov Nov Nov 12 12 12 12 12 12 12 12 12 23:42:48 23:42:49 23:42:50 23:42:51 23:42:52 23:42:53 23:42:54 23:42:55 23:42:56 CET CET CET CET CET CET CET CET CET 2015 2015 2015 2015 2015 2015 2015 2015 2015 ! Task periodici: fixedRate ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> {System.out.println("Scheduling: " + new Date()); try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { ; } }; int initialDelay = 0; int period = 1; executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS); Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: … Thu Thu Thu Thu Thu Thu Thu Nov Nov Nov Nov Nov Nov Nov 12 12 12 12 12 12 12 23:51:58 23:52:01 23:52:04 23:52:07 23:52:10 23:52:13 23:52:16 CET CET CET CET CET CET CET 2015 2015 2015 2015 2015 2015 2015 Task periodici: fixedDelay ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> {System.out.println("Scheduling: " + new Date()); try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { ; } }; int initialDelay = 0; int period = 1; executor.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.SECONDS); Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: Scheduling: … Thu Thu Thu Thu Thu Thu Thu Nov Nov Nov Nov Nov Nov Nov 12 12 12 12 12 12 12 23:55:40 23:55:44 23:55:48 23:55:52 23:55:56 23:56:00 23:56:04 CET CET CET CET CET CET CET 2015 2015 2015 2015 2015 2015 2015 Concorrenza Concorrenza public class Main { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newFixedThreadPool(2); Counter counter = new Counter(); IntStream.range(0, 10000) .forEach(i -> exec.submit(counter::inc)); exec.shutdown(); exec.awaitTermination(60, TimeUnit.SECONDS); System.out.println(counter.count); } } class Counter { int count = 0; void inc() { count++; } } ci aspetteremmo 10000, giusto? " Concorrenza public class Main { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newFixedThreadPool(2); Counter counter = new Counter(); IntStream.range(0, 10000) .forEach(i -> exec.submit(counter::inc)); exec.shutdown(); exec.awaitTermination(60, TimeUnit.SECONDS); System.out.println(counter.count); } } class Counter { int count = 0; synchronized void inc() { count++; } } 10000 ok! bisogna sincronizzare!