不小心重用了流
intstream stream = intstream.of(1, 2);
stream.foreach(system.out::println);
// that was fun! let's do it again!
你會碰到一個這樣的錯誤:
stream has already been operated upon or closed
是以使用流的時候應當格外注意。它隻能消費一次。
不小心建立了一個”無限"流
你可能一不留神就建立了一個無限流。就拿下面這個例子來說:
intstream.iterate(0, i -> i + 1)
.foreach(system.out::println);
流的問題就在于它有可能是無限的,如果你的确是這樣設計的話。唯一的問題就是,這并不是你真正想要的。是以,你得確定每次都給流提供一個适當的大小限制:
// that's better
.limit(10)
不小心建立了一個“隐藏的”無限流
這個話題是說不完的。你可能一不小心就真的建立了一個無限流。比如說下面的這個:
intstream.iterate(0, i -> ( i + 1 ) % 2)
.distinct()
這樣做的結果是:
我們生成了0和1的交替數列
然後隻保留不同的數值,比如說,一個0和一個1
然後再将流的大小限制為10
最後對流進行消費
好吧,這個distinct()操作它并不知道iterate()所調用的這個函數生成的隻有兩個不同的值。它覺得可能還會有别的值。是以它會不停地從流中消費新的值,而這個limit(10)永遠也不會被調用到。不幸的是,你的應用程式會崩掉。
不小心建立了一個”隐藏”的并行無限流
我還是想繼續提醒下你,你可能真的一不小心就消費了一個無限流。假設你認為distinct()操作是會并行執行的。那你可能會這麼寫:
.parallel()
.foreach(system.out::println); 現在我們可以知道的是,這段代碼會一直執行下去。不過在前面那個例子中,你至少隻消耗了機器上的一個cpu。而現在你可能會消耗四個,一個無限流的消費很可能就會消耗掉你整個系統的資源。這可相當不妙。這種情況下你可能得去重新開機伺服器了。看下我的筆記本在最終崩潰前是什麼樣的:
操作的順序
為什麼我一直在強調你可能一不小心就建立了一個無限流?很簡單。因為如果你把上面的這個流的limit()和distinct()操作的順序掉換一下,一切就都ok了。
現在則會輸出:
0
1
為什麼會這樣?因為我們先将無限流的大小限制為10個值,也就是(0 1 0 1 0 1 0 1 0 1),然後再在這個有限流上進行歸約,求出它所包含的不同值,(0,1)。
當然了,這個在語義上就是錯誤的了。因為你實際上想要的是資料集的前10個不同值。沒有人會真的要先取10個随機數,然後再求出它們的不同值的。
如果你是來自sql背景的話,你可能不會想到還有這個差別。就拿sql server 2012舉例來說,下面的兩個sql語句是一樣的:
-- using top
select distinct top 10 *
from i
order by ..
-- using fetch
select *
offset 0 rows
fetch next 10 rows only
是以,作為一名sql使用者,你可能并不會注意到流操作順序的重要性。
還是操作順序
既然說到了sql,如果你用的是mysql或者postgresql,你可能會經常用到limit .. offset子句。sql裡全是這種暗坑,這就是其中之一。正如sql server 2012中的文法所說明的那樣,offset子名會優先執行。
如果你将mysql/postgresql方言轉化成流的話,得到的結果很可能是錯的:
.limit(10) // limit
.skip(5) // offset
上面的代碼會輸出:
5
6
7
8
9
是的,它輸出9後就結束了,因為首先生效的是limit(),這樣會輸出(0 1 2 3 4 5 6 7 8 9)。其次才是skip(),它将流縮減為(5 6 7 8 9)。而這并不是你所想要的。
警惕limit .. offset和offset .. limit的陷阱!
使用過濾器來周遊檔案系統
這個問題我們之前已經講過了。使用過濾器來周遊檔案系統是個不錯的方式:
files.walk(paths.get("."))
.filter(p -> !p.tofile().getname().startswith("."))
看起來上面的這個流隻是周遊了所有的非隐藏目錄,也就是不以點号開始的那些目錄。不幸的是,你又犯了錯誤五和錯誤六了。walk()方法已經生成一個目前目錄下的所有子目錄的流。雖然是一個惰性流,但是也包含了所有的子路徑。現在的這個過濾器可以正确過濾掉所有名字以點号開始的那些目錄,也就是說結果流中不會包含.git或者.idea。不過路徑可能會是:..git\refs或者..idea\libraries。而這并不是你實際想要的。
你可别為了解決問題而這麼寫:
.filter(p -> !p.tostring().contains(file.separator + "."))
雖然這麼寫的結果是對的,但是它會去周遊整個子目錄結構樹,這會遞歸所有的隐藏目錄的子目錄。
我猜你又得求助于老的jdk1.0中所提供的file.list()了。不過好消息是, filenamefilter和filefilter現在都是函數式接口了。
修改流内部的集合
當周遊清單的時候,你不能在疊代的過程中同時去修改這個清單。這個在java 8之前就是這樣的,不過在java 8的流中則更為棘手。看下下面這個0到9的清單:
// of course, we create this list using streams:
list<integer> list =
intstream.range(0, 10)
.boxed()
.collect(tocollection(arraylist::new));
現在,假設下我們在消費流的時候同時去删除元素:
list.stream()
// remove(object), not remove(int)!
.peek(list::remove)
有趣的是,其中的一些元素中可以的删除的。你得到的輸出将會是這樣的:
2
4
null
java.util.concurrentmodificationexception
如果我們捕獲異常後再檢視下這個清單,會發現一個很有趣的事情。得到的結果是:
[1, 3, 5, 7, 9]
所有的奇數都這樣。這是一個bug嗎?不,這更像是一個特性。如果你看一下jdk的源碼,會發現在arraylist.arralistspliterator裡面有這麼一段注釋:
/* * if arraylists were immutable, or structurally immutable (no * adds, removes, etc), we could implement their spliterators * with arrays.spliterator. instead we detect as much * interference during traversal as practical without * sacrificing much performance. we rely primarily on * modcounts. these are not guaranteed to detect concurrency * violations, and are sometimes overly conservative about * within-thread interference, but detect enough problems to * be worthwhile in practice. to carry this out, we (1) lazily * initialize fence and expectedmodcount until the latest * point that we need to commit to the state we are checking * against; thus improving precision. (this doesn't apply to * sublists, that create spliterators with current non-lazy * values). (2) we perform only a single * concurrentmodificationexception check at the end of foreach * (the most performance-sensitive method). when using foreach * (as opposed to iterators), we can normally only detect * interference after actions, not before. further * cme-triggering checks apply to all other possible * violations of assumptions for example null or too-small * elementdata array given its size(), that could only have * occurred due to interference. this allows the inner loop * of foreach to run without any further checks, and * simplifies lambda-resolution. while this does entail a * number of checks, note that in the common case of * list.stream().foreach(a), no checks or other computation * occur anywhere other than inside foreach itself. the other * less-often-used methods cannot take advantage of most of * these streamlinings. */
現在來看下如果我們對這個流排序後會是什麼結果:
.sorted()
輸出的結果看起來是我們想要的:
3
而流消費完後的清單是空的:
[]
也就是說所有的元素都正确地消費掉并删除了。sorted()操作是一個“帶狀态的中間操作”,這意味着後續的操作不會再操作内部的那個集合了,而是在一個内部的狀态上進行操作。現在你可以安全地從清單裡删除元素了!
不過,真的是嗎這樣?我們來試一下帶有parallel(), sorted()的删除操作:
這個會輸出 :
現在清單裡包含:
[8]
唉呀。居然沒有删完所有的元素?!誰能解決這個問題,我免費請他喝酒!
這些行為看起來都是不确定的,我隻能建議你在使用流的時候不要去修改它内部的資料集合。這樣做是沒用的。
忘了去消費流
你覺得下面這個流在做什麼?
intstream.range(1, 5)
.peek(system.out::println)
.peek(i -> {
if (i == 5)
throw new runtimeexception("bang");
});
看完這段代碼,你覺得應該會輸出(1 2 3 4 5)然後抛出一個異常。不過并不是這樣。它什麼也不會做。這個流并沒有被消費掉,它隻是靜靜的待在那裡。
正如别的流api或者dsl那樣,你可能會忘了調用這個終止操作。當你使用peek()的時候也是這樣的,因為peek有點類似于foreach()。
在jooq中也存在這樣的情況,如果你忘了去調用 execute()或者fetch():
dsl.using(configuration)
.update(table)
.set(table.col1, 1)
.set(table.col2, "abc")
.where(table.id.eq(3));
杯具。忘了調用execute方法了。
并行流死鎖
終于快講完了~
如果你沒有正确地進行同步的話,所有的并發系統都可能碰到死鎖。現實中的例子可能不那麼明顯,不過如果你想自己創造一個場景的話倒是很容易。下面這個parallel()流肯定會造成死鎖:
object[] locks = { new object(), new object() };
intstream
.range(1, 5)
.peek(unchecked.intconsumer(i -> {
synchronized (locks[i % locks.length]) {
thread.sleep(100);
synchronized (locks[(i + 1) % locks.length]) {
thread.sleep(50);
}
}))
注意這裡unchecked.intconsumer()的使用,它把intconsumer接口轉化成了 org.jooq.lambda.fi.util.function.checkedintconsumer,這樣你才可以抛出已檢查異常。
好吧。這下你的機器倒黴了。這些線程會一直阻塞下去:-)
不過好消息就是,在java裡面要寫出一個這種教科書上的死鎖可不是那麼容易。
想進一步了解的話,可以看下brian goetz在stackoverflow上的一個回答。
結論
引入了流和函數式程式設計之後,我們開始會碰到許多新的難以發現的bug。這些bug很難避免,除非你見過并且還時刻保持警惕。你必須去考慮操作的順序,還得注意流是不是無限的。
流是一個非常強大的工具,但也是一個首先得去熟練掌握的工具。
最新内容請見作者的github頁:http://qaseven.github.io/