최근에는 배포된 스마트 컨트랙트 event를 구독하여 데이터를 가져오는 로직을 짜고 있다.
그러다 보니, 자연스럽게 event를 어떻게 가져오는지 궁금해져서 알아보았다.
[다루는 내용은 geth 기준]
web3, ethers, eth-client ... 등 에서 contract에 대한 event listen 요청은 이더리움 노드의 eth_newFilter 요청이고, 이는 NewFilter()를 호출한다.
// go-ethereum/eth/filters/api.go
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
logs := make(chan []*types.Log)
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
if err != nil {
return "", err
}
api.filtersMu.Lock()
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
api.filtersMu.Unlock()
go func() {
for {
select {
case l := <-logs:
api.filtersMu.Lock()
if f, found := api.filters[logsSub.ID]; found {
f.logs = append(f.logs, l...)
}
api.filtersMu.Unlock()
case <-logsSub.Err():
api.filtersMu.Lock()
delete(api.filters, logsSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return logsSub.ID, nil
}
logs를 받는 채널을 선언하고, 노드에 filters라는 map 자료구조에 filter{}를 넣는다. 조금 더 들어가보자.
SubscribeLogs()를 먼저 본다.
// go-ethereum/eth/filters/filter_system.go
func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
} else {
from = rpc.BlockNumber(crit.FromBlock.Int64())
}
if crit.ToBlock == nil {
to = rpc.LatestBlockNumber
} else {
to = rpc.BlockNumber(crit.ToBlock.Int64())
}
// only interested in pending logs
if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribePendingLogs(crit, logs), nil
}
// only interested in new mined logs
if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
}
// only interested in mined logs within a specific block range
if from >= 0 && to >= 0 && to >= from {
return es.subscribeLogs(crit, logs), nil
}
// interested in mined logs from a specific block number, new logs and pending logs
if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribeMinedPendingLogs(crit, logs), nil
}
// interested in logs from a specific block number to new mined blocks
if from >= 0 && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
}
return nil, fmt.Errorf("invalid from and to block combination: from > to")
}
사용자가 요청한 filter의 값을 사용하여 from ~ to 블록 범위를 정한다. (비어있으면 최근 블록 넘버를 넣어준다)
그 후, 원하는 log 타입에 따라 메서드를 호출해준다. 여기서는 내가 유일하게 써본 from, to 가 비어있는 filter, 즉 새롭게 생성된 블록의 로그를 갖고 오는 log 메서드를 보겠다.
// only interested in new mined logs
if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
}
// 여기로 들어간다.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
//
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
es.install <- sub
<-sub.installed
return &Subscription{ID: sub.id, f: sub, es: es}
}
//
type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
subscription 구조체를 만든 후, subscribe() 수행
해당 메서드에서는 EventSystem install 채널로 생성한 subscribe 을 보낸다.
type filterIndex map[Type]map[rpc.ID]*subscription
// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
es.txsSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()
index := make(filterIndex)
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
for {
select {
case ev := <-es.txsCh:
es.handleTxsEvent(index, ev)
case ev := <-es.logsCh:
es.handleLogs(index, ev)
case ev := <-es.rmLogsCh:
es.handleRemovedLogs(index, ev)
case ev := <-es.pendingLogsCh:
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
index[LogsSubscription][f.id] = f
index[PendingLogsSubscription][f.id] = f
} else {
index[f.typ][f.id] = f
}
close(f.installed)
case f := <-es.uninstall:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
delete(index[LogsSubscription], f.id)
delete(index[PendingLogsSubscription], f.id)
} else {
delete(index[f.typ], f.id)
}
close(f.err)
// System stopped
case <-es.txsSub.Err():
return
case <-es.logsSub.Err():
return
case <-es.rmLogsSub.Err():
return
case <-es.chainSub.Err():
return
}
}
}
그러면 eventLoop() 에서 case es.install로 넘어가 subscribe를 받고, 타입에 따라 index에 subscribe를 넣어준다.
즉, 사용자가 node에 event listen을 요청한다는 것은 해당 node의 EventSystem안 filterIndex에 subscribe를 생성하여 넣어주고, api.filters라는 map 자료구조에 set(subscribe.id, subscribe) 해주는 과정으로 볼 수 있겠다.
close(f.installed)를 하며 결국은 이전 코드로 돌아가
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
es.install <- sub
<-sub.installed
return &Subscription{ID: sub.id, f: sub, es: es}
}
Subscription을 리턴해준다.
여기까지가 event listen을 했을 때 과정이고, 이제는 실제 블록이 생성될 때 어떤식으로 나에게 넘어오는 지 살펴보자.
해당 부분은 go-ethereum/core/blockchain.go 에 있다.
InsertChain(), insertChain() 메서드에서 이루어지는데 코드가 길어서... 요약으로 대체한다.
insertChain 메서드의 파라미터로 blocks가 넘어오고, blocks iterator로 순회하게 되어있다.
( * * blocks ? 여러 블록이 넘어오는 경우도 한번 알아 봐야겠다.)
각 block은 evm을 통해 내부 트랜잭션에 대한 처리를 하게되고, 그에 대한 리턴값은 이와 같다
. . .
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
. . .
if !setHead {
// Don't set the head, only insert the block
err = bc.writeBlockWithState(block, receipts, logs, statedb)
} else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
}
. . .
블록을 받고, 내부 트랜잭션까지 처리한 뒤, writeBlockAndSetHead() 메서드로 receipts, logs를 넘긴다. 따라가보자.
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
if err := bc.writeBlockWithState(block, receipts, logs, state); err != nil {
return NonStatTy, err
}
currentBlock := bc.CurrentBlock()
reorg, err := bc.forker.ReorgNeeded(currentBlock.Header(), block.Header())
if err != nil {
return NonStatTy, err
}
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, err
}
}
status = CanonStatTy
} else {
status = SideStatTy
}
// Set new head.
if status == CanonStatTy {
bc.writeHeadBlock(block)
}
bc.futureBlocks.Remove(block.Hash())
if status == CanonStatTy {
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
// In theory we should fire a ChainHeadEvent when we inject
// a canonical block, but sometimes we can insert a batch of
// canonicial blocks. Avoid firing too many ChainHeadEvents,
// we will fire an accumulated ChainHeadEvent and disable fire
// event here.
if emitHeadEvent {
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
}
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
return status, nil
}
사용자에게 logs가 보내지는 지점을 찾았다. ( bc.logsFeed.Send(logs) )
logs의 크기가 0보다 크면 bc.logsFeed에 logs를 보낸다.
(event 관련 transaction은 logs 길이가 반드시 1이상이다. 이는 나중에 evm 관련 글에 자세히 추가하면 좋을 것 같다.)
하지만 아직 납득이 안되는 것이... 블록이 생성되면 bc.logsFeed로 logs를 보내는 것 까지는 알겠는데 이 데이터를 어디서 받아서 나한테 넘겨주는 걸까? 지금까지 과정에서 logs는 선언했지만 bc.logsFeed를 따로 선언 한 적은 없었는데..
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}
// Subscribe events
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}
go m.eventLoop()
return m
}
위 코드에서 subscribe event가 생성되고 있었다. SubscribeLogsEvent() 메서드를 한번 보자.
func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
// -->
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return bc.scope.Track(bc.logsFeed.Subscribe(ch))
}
// -->
func (f *Feed) Subscribe(channel interface{}) Subscription {
f.once.Do(f.init)
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
}
func (sc *SubscriptionScope) Track(s Subscription) Subscription {
sc.mu.Lock()
defer sc.mu.Unlock()
if sc.closed {
return nil
}
if sc.subs == nil {
sc.subs = make(map[*scopeSub]struct{})
}
ss := &scopeSub{sc, s}
sc.subs[ss] = struct{}{}
return ss
}
blockchain.logsFeed -> EventSystem.logsCh
두 채널이 연결되고,
EventSystem의 eventLoop를 한번 더 살펴 보면,
// go-ethereum/eth/filters/filter_system.go, eventLoop()
for {
select {
case ev := <-es.txsCh:
es.handleTxsEvent(index, ev)
case ev := <-es.logsCh:
es.handleLogs(index, ev)
.
.
.
}
blockchain.logsFeed에서 EventSystem.logsCh로 log가 전송되면 위 코드의
ev := <-es.logsCh case문을 수행한다.
// go-ethereum/eth/filters/filter_system.go
func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
if len(ev) == 0 {
return
}
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
여기서 받은 logs는 블록 안 모든 트랜잭션에 대한 값이다. filterLogs에서 사용자가 요청한 filter에 따른 실제 filtering을 수행한다.
// go-ethereum/eth/filters/filter.go
func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
var ret []*types.Log
Logs:
for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
continue
}
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
// If the to filtered topics is greater than the amount of topics in logs, skip.
if len(topics) > len(log.Topics) {
continue Logs
}
for i, sub := range topics {
match := len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
if log.Topics[i] == topic {
match = true
break
}
}
if !match {
continue Logs
}
}
ret = append(ret, log)
}
return ret
}
이렇게 걸러진 logs가 드디어 api.filters[ID].logs 에 들어간다...
// go-ethereum/eth/filters/filter_system.go, handleLogs()
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
이렇게 해서,
블록생성 -> logs -> blockchain.logsFeed -> eventSystem.logsCh -> filtering() -> f.logs -> User
이 사이클이 완성되었다.
User가 f.logs에서 값을 가져가는 과정을 정리해보자면,
사용자가 event listen 요청을 했을 때 go-ethereum/eth/filters/api.go안 NewFilter() 메서드가 호출되었다.
해당 메서드는 filters라는 map 자료구조에 Subscription을 저장하고 go routine을 실행했다.
// go-ethereum/eth/filters/api.go , NewFilter() 안
go func() {
for {
select {
case l := <-logs:
api.filtersMu.Lock()
if f, found := api.filters[logsSub.ID]; found {
f.logs = append(f.logs, l...)
}
api.filtersMu.Unlock()
case <-logsSub.Err():
api.filtersMu.Lock()
delete(api.filters, logsSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
사용자의 event listen 호출마다 위 go routine이 생성된다.
블록이 생성될 때마다 logs의 값을 넘겨 받아 f.logs에 append해주고 있다.
사용자는 이렇게 추가된 f.logs안 logs를 가져간다.
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(api.timeout)
switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
}
}
return []interface{}{}, fmt.Errorf("filter not found")
}
filter를 등록할 때 리턴되었던 id값을 키로 사용하여 f.logs 데이터에 접근하며, 가져갈 때 f.logs를 비워준다.
이렇게 event listen 등록, 블록 생성시 logs 추가, 추가된 logs 가져가는 부분까지 살펴봤는데, 그래서 결국 사용자는 어떻게 블록 생성을 알고 GetFilterChanges를 호출하느냐는 해결이 되지 않은 것 같다.
그래도 event 가 어떻게 동작하는가 정도는 알 수 있었던 시간이었다.
대충 logs filter 관련 구조
Ethereum Node
FilterService - Filters
- sub id 1
- logs queue . . .
- sub id 2
- logs queue . . .
.
.
.
만약 사용자가 노드의 리소스를 고갈할 목적으로 event를 계속하여 생성하거나, 일부러 event 속 logs 데이터를 안가져가면 어쩌나 싶었는데, deadline이 설정되어 시간이 지나면 go routine이 내려가게 되어 있다. 찾아보니 geth의 filter deadeline은 5분으로 되어있더라... (go-ethereum/eth/backend.go:325)
5분이면 위험하지는 않나... 아니면 사용자별 request가 제한되어 있나 봐야겠다.
filter의 deadline은 5분이다. 그 사이 사용자가 logs를 가져가면 deadline이 리셋된다.
이더리움에서 포크를 딴 블록체인, 블록 생성이 짧은 경우에는 logs를 보관하는 큐 차는 속도가 부담스럽진 않을까, 혹시 다르게 설정했을까? 싶어 블록 생성 시간 1초인 클레이튼을 봤는데, 동일하게 되어 있었다. deadline도 5분으로 되어있다.
ethereum의 logs 큐 차는 속도와 클레이튼의 logs 큐 차는 속도는 다를텐데 deadline을 똑같이 줘도 될까? 그전에, logs queue가 부담이 되기는 할까?
2022년 5월 22일 기준 클레이스코프 하루 트랜잭션 35만건으로 잡고, 대충 발생 시간 07:00 ~ 02:00 정도로 계산을 해보면,
350000 / (19 x 60 x 60) => 5 TPS
5개 트랜잭션이 모두 이벤트를 갖고 있다고 가정해도 한 블록당 logs 5개 append... 몰리는 경우 가정해도 크게 부담이 될지 모르겠다.