Golang Mgo笔记
1.1 mgo概述
mgo 为 mongodb golang中实现的driver,简单高效,官方地址: http://labix.org/mgo, 主要实现了三个pkg:
安装:
go get gopkg.in/mgo.v2
mogodb采用 bson 格式, bson规范参见 http://bsonspec.org/, bson tools 支持将csv,json,xml,hex等转换成bson格式的文件。
Mac下 Mogodb 的界面客户端 Robomongo
1.2 Example
// This program provides a sample application for using MongoDB with
// the mgo driver.
package main
import (
"log"
"sync"
"time"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
const (
MongoDBHosts = "127.0.0.1"
AuthDatabase = "test"
AuthUserName = ""
AuthPassword = ""
TestDatabase = "test"
)
type (
// BuoyCondition contains information for an individual station.
BuoyCondition struct {
WindSpeed float64 `bson:"wind_speed_milehour"`
WindDirection int `bson:"wind_direction_degnorth"`
WindGust float64 `bson:"gust_wind_speed_milehour"`
}
// BuoyLocation contains the buoy's location.
BuoyLocation struct {
Type string `bson:"type"`
Coordinates []float64 `bson:"coordinates"`
}
// BuoyStation contains information for an individual station.
BuoyStation struct {
ID bson.ObjectId `bson:"_id,omitempty"`
StationId string `bson:"station_id"`
Name string `bson:"name"`
LocDesc string `bson:"location_desc"`
Condition BuoyCondition `bson:"condition"`
Location BuoyLocation `bson:"location"`
}
)
// main is the entry point for the application.
func main() {
// We need this object to establish a session to our MongoDB.
mongoDBDialInfo := &mgo.DialInfo{
Addrs: []string{MongoDBHosts},
Timeout: 60 * time.Second,
Database: AuthDatabase,
Username: AuthUserName,
Password: AuthPassword,
}
// Create a session which maintains a pool of socket connections
// to our MongoDB.
mongoSession, err := mgo.DialWithInfo(mongoDBDialInfo)
if err != nil {
log.Fatalf("CreateSession: %s\n", err)
}
// Reads may not be entirely up-to-date, but they will always see the
// history of changes moving forward, the data read will be consistent
// across sequential queries in the same session, and modifications made
// within the session will be observed in following queries (read-your-writes).
// http://godoc.org/labix.org/v2/mgo#Session.SetMode
mongoSession.SetMode(mgo.Monotonic, true)
// Create a wait group to manage the goroutines.
var waitGroup sync.WaitGroup
// Perform 10 concurrent queries against the database.
waitGroup.Add(10)
for query := 0; query < 10; query++ {
go RunQuery(query, &waitGroup, mongoSession)
}
// Wait for all the queries to complete.
waitGroup.Wait()
log.Println("All Queries Completed")
}
// RunQuery is a function that is launched as a goroutine to perform
// the MongoDB work.
func RunQuery(query int, waitGroup *sync.WaitGroup, mongoSession *mgo.Session) {
// Decrement the wait group count so the program knows this
// has been completed once the goroutine exits.
defer waitGroup.Done()
// Request a socket connection from the session to process our query.
// Close the session when the goroutine exits and put the connection back
// into the pool.
sessionCopy := mongoSession.Copy()
defer sessionCopy.Close()
// Get a collection to execute the query against.
collection := sessionCopy.DB(TestDatabase).C("buoy_stations")
log.Printf("RunQuery : %d : Executing\n", query)
// Retrieve the list of stations.
var buoyStations []BuoyStation
err := collection.Find(nil).All(&buoyStations)
if err != nil {
log.Printf("RunQuery : ERROR : %s\n", err)
return
}
log.Printf("RunQuery : %d : Count[%d]\n", query, len(buoyStations))
}
1.3 ObjectID
type ObjectId string # 12 个 byte组成
- 4-byte value representing the seconds since the Unix epoch,
- 3-byte machine identifier,
- 2-byte process id, and
- 3-byte counter, starting with a random value.
mgo中的代码实现:
// NewObjectId returns a new unique ObjectId.
func NewObjectId() ObjectId {
var b [12]byte
// Timestamp, 4 bytes, big endian
binary.BigEndian.PutUint32(b[:], uint32(time.Now().Unix()))
// Machine, first 3 bytes of md5(hostname)
b[4] = machineId[0]
b[5] = machineId[1]
b[6] = machineId[2]
// Pid, 2 bytes, specs don't specify endianness, but we use big endian.
b[7] = byte(processId >> 8)
b[8] = byte(processId)
// Increment, 3 bytes, big endian
i := atomic.AddUint32(&objectIdCounter, 1)
b[9] = byte(i >> 16)
b[10] = byte(i >> 8)
b[11] = byte(i)
return ObjectId(b[:])
}
1.3.1 ObjectID为空判断
文[1] 中:
换句话说,如果你是想判断一个结构体的ObjectId是否为空,使用h.ID == “”是一定会结果为false的。
以上说法是根据以下函数判断,说法不准确,以下有代码验证, golang 相关比较的规范:Comparison_operators
// String returns a hex string representation of the id.
// Example: ObjectIdHex("4d88e15b60f486e428412dc9").
func (id ObjectId) String() string {
return fmt.Sprintf(`ObjectIdHex("%x")`, string(id))
}
为空测试:
package main
import (
"fmt"
"gopkg.in/mgo.v2/bson"
)
type MyString string
func (s MyString) String() string {
return "This is mystring nil"
}
type Person struct {
ID bson.ObjectId `bson:"_id,omitempty"`
Name string
Phone string
}
func main() {
// test type string equel
var str MyString
fmt.Println(str == "") // expect true
fmt.Println(str)
// test objectID
person := &Person{}
fmt.Println(person.ID == "") // expect true
person.ID = bson.NewObjectId()
fmt.Println(person.ID == "") // expect false
}
结果如下:
true
This is mystring nil
true
false
1.3.2 insert 中的 ObjectID
Client主动生成
采用ObjectID提供的 NewObjectId() 方法,可以保证唯一性。
type Person struct {
ID bson.ObjectId `bson:"_id"`
Name string
Phone string
}
person := &Person{ID: bson.NewObjectId(), Name: "Ale", Phone: "+55 53 8116 9639"}
NewObjectIdWithTime(t time.Time) 这个方法由于仅仅使用了当前时间值作为了key,因此不推荐使用。
// NewObjectIdWithTime returns a dummy ObjectId with the timestamp part filled
// with the provided number of seconds from epoch UTC, and all other parts
// filled with zeroes. It's not safe to insert a document with an id generated
// by this method, it is useful only for queries to find documents with ids
// generated before or after the specified timestamp.
func NewObjectIdWithTime(t time.Time) ObjectId {
var b [12]byte
binary.BigEndian.PutUint32(b[:4], uint32(t.Unix()))
return ObjectId(string(b[:]))
}
Mgo驱动自动生成
默认情况下ObjectId是由客户端Mogodb Driver生成的,和服务端没有关系。
type Person struct {
ID bson.ObjectId `bson:"_id,omitempty"` # 注意增加了 omitempty 属性, insert 过程中会自动生成 _id
Name string
Phone string
}
1.4 时间问题 [1]
之前看到有人问,为什么保存的时间进入到数据库中慢了8个小时呢?原因是在保存进入MongoDB时,数据是按照UTC时间(不懂什么是UTC?看这里)进行的保存,但是取出是按照当前时区来取出。那么问题来了,我的客户如果不都是国人,我怎么保存时间呢?目前我们采用了两种方式来确定数据库的保存时间。一种是Unix时间戳,这个是不受到时区的影响的,由前端格式化为对应的时区时间;另外一种则是需要在额外的对从MongoDB数据库中取出的数据进行额外的时区校准,简单来说可以这样:
type Home struct {
ID bson.ObjectId `bson:"_id,omitempty"`
Name string `bson:"name"`
InsertTime time.Time `bson:"insert_time"`
}
func main() {
sess, _ := mgo.Dial("127.0.0.1")
c := sess.DB("test").C("home")
h := Home{Name: "123", InsertTime: time.Now()}
c.Upsert(bson.M{"name": "123"}, h)
c.Find(bson.M{"name": "123"}).One(&h)
fmt.Println(h.InsertTime.Format("2006-01-02 15:04:05"))
tz, _ := time.LoadLocation("America/New_York")
fmt.Println(h.InsertTime.In(tz).Format("2006-01-02 15:04:05"))
}
1.5 Update更新操作
1.5.1 Update和UpdateId函数
和mysql不同,Update函数只能用来修改单条记录,即使条件能匹配多条记录,也只会修改第一条匹配的记录, Update()函数会将文档整体update中的data,而不仅仅是更新 Person中的Name, 同时还会将 InsertTime 字段删除。
selector := bson.M{"_id": bson.ObjectIdHex("56fdce98189df8759fd61e5b")}
data := bson.M{"name": "otherName"}
err := mongoSession.DB("test").C("Person").Update(selector, data)
if err != nil {
panic(err)
}
UpdateId 函数则是将上述的selector更换成了 ObjectId, 一种简化方式,Update中比较常用。
1.5.2 Update函数使用 set 关键字
如果仅仅需要更新Person中的Name字段需要使用$set关键词, 用法如下:
selector := bson.M{"_id": bson.ObjectIdHex("571de968a99cff2c68264807")}
data := bson.M{"$set": bson.M{"name": "otherName"}}
err := mongoSession.DB("test").C("Person").Update(selector, data)
if err != nil { // 如果更新的数据不存在,则会报一个not found的错误: err == mgo.ErrNotFound
panic(err)
}
1.5.2 UpdateAll 函数
符合条件的文档记录会被批量更新
// func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error)
selector := bson.M{"name": "Tom"}
data := bson.M{"$set": bson.M{"insert_time": time.Now()}}
changeInfo, err := mongoSession.DB("test").C("Person").UpdateAll(selector, data)
if err != nil {
panic(err)
}
fmt.Printf("%+v\n", changeInfo)
// output: &{Updated:2 Removed:0 UpsertedId:<nil>}
1.5.3 Upsert或UpsertId
这个函数就是如果数据存在就更新,否则就新增一条记录,比较常用。
id := bson.ObjectIdHex("571df02ea99cff2c6826480b")
data := bson.M{"$set": bson.M{"name": "Tom"}}
changeInfo, err := mongoSession.DB("test").C("Person").UpsertId(id, data)
if err != nil {
panic(err)
}
fmt.Printf("%+v\n", changeInfo)
// 首次执行output: &{Updated:0 Removed:0 UpsertedId:ObjectIdHex("571df02ea99cff2c6826480b")}
// 再次执行output: &{Updated:1 Removed:0 UpsertedId:<nil>}
1.6 连接池维护
1.6.1 使用Session Copy高效实现访问
1.2 中的RunQuery函数中使用Seesion Copy,可以实现连接池的效果。
func RunQuery(query int, waitGroup *sync.WaitGroup, mongoSession *mgo.Session) {
// Decrement the wait group count so the program knows this
// has been completed once the goroutine exits.
defer waitGroup.Done()
// Request a socket connection from the session to process our query.
// Close the session when the goroutine exits and put the connection back
// into the pool.
sessionCopy := mongoSession.Copy() # 对于原有session的Copy
defer sessionCopy.Close() # 处理完成后关闭该Session
// Get a collection to execute the query against.
collection := sessionCopy.DB(TestDatabase).C("buoy_stations")
log.Printf("RunQuery : %d : Executing\n", query)
// Retrieve the list of stations.
var buoyStations []BuoyStation
err := collection.Find(nil).All(&buoyStations)
if err != nil {
log.Printf("RunQuery : ERROR : %s\n", err)
return
}
log.Printf("RunQuery : %d : Count[%d]\n", query, len(buoyStations))
}
Session Copy 等同于重新创建一个Session,底层启动一个socket连接到mogodb server,但是使用原有Session的Authentication信息。采用Session Copy的方式一般是连接的数目可控制的场景。
1.6.2 Session Clone 参见[4]
Session Clone类似于Session Copy,会尽量直接复用原有session的socket,如果过多的goroutine使用session,但是没有close,会导致重新建立新的连接。
1.6.3 合理设置Session数目
mgo中的连接池最大数目为 4096,如果超过这个值会导致其他的连接不能得到处理。
通过配置文件设置 maxPoolSize 设置
[host]:[port]?maxPoolSize=300
在代码中进行设置
dao.GlobalMgoSession.SetPoolLimit(300)