ADVANCED PROGRAMMING IN JAVA AND C
Massimiliano Redolfi
Lesson 9: multithreading, continued…
Stream in Java 8
Functional Programming
https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html
import java.util.stream.*;
NOT to be confused with InputStream and the like!
Stream
streams can be seen as monads: structures that describe a computation as a
sequence of steps
Stream: operation pipeline
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c1", "c2");
myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
Stream operations are either intermediate or terminal:
- intermediate: return a stream as their result, so they can be chained
- terminal: end the pipeline; they return a non-stream result or nothing at all (e.g. forEach)
Operations are non-interfering if they do not
alter the data source underlying the stream (by
modifying its data or adding/removing elements)
Operations are stateless if their result depends
only on the values of their parameters at the
time they are evaluated, not on the surrounding
context
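The distinction between intermediate and terminal operations can be sketched as follows. This is only an illustrative sketch: the class `PipelineKinds` and its method names are assumptions made for the example, not part of the lesson. It shows that `filter` and `map` hand back another stream, that terminal operations such as `collect` and `count` hand back an ordinary value, and that the source list is left untouched (non-interference).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class PipelineKinds {
    static final List<String> SOURCE = Arrays.asList("a1", "a2", "b1", "c1", "c2");

    // collect() is terminal: it turns the pipeline into a List.
    static List<String> upperCaseStartingWithC() {
        // filter and map are intermediate: each one returns another Stream.
        Stream<String> pipeline = SOURCE.stream()
                .filter(s -> s.startsWith("c"))
                .map(String::toUpperCase);
        return pipeline.collect(Collectors.toList());
    }

    // count() is terminal too: it returns a long, not a stream.
    static long countStartingWithA() {
        return SOURCE.stream().filter(s -> s.startsWith("a")).count();
    }

    public static void main(String[] args) {
        System.out.println(upperCaseStartingWithC());
        System.out.println(countStartingWithA());
        System.out.println(SOURCE); // the source list is not modified by the pipelines
    }
}
```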
Stream: they can be created in several ways…
Stream.of("alfa", "beta", "gamma")
.findFirst()
.ifPresent(System.out::println);
IntStream.range(1, 10)
.filter(v -> v%2 == 0)
.forEach(System.out::println);
Streams can be generic (Stream)
or tied to primitive types
(IntStream, LongStream,
DoubleStream)
Arrays.stream(new int[] {1, 2, 3})
.map(n -> n * n)
.average()
.ifPresent(System.out::println);
Primitive streams support
some specific operators
(average, sum, …)
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println);
They also use specialized
functional interfaces for their lambda
expressions (IntFunction instead of Function, …)
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
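The conversions between object streams and primitive streams used above can be put side by side in a small sketch. The class `PrimitiveStreams` and its method names are made up for illustration; only standard `java.util.stream` calls are used.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class PrimitiveStreams {
    // Stream<String> -> IntStream via mapToInt, then the primitive-only sum().
    static int sumOfSuffixes() {
        return Stream.of("a1", "a2", "a3")
                .map(s -> s.substring(1))
                .mapToInt(Integer::parseInt)
                .sum();
    }

    // IntStream -> Stream<Integer> via boxed(), so the values can be collected into a List.
    static List<Integer> squares() {
        return IntStream.rangeClosed(1, 4)
                .map(n -> n * n)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumOfSuffixes()); // 1 + 2 + 3
        System.out.println(squares());
    }
}
```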
Order of the operators
Stream: processing order
Stream.of("alfa", "beta", "gamma", "delta")
.filter(s -> {
System.out.println("Filtro: " + s);
return true;
});
without a terminal operation, the
intermediate operators are not executed…
laziness
Stream.of("alfa", "beta", "gamma", "delta")
.filter(s -> {
System.out.println("Filtro: " + s);
return true;
})
.forEach(s -> System.out.println("For: " + s));
Filtro: alfa
For: alfa
Filtro: beta
For: beta
Filtro: gamma
For: gamma
Filtro: delta
For: delta
‘vertical’ processing of the operators: each element goes through the whole pipeline before the next one is taken
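Laziness can also be checked without reading console output. The sketch below counts how often the filter lambda runs with and without a terminal operation. The class `LazinessDemo` and the counter are assumptions added for the example; the counter is a side effect used only for instrumentation, like the println calls in the slides.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class LazinessDemo {
    // Returns how many times the filter lambda was actually invoked.
    static int filterCalls(boolean withTerminalOperation) {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = Stream.of("alfa", "beta", "gamma", "delta")
                .filter(s -> {
                    calls.incrementAndGet(); // instrumentation only
                    return true;
                });
        if (withTerminalOperation) {
            pipeline.forEach(s -> { }); // the terminal operation triggers the pipeline
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("without terminal operation: " + filterCalls(false));
        System.out.println("with terminal operation:    " + filterCalls(true));
    }
}
```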
Stream: processing order
Stream.of("alfa", "beta", "gamma", "delta")
.map(s -> {
System.out.println("Map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("Match: " + s);
return s.startsWith("B");
});
Only what is needed gets processed…
Map: alfa
Match: ALFA
Map: beta
Match: BETA
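The short-circuit behaviour shown in this output can be measured directly. In this sketch (class and method names are illustrative assumptions) the number of `map` invocations is counted until `anyMatch` finds its first match.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class ShortCircuitDemo {
    // Returns how many elements were mapped before anyMatch stopped, or -1 if nothing matched.
    static int mapCallsBeforeMatch() {
        AtomicInteger mapCalls = new AtomicInteger();
        boolean found = Stream.of("alfa", "beta", "gamma", "delta")
                .map(s -> {
                    mapCalls.incrementAndGet(); // instrumentation only
                    return s.toUpperCase();
                })
                .anyMatch(s -> s.startsWith("B")); // stops at the first match
        return found ? mapCalls.get() : -1;
    }

    public static void main(String[] args) {
        // "gamma" and "delta" are never mapped: the match is found at "beta".
        System.out.println(mapCallsBeforeMatch());
    }
}
```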
Stream: processing order
Stream.of("alfa", "beta", "gamma", "delta")
.map(s -> {
System.out.println("Map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("Filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
Stream.of("alfa", "beta", "gamma", "delta")
.filter(s -> {
System.out.println("Filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("Map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
Map: alfa
Filter: ALFA
forEach: ALFA
Map: beta
Filter: BETA
Map: gamma
Filter: GAMMA
Map: delta
Filter: DELTA
Filter: alfa
Map: alfa
forEach: ALFA
Filter: beta
Filter: gamma
Filter: delta
order matters!
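The cost difference between the two orderings can be expressed as a count of `map` invocations. The sketch below is an illustration under assumed names (`OperatorOrderDemo`, `mapCalls`); it runs the same four elements through both orderings.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class OperatorOrderDemo {
    // Returns how many times map ran for the chosen ordering of filter and map.
    static int mapCalls(boolean filterFirst) {
        AtomicInteger mapCalls = new AtomicInteger();
        Function<String, String> countingUpper = s -> {
            mapCalls.incrementAndGet(); // instrumentation only
            return s.toUpperCase();
        };
        Stream<String> source = Stream.of("alfa", "beta", "gamma", "delta");
        Stream<String> result = filterFirst
                ? source.filter(s -> s.startsWith("a")).map(countingUpper)
                : source.map(countingUpper).filter(s -> s.startsWith("A"));
        result.forEach(s -> { });
        return mapCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("map first:    " + mapCalls(false));
        System.out.println("filter first: " + mapCalls(true));
    }
}
```

Filtering first means only the surviving element reaches `map`, which is why the cheaper, more selective operation usually belongs earlier in the pipeline.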
Stream: processing order
Stream.of("gamma", "alfa", "beta", "epsilon", "delta")
.sorted((s1, s2) -> {
System.out.printf("Sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("Filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("Map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("ForEach: " + s));
Sort: alfa; gamma
Sort: beta; alfa
Sort: beta; gamma
Sort: beta; alfa
Sort: epsilon; beta
Sort: epsilon; gamma
Sort: delta; epsilon
Sort: delta; beta
Filter: alfa
Map: alfa
ForEach: ALFA
Filter: beta
Filter: delta
Filter: epsilon
Filter: gamma
sorted: a stateful intermediate operator, it has to keep state while it runs…
Note: it is processed ‘horizontally’, over all the elements of the sequence, before anything is passed downstream.
Stream: processing order
Stream.of("gamma", "alfa", "beta", "epsilon", "delta")
.filter(s -> {
System.out.println("Filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("Sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("Map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("ForEach: " + s));
Filter: gamma
Filter: alfa
Filter: beta
Filter: epsilon
Filter: delta
Map: alfa
ForEach: ALFA
order matters!
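The effect of placing `sorted` before or after `filter` can be summarised by counting comparator calls. This is a sketch with assumed names (`SortPlacementDemo`, `comparisons`); the exact number of comparisons when sorting first depends on the sort implementation, so it is printed rather than stated.

```java
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class SortPlacementDemo {
    // Returns how many times the comparator ran for the chosen ordering.
    static int comparisons(boolean filterFirst) {
        AtomicInteger comparisons = new AtomicInteger();
        Comparator<String> counting = (s1, s2) -> {
            comparisons.incrementAndGet(); // instrumentation only
            return s1.compareTo(s2);
        };
        Stream<String> source = Stream.of("gamma", "alfa", "beta", "epsilon", "delta");
        Stream<String> result = filterFirst
                ? source.filter(s -> s.startsWith("a")).sorted(counting)
                : source.sorted(counting).filter(s -> s.startsWith("a"));
        result.forEach(s -> { });
        return comparisons.get();
    }

    public static void main(String[] args) {
        System.out.println("sort first:   " + comparisons(false) + " comparisons");
        // Only one element survives the filter, so there is nothing left to compare.
        System.out.println("filter first: " + comparisons(true) + " comparisons");
    }
}
```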
Reusing Streams
Stream: reuse & Supplier
IntStream.iterate(2, f -> 2 * f)
.anyMatch(f -> {
System.out.println("Match: " + f);
return f > 15;
});
IntStream s = IntStream.iterate(2, f -> 2 * f);
s.anyMatch(f -> {
System.out.println("Match: " + f);
return f > 15;
});
s.anyMatch(f -> {
System.out.println("Match: " + f);
return f > 32;
});
Match: 2
Match: 4
Match: 8
Match: 16
Match: 2
Match: 4
Match: 8
Match: 16
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.IntPipeline.anyMatch(IntPipeline.java:477)
    at jExample_Threads.Main.main(Main.java:17)
Stream: reuse & Supplier
Supplier<IntStream> supplier = () -> IntStream.iterate(2, f -> 2 * f);
supplier.get().anyMatch(f -> {
System.out.println("Match: " + f);
return f > 15;
});
supplier.get().anyMatch(f -> {
System.out.println("Match: " + f);
return f > 32;
});
Match: 2
Match: 4
Match: 8
Match: 16
Match: 2
Match: 4
Match: 8
Match: 16
Match: 32
Match: 64
public interface Supplier<T> {
T get();
}
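The same idea works for any stream, not only for `IntStream.iterate`. The following sketch (the class `StreamReuseDemo` and its methods are assumptions for illustration) first shows that a second terminal operation on one stream instance is rejected, then that a `Supplier` hands out a fresh stream on every `get()`.

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class StreamReuseDemo {
    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondUseFails() {
        Stream<String> stream = Stream.of("alfa", "beta", "gamma");
        stream.anyMatch(s -> s.startsWith("b")); // the first terminal operation consumes the stream
        try {
            stream.anyMatch(s -> s.startsWith("g"));
            return false;
        } catch (IllegalStateException e) {
            return true; // "stream has already been operated upon or closed"
        }
    }

    // Each call to get() builds a new stream, so both terminal operations succeed.
    static boolean[] viaSupplier() {
        Supplier<Stream<String>> supplier = () -> Stream.of("alfa", "beta", "gamma");
        boolean anyB = supplier.get().anyMatch(s -> s.startsWith("b"));
        boolean noneZ = supplier.get().noneMatch(s -> s.startsWith("z"));
        return new boolean[] { anyB, noneZ };
    }

    public static void main(String[] args) {
        System.out.println("second use fails: " + secondUseFails());
        boolean[] results = viaSupplier();
        System.out.println("anyMatch b: " + results[0] + ", noneMatch z: " + results[1]);
    }
}
```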
On the Fibonacci series…
Fibonacci
jEx004_Fibonacci
# iterative (loop)
# recursive
# via iterators
or via Stream!
LongStream fibs = Stream
.iterate(new long[]{1, 1},
f -> new long[] {f[1], f[0] + f[1]})
.mapToLong(f -> f[0]);
fibs.limit(10).forEach(System.out::println);
1
1
2
3
5
8
13
21
34
55
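A variant of the same technique that returns the values instead of printing them is sketched below. The class `FibonacciStream` and the method `firstFibonacci` are names assumed for the example. Note that `long` overflows after the 92nd term, so the sketch is only meant for small `n`.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class, not taken from the slides.
class FibonacciStream {
    // Each stream element is the pair {F(k), F(k+1)}; the first component is the value we keep.
    static List<Long> firstFibonacci(int n) {
        return Stream.iterate(new long[] {1, 1}, f -> new long[] {f[1], f[0] + f[1]})
                .limit(n)        // iterate() is infinite: limit makes it finite
                .map(f -> f[0])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFibonacci(10));
    }
}
```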
Stream & Thread
invokeAll
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "alfa",
() -> "beta",
() -> "gamma");
executor.invokeAll(callables)
.stream()
.map(future -> {
try { return future.get(); }
catch(Exception e) { ; }
return null;
})
.forEach(System.out::println);
invokeAll: takes a collection of Callable, waits for all of them to complete and
returns a list of Future (in the same order as the Callables)
alfa
beta
gamma
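A self-contained version of the invokeAll example might look like the sketch below. The class `InvokeAllDemo` and the method `runAll` are assumed names. Compared with the slide, it shuts the executor down and reports task failures instead of silently mapping them to null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not taken from the slides.
class InvokeAllDemo {
    static List<String> runAll() {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            List<Callable<String>> callables = Arrays.asList(
                    () -> "alfa",
                    () -> "beta",
                    () -> "gamma");
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task is done; the futures are returned
            // in the same order as the callables.
            for (Future<String> future : executor.invokeAll(callables)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause()); // a task threw an exception
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```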
invokeAny
ForkJoinPool
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "alfa",
() -> "beta",
() -> "gamma");
System.out.println(executor.invokeAny(callables));
invokeAny: takes a collection of Callable and blocks until one of
them completes successfully; the value returned by that Callable is
the result
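Because the winner of invokeAny depends on scheduling, the result is not deterministic. The sketch below (class `InvokeAnyDemo` and method `runAny` are assumed names) therefore only promises that the result is one of the three values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class, not taken from the slides.
class InvokeAnyDemo {
    static String runAny() {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            List<Callable<String>> callables = Arrays.asList(
                    () -> "alfa",
                    () -> "beta",
                    () -> "gamma");
            // Blocks until one task completes successfully and returns its value.
            return executor.invokeAny(callables);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause()); // no task completed successfully
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAny()); // one of alfa, beta, gamma
    }
}
```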
Scheduled Thread Pools
Tasks with delayed execution
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("\nScheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1400);
long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
Remaining Delay: 1595ms
Scheduling: 365475420654374
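The delayed-execution example never shuts its executor down, so the JVM keeps running. A variant that waits for the task and then releases the pool is sketched here; the class `DelayedTaskDemo`, the method name and the 500 ms delay are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not taken from the slides.
class DelayedTaskDemo {
    // Schedules a task and returns the remaining delay observed right after scheduling.
    static long remainingDelayMillis(long delayMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    executor.schedule(() -> "done", delayMillis, TimeUnit.MILLISECONDS);
            long remaining = future.getDelay(TimeUnit.MILLISECONDS);
            future.get(); // blocks until the delay has elapsed and the task has run
            return remaining;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown(); // without this the pool thread keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.printf("Remaining delay: %sms%n", remainingDelayMillis(500));
    }
}
```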
Periodic tasks: fixedRate
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + new Date());
int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
Scheduling: Thu Nov 12 23:42:48 CET 2015
Scheduling: Thu Nov 12 23:42:49 CET 2015
Scheduling: Thu Nov 12 23:42:50 CET 2015
Scheduling: Thu Nov 12 23:42:51 CET 2015
Scheduling: Thu Nov 12 23:42:52 CET 2015
Scheduling: Thu Nov 12 23:42:53 CET 2015
Scheduling: Thu Nov 12 23:42:54 CET 2015
Scheduling: Thu Nov 12 23:42:55 CET 2015
Scheduling: Thu Nov 12 23:42:56 CET 2015
…
Warning: what happens if the task takes 3 seconds?
Periodic tasks: fixedRate
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {System.out.println("Scheduling: " + new Date());
try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { ; }
};
int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
Scheduling: Thu Nov 12 23:51:58 CET 2015
Scheduling: Thu Nov 12 23:52:01 CET 2015
Scheduling: Thu Nov 12 23:52:04 CET 2015
Scheduling: Thu Nov 12 23:52:07 CET 2015
Scheduling: Thu Nov 12 23:52:10 CET 2015
Scheduling: Thu Nov 12 23:52:13 CET 2015
Scheduling: Thu Nov 12 23:52:16 CET 2015
…
Periodic tasks: fixedDelay
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {System.out.println("Scheduling: " + new Date());
try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { ; }
};
int initialDelay = 0;
int period = 1;
executor.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.SECONDS);
Scheduling: Thu Nov 12 23:55:40 CET 2015
Scheduling: Thu Nov 12 23:55:44 CET 2015
Scheduling: Thu Nov 12 23:55:48 CET 2015
Scheduling: Thu Nov 12 23:55:52 CET 2015
Scheduling: Thu Nov 12 23:55:56 CET 2015
Scheduling: Thu Nov 12 23:56:00 CET 2015
Scheduling: Thu Nov 12 23:56:04 CET 2015
…
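The difference between the two periodic modes can be measured rather than read off timestamps. The sketch below is an illustration with assumed names and scaled-down timings (a 300 ms task and a 100 ms period/delay instead of 3 s and 1 s). With a task longer than the period, scheduleAtFixedRate starts the next run as soon as the previous one ends, while scheduleWithFixedDelay always waits the delay after the previous run has ended.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not taken from the slides.
class PeriodicTaskDemo {
    static final long TASK_MILLIS = 300;   // how long each run takes (assumed value)
    static final long PERIOD_MILLIS = 100; // period (fixed rate) or delay (fixed delay)

    // Returns the gap in milliseconds between the starts of the first two runs.
    static long gapBetweenStartsMillis(boolean fixedRate) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch twoStarts = new CountDownLatch(2);
        Runnable task = () -> {
            starts.add(System.nanoTime());
            twoStarts.countDown();
            try {
                TimeUnit.MILLISECONDS.sleep(TASK_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly on shutdownNow
            }
        };
        try {
            if (fixedRate) {
                executor.scheduleAtFixedRate(task, 0, PERIOD_MILLIS, TimeUnit.MILLISECONDS);
            } else {
                executor.scheduleWithFixedDelay(task, 0, PERIOD_MILLIS, TimeUnit.MILLISECONDS);
            }
            twoStarts.await(); // wait until the second run has started
            return TimeUnit.NANOSECONDS.toMillis(starts.get(1) - starts.get(0));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // cancel further runs and interrupt the sleeping task
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed rate gap:  ~" + gapBetweenStartsMillis(true) + " ms");
        System.out.println("fixed delay gap: ~" + gapBetweenStartsMillis(false) + " ms");
    }
}
```

The measured gaps are approximate and depend on the machine; roughly the task duration is expected for the fixed rate and the task duration plus the delay for the fixed delay, which matches the 3 s and 4 s spacing in the outputs above.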
Concurrency
public class Main {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newFixedThreadPool(2);
Counter counter = new Counter();
IntStream.range(0, 10000)
.forEach(i -> exec.submit(counter::inc));
exec.shutdown();
exec.awaitTermination(60, TimeUnit.SECONDS);
System.out.println(counter.count);
}
}
class Counter {
int count = 0;
void inc() {
count++;
}
}
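we would expect 10000, right?
Not necessarily: count++ is a read-modify-write sequence, not an atomic step, so two threads can read the same value and one of the increments gets lost.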
Concurrency
public class Main {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newFixedThreadPool(2);
Counter counter = new Counter();
IntStream.range(0, 10000)
.forEach(i -> exec.submit(counter::inc));
exec.shutdown();
exec.awaitTermination(60, TimeUnit.SECONDS);
System.out.println(counter.count);
}
}
class Counter {
int count = 0;
synchronized void inc() {
count++;
}
}
10000, ok! Access to the shared counter must be synchronized!
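A self-contained, checkable version of the synchronized counter is sketched below. The names `SynchronizedCounterDemo`, `SafeCounter` and `countWithThreads` are assumptions for the example. Unlike the slide, the counter is also read through a synchronized getter, and the result of awaitTermination is checked.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

// Hypothetical example class, not taken from the slides.
class SynchronizedCounterDemo {
    static class SafeCounter {
        private int count = 0;

        // Only one thread at a time can run a synchronized method on the same instance.
        synchronized void inc() {
            count++;
        }

        synchronized int get() {
            return count;
        }
    }

    static int countWithThreads(int increments, int threads) {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        SafeCounter counter = new SafeCounter();
        IntStream.range(0, increments).forEach(i -> exec.submit(counter::inc));
        exec.shutdown(); // no new tasks; already submitted ones still run
        try {
            if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(10000, 2));
    }
}
```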