db.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package service
  2. import (
  3. "crawler/config"
  4. "strings"
  5. "time"
  6. "database/sql"
  7. "fmt"
  8. "gorm.io/driver/mysql"
  9. "gorm.io/gorm"
  10. )
  11. var (
  12. DB_MOVIEW DB
  13. DB_CRAWLER DB
  14. DB_PLAYR DB
  15. )
  16. type DB struct {
  17. SQLDB *sql.DB
  18. GORMDB *gorm.DB
  19. DSN string
  20. Err error
  21. }
  22. // 2025.08.08 비동기 처리를 위한 수정
  23. type logItem struct {
  24. Action int
  25. Query string
  26. Comment string
  27. }
  28. var (
  29. logChannel = make(chan logItem, 10000)
  30. flushN = 200 // 200개 쌓이면 flush
  31. flushT = 250 * time.Millisecond // 250ms마다 flush
  32. stopChannel = make(chan struct{})
  33. stoppedChannel = make(chan struct{})
  34. )
  35. func InitLogWorker(db *sql.DB) {
  36. go func() {
  37. defer close(stoppedChannel)
  38. buf := make([]logItem, 0, flushN)
  39. ticker := time.NewTicker(flushT)
  40. defer ticker.Stop()
  41. for {
  42. select {
  43. case item := <-logChannel:
  44. buf = append(buf, item)
  45. if len(buf) >= flushN {
  46. flushLogs(db, buf)
  47. buf = buf[:0]
  48. }
  49. case <-ticker.C:
  50. if len(buf) > 0 {
  51. flushLogs(db, buf)
  52. buf = buf[:0]
  53. }
  54. case <-stopChannel:
  55. for {
  56. select {
  57. case it := <-logChannel:
  58. buf = append(buf, it)
  59. default:
  60. if len(buf) > 0 {
  61. flushLogs(db, buf)
  62. }
  63. return
  64. }
  65. }
  66. }
  67. }
  68. }()
  69. }
  70. func StopLogWorker() {
  71. close(stopChannel) // stopChannel를 닫아 더 이상 logItem을 받지 않도록 함
  72. <-stoppedChannel // flush 끝날 때까지 대기
  73. }
  74. // 다중 입력 SQL 생성
  75. func flushLogs(db *sql.DB, items []logItem) {
  76. var sb strings.Builder
  77. sb.WriteString(`INSERT INTO tb_general_log (action, query, comment, created_at) VALUES `)
  78. args := make([]any, 0, len(items)*3)
  79. for i, it := range items {
  80. if i > 0 {
  81. sb.WriteByte(',')
  82. }
  83. sb.WriteString("(?, ?, ?, NOW())")
  84. args = append(args, it.Action, it.Query, it.Comment)
  85. }
  86. if _, err := db.Exec(sb.String(), args...); err != nil {
  87. fmt.Println("log flush error:", err)
  88. }
  89. }
  90. func SetConfig() config.DBAccount {
  91. var (
  92. env = config.Env
  93. db = config.DB
  94. account = config.DBAccount{}
  95. )
  96. // DB 환경설정
  97. switch env.DeveloperEnv {
  98. case config.LOCAL:
  99. account = db.Local
  100. case config.DEV:
  101. account = db.Dev
  102. default:
  103. fmt.Println("DB 설정이 잘못되었습니다.")
  104. }
  105. //fmt.Printf("\n%#v\n", account)
  106. return account
  107. }
  108. // DB 연결
  109. func SetDatabase(dbName string) DB {
  110. var (
  111. env = config.Env
  112. db = config.DB
  113. account = SetConfig()
  114. err error
  115. )
  116. if dbName != "" {
  117. account.Name = dbName
  118. }
  119. /*
  120. * sqlDB
  121. */
  122. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s", account.User, account.Password, account.Address, account.Name)
  123. fmt.Printf("\nEnv : %s\n", env.DeveloperEnv)
  124. fmt.Printf("%s\n", dsn)
  125. SQLConnd, err := sql.Open(account.Driver, dsn)
  126. if err != nil || SQLConnd.Ping() != nil {
  127. fmt.Printf("%s\n", account.Driver)
  128. fmt.Printf("%s\n", dsn)
  129. fmt.Printf("%v\n", SQLConnd)
  130. fmt.Printf("sqlDB에 연결에 실패하였습니다.\n %v", err.Error())
  131. fmt.Println(err)
  132. SQLConnd.Close()
  133. }
  134. SQLConnd.SetConnMaxLifetime(time.Duration(db.MaxLifetime) * time.Second)
  135. SQLConnd.SetConnMaxIdleTime(time.Duration(db.MaxIdleTime) * time.Second)
  136. SQLConnd.SetMaxIdleConns(db.MaxIdleConn)
  137. SQLConnd.SetMaxOpenConns(db.MaxOpenConn)
  138. /*
  139. * GormDB
  140. */
  141. GORMConnd, err := gorm.Open(mysql.New(mysql.Config{
  142. Conn: SQLConnd,
  143. }), &gorm.Config{})
  144. if err != nil {
  145. fmt.Printf("GormDB에 연결에 실패하였습니다.\n %v", err.Error())
  146. fmt.Println(err)
  147. }
  148. new := new(DB)
  149. new.SQLDB = SQLConnd
  150. new.GORMDB = GORMConnd
  151. new.DSN = dsn
  152. new.Err = err
  153. fmt.Println("Database was opened successfully !!")
  154. return *new
  155. }
  156. // DB 연결
  157. func (db *DB) Connection(dbName string) DB {
  158. return SetDatabase(dbName)
  159. }
  160. // gorm 커넥션 연결
  161. func (db *DB) GConnection(dbName string) DB {
  162. return SetDatabase(dbName)
  163. }
  164. // DB 오류 입력
  165. func (db *DB) SetErrorLog(err error, query string) {
  166. var (
  167. conn = DB_CRAWLER.SQLDB
  168. sql = `CALL SP_ERROR_LOG(?, ?, ?);`
  169. errorMessage = strings.TrimSpace(strings.Split(err.Error(), ":")[1])
  170. errorNo = strings.TrimSpace(strings.Replace(strings.Split(err.Error(), ":")[0], "Error ", "", 1))
  171. )
  172. conn.Exec(sql, errorNo, errorMessage, query)
  173. switch errorMessage {
  174. case "sql: no rows in result set":
  175. case "":
  176. default:
  177. fmt.Printf("DB error msg : %s\n", err.Error())
  178. }
  179. fmt.Printf("Exe error query : %s\n", query)
  180. }
  181. /*
  182. * DB SQL 입력
  183. * 2025.08.08 비동기 처리를 위한 수정으로 미사용
  184. */
  185. /*
  186. func (db *DB) SetGeneralLog(action int, query, comment string) {
  187. var (
  188. conn = DB_CRAWLER.SQLDB
  189. sql = `
  190. INSERT INTO tb_general_log
  191. SET
  192. action = ?,
  193. query = ?,
  194. comment = ?,
  195. created_at = NOW();
  196. `
  197. )
  198. _, err := conn.Exec(sql, action, query, comment)
  199. if err != nil {
  200. db.SetErrorLog(err, sql)
  201. }
  202. }
  203. */
  204. func (db *DB) SetGeneralLog(action int, query, comment string) {
  205. if len(query) > 4000 {
  206. query = query[:4000] + "…(truncated)" // 너무 긴 쿼리는 잘라서 저장 (행 크기/페이지 split 완화)
  207. }
  208. select {
  209. case logChannel <- logItem{action, query, comment}:
  210. default:
  211. // 큐가 가득하면 발생
  212. }
  213. }
  214. // DB 연결 종료
  215. func (db *DB) Close() {
  216. defer func() {
  217. if err := recover(); err != nil {
  218. fmt.Printf("Database recovered message: %s\n", err)
  219. }
  220. }()
  221. if DB_MOVIEW != (DB{}) {
  222. defer DB_MOVIEW.SQLDB.Close()
  223. DB_MOVIEW.SQLDB = nil
  224. DB_MOVIEW.GORMDB = nil
  225. DB_MOVIEW.DSN = ""
  226. DB_MOVIEW.Err = nil
  227. }
  228. if DB_CRAWLER != (DB{}) {
  229. defer DB_CRAWLER.SQLDB.Close()
  230. DB_CRAWLER.SQLDB = nil
  231. DB_CRAWLER.GORMDB = nil
  232. DB_CRAWLER.DSN = ""
  233. DB_CRAWLER.Err = nil
  234. }
  235. if DB_PLAYR != (DB{}) {
  236. defer DB_PLAYR.SQLDB.Close()
  237. DB_PLAYR.SQLDB = nil
  238. DB_PLAYR.GORMDB = nil
  239. DB_PLAYR.DSN = ""
  240. DB_PLAYR.Err = nil
  241. }
  242. fmt.Println("DB closed")
  243. }