Почему запрос вставки приводит к тому, что конвейер потока данных останавливает обработку вставок базы данных? - PullRequest
0 голосов
/ 04 апреля 2019

У меня есть конвейер потоков данных Google Cloud, который обрабатывает большое количество вставок в течение дня и вставляет их в Cloud SQL. Я пытаюсь заполнить другую таблицу существующими данными, но когда я пытаюсь вставить в эти таблицы, это приводит к тому, что конвейер перестает вставлять. Когда я прекращаю выполнение запроса, вставки продолжают вставляться без проблем.

Я пробовал другие запросы в моей базе данных, но только вставки заставляют ее останавливать обработку вставок.

Мой запрос на вставку:

INSERT INTO t2 (c1, c2, firstUsed, lastUsed)
SELECT DISTINCT c3, c4, MIN(created), MAX(created) FROM t1 WHERE created 
<= '2018-12-01 23:59:59' GROUP BY c3, c4
ON DUPLICATE KEY UPDATE lastUsed = VALUES(lastUsed)

Исключение, которое я получаю в Google Cloud:

Processing stuck in step ParDo(ProcessDatabaseEvent) for at least 05m00s without outputting or completing in state process
  at java.net.SocketInputStream.socketRead0(Native Method)
  at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
  at java.net.SocketInputStream.read(SocketInputStream.java:170)
  at java.net.SocketInputStream.read(SocketInputStream.java:141)
  at com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
  at com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
  at com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
  at java.io.FilterInputStream.read(FilterInputStream.java:133)
  at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
  at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
  at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
  at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
  at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
  at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
  at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
  at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:557)
  at com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:735)
  at com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:674)
  at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:966)
  at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1165)
  at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:937)
  at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1116)
  at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1066)
  at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1396)
  at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdate(ClientPreparedStatement.java:1051)
  at mypardo.processElement(StatsCollector.java:173)
  at mypardo.invokeProcessElement(Unknown Source)
timestamp 2019-04-04T17:40:01.724Z
logger com.google.cloud.dataflow.worker.DataflowOperationContext
severity WARNING
worker myworker
step ParDo(ProcessDatabaseEvent)
thread 34

Любая помощь будет принята с благодарностью!

РЕДАКТИРОВАТЬ:

Добавление схемы базы данных

t1: (около 10 миллионов записей)

c1          varchar(255)  NO   PRI NULL    
c2          varchar(255)  NO   PRI NULL     
c3          varchar(255)  YES  NULL    
c4          varchar(255)  YES  NULL    
c5          varchar(255)  YES  NULL    
c6          varchar(255)  YES  NULL    
created     datetime      YES  NULL

t2:

c1         varchar(255) NO   PRI NULL    
c2         varchar(255) NO   PRI NULL    
firstUsed  datetime     YES      NULL    
lastUsed   datetime     YES      NULL    

1 Ответ

0 голосов
/ 04 апреля 2019

Если вы используете group by, вам не нужны отдельные и вам не следует использовать значения (lastUsed) в предложении ON DUPLICATE, но t1.lastUsed

  INSERT INTO t2 (c1, c2, firstUsed, lastUsed)
  SELECT c3, c4, MIN(created), MAX(created) 
  FROM t1 
  WHERE created <= '2018-12-01 23:59:59' 
  GROUP BY c3, c4
  ON DUPLICATE KEY UPDATE lastUsed = t1.lastUsed
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...