Skip to content

Commit

Permalink
#75 read all rows.stream on rows.Close for prevent deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
kshvakov committed Oct 31, 2017
1 parent 7d7028f commit 9beaf18
Showing 1 changed file with 13 additions and 19 deletions.
32 changes: 13 additions & 19 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/kshvakov/clickhouse/lib/column"
Expand All @@ -15,18 +14,17 @@ import (
)

type rows struct {
ch *clickhouse
err error
mutex sync.RWMutex
finish func()
offset int
block *data.Block
totals *data.Block
extremes *data.Block
stream chan *data.Block
columns []string
blockColumns []column.Column
allDataIsReceived int32
ch *clickhouse
err error
mutex sync.RWMutex
finish func()
offset int
block *data.Block
totals *data.Block
extremes *data.Block
stream chan *data.Block
columns []string
blockColumns []column.Column
}

func (rows *rows) Columns() []string {
Expand Down Expand Up @@ -84,10 +82,7 @@ func (rows *rows) NextResultSet() error {
}

func (rows *rows) receiveData() error {
defer func() {
close(rows.stream)
atomic.StoreInt32(&rows.allDataIsReceived, 1)
}()
defer close(rows.stream)
var (
err error
packet uint64
Expand Down Expand Up @@ -147,8 +142,7 @@ func (rows *rows) receiveData() error {
func (rows *rows) Close() error {
rows.ch.logf("[rows] close")
rows.columns = nil
for !atomic.CompareAndSwapInt32(&rows.allDataIsReceived, 1, 2) {
time.Sleep(time.Millisecond * 2)
for range rows.stream {
}
if rows.finish != nil {
rows.finish()
Expand Down

0 comments on commit 9beaf18

Please sign in to comment.