Я пытаюсь использовать ingest sdk для загрузки файла в моей программе Java и получаю сообщение об ошибке. Я создал таблицу, сцену и трубу заранее. Я также поместил свой CSV-файл на внутреннюю стадию, чтобы быть готовым для приема внутрь. Теперь моей Java программе просто нужно вызвать ingestFiles для загрузки файла. Однако, когда я выполняю вызов, я получаю ошибку 404 (см. Прилагаемый файл журнала и Java программу ниже).
Я создал свой publi c & закрытый ключ, используя следующие процедуры: https://docs.snowflake.net/manuals/user-guide/data-load-snowpipe-rest-gs.html#step -3-configure-security-per-user
Также обратите внимание, что я могу успешно загрузить файл, вручную введя команду COPY, связанную с каналом. Тем не менее, я предпочитаю использовать пакет ingest SDK, чтобы лучше отследить ответ.
Я подозреваю, что это проблема с разрешениями, но я не уверен. Мы будем благодарны за любую помощь.
03: 11: 06.275 [main] INFO com.pardi.snowpipetest.IngestTest - успешно загруженный закрытый ключ из файла keys / rsa_key.p8
[ main] WARN net .snowflake.ingest.connection.RequestBuilder - Не удалось прочитать информацию о версии: java .nio.file.FileSystemNotFoundException
[main] INFO net .snowflake.ingest.connection. SecurityManager - создание токена с темой WX11111.JPARDI
[main] INFO net .snowflake.ingest.connection.SecurityManager - создание токена с эмитентом WX11111.JPARDI.SHA256: xxxxxxxxxxxxxxxxx * ] INFO net .snowflake.ingest.connection.SecurityManager - Создан новый JWT - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * , Пользователь: JPARDI, Схема: https, Хост: .us-east-1.snowflakecomputing.com, Порт: 443 * 1 023 *
[main] INFO net .snowflake.ingest.SimpleIngestManager - UUID запроса на отправку -
[main] INFO net .snowflake.ingest.connection.RequestBuilder - Создан запрос на вставку: https://.us-east-1.snowflakecomputing.com: 443 / v1 / data / pipe / DEMO_DB.PUBLI C .COVETRUS_SNOWFLAKE_BATCH_SINK_SNOWPIPETEST_PIPE_H_CLIENT_0 / insertFiles? RequestId = 39a2c1e4-c637- * * * 10 * 1063.b2 *. .SimpleIngestManager - Попытка аннулировать ответ вставки - HttpResponseProxy {HTTP / 1.1 404 не найден [Content-Type: application / json, Дата: Вт, 25 февраля 2020 08:11:10 GMT, Сервер: nginx, Строгий Transport-Security: max-age = 31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Connection: keep-alive] net .snowflake.ingest.internal. apache .http. client.entity. DecompressingEntity@29c80149}
[main] WARN net .snowflake.ingest.connection.ServiceResponseHandler - Исключительный код состояния, найденный в ответе unmarshallInsert - 404
[main] ERROR net .snowflake.ingest.connection.S erviceResponseHandler - Код состояния 404, найденный в ответе службы
03: 11: 11.943 [main] INFO com.pardi.snowpipetest.IngestTest - Исключение службы:
Статус HTTP: 404
{
Сообщение: указанный объект не существует или не авторизован. Труба не найдена,
Данные: ноль
}
package com.pardi.snowpipetest;
import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryResponse;
import net.snowflake.ingest.connection.IngestResponse;
import net.snowflake.ingest.connection.IngestResponseException;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import java.io.*;
import java.security.PrivateKey;
import java.security.Security;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.*;
public class IngestTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IngestTest.class);
private static String host = "<xxxx>.us-east-1.snowflakecomputing.com";
private static String account = "<xxxxx>";
private static String user = "xxxxxx";
private static String passPhrase = "xxxxxxxxx";
private static int port = 443;
private static String database = "DEMO_DB";
private static String schema = "PUBLIC";
private static String pipe = "COVETRUS_SNOWFLAKE_BATCH_SINK_SNOWPIPETEST_PIPE_H_CLIENT_0";
private static String fqPipe = database + "." + schema + "." + pipe;
private static PrivateKey privateKey;
private static SimpleIngestManager manager;
public static void main(String[] args) throws Exception {
String privateKeyStr = loadPrivateKey();
privateKey = parseEncryptedPrivateKey(privateKeyStr, passPhrase);
manager = new SimpleIngestManager(account, user, fqPipe, privateKey, "https", host, port);
Set<String> files = new TreeSet<>();
files.add("covetrus_snowflake_batch_sink_snowpipetest_stage_h_client/h_client.csv");
try {
IngestResponse response = manager.ingestFiles(manager.wrapFilepaths(files), null);
LOGGER.info("response=" + response.toString());
HistoryResponse history = waitForFilesHistory(files);
LOGGER.info("Received history response: " + history.toString());
} catch (IngestResponseException e) {
LOGGER.info("Service exception: " + e.toString());
} catch (Exception e) {
LOGGER.info("Exception: " + e.getMessage());
}
}
private static String loadPrivateKey() throws IOException {
byte[] keyBytes;
String filename = "keys/rsa_key.p8";
File privateKeyFile = null;
try {
privateKeyFile = new ClassPathResource(filename).getFile();
FileInputStream fis = new FileInputStream(privateKeyFile);
DataInputStream dis = new DataInputStream(fis);
keyBytes = new byte[(int) privateKeyFile.length()];
dis.readFully(keyBytes);
dis.close();
} catch (IOException e) {
LOGGER.info("FATAL: error loading private key from file " + filename + ", exception=" + e.getMessage());
e.printStackTrace();
throw e;
}
String privateKeyStr = new String(keyBytes);
LOGGER.info("successfully loaded private key from file " + filename);
return privateKeyStr;
}
public static PrivateKey parseEncryptedPrivateKey(String key, String passphrase) {
Security.addProvider(new BouncyCastleFipsProvider());
try {
PEMParser pemParser = new PEMParser(new StringReader(key));
PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemParser.readObject();
pemParser.close();
InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleFipsProvider.PROVIDER_NAME);
PrivateKeyInfo decryptedPrivateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
return converter.getPrivateKey(decryptedPrivateKeyInfo);
} catch (Exception e) {
throw new RuntimeException("Invalid encrypted private key or passphrase");
}
}
private static HistoryResponse waitForFilesHistory(Set<String> files)
throws Exception {
ExecutorService service = Executors.newSingleThreadExecutor();
class GetHistory implements
Callable<HistoryResponse> {
private Set<String> filesWatchList;
GetHistory(Set<String> files) {
this.filesWatchList = files;
}
String beginMark = null;
public HistoryResponse call()
throws Exception {
HistoryResponse filesHistory = null;
while (true) {
Thread.sleep(500);
HistoryResponse response = manager.getHistory(null, null, beginMark);
if (response.getNextBeginMark() != null) {
beginMark = response.getNextBeginMark();
}
if (response != null && response.files != null) {
for (HistoryResponse.FileEntry entry : response.files) {
//if we have a complete file that we've
// loaded with the same name..
String filename = entry.getPath();
if (entry.getPath() != null && entry.isComplete() &&
filesWatchList.contains(filename)) {
if (filesHistory == null) {
filesHistory = new HistoryResponse();
filesHistory.setPipe(response.getPipe());
}
filesHistory.files.add(entry);
filesWatchList.remove(filename);
//we can return true!
if (filesWatchList.isEmpty()) {
return filesHistory;
}
}
}
}
}
}
}
GetHistory historyCaller = new GetHistory(files);
//fork off waiting for a load to the service
Future<HistoryResponse> result = service.submit(historyCaller);
HistoryResponse response = result.get(2, TimeUnit.MINUTES);
return response;
}
}