Как я могу получить CSV-файл с помощью Snowpipe ingestFile SDK? - PullRequest
0 голосов
/ 25 февраля 2020

Я пытаюсь использовать ingest sdk для загрузки файла в моей программе Java и получаю сообщение об ошибке. Я создал таблицу, сцену и трубу заранее. Я также поместил свой CSV-файл на внутреннюю стадию, чтобы быть готовым для приема внутрь. Теперь моей Java программе просто нужно вызвать ingestFiles для загрузки файла. Однако, когда я выполняю вызов, я получаю ошибку 404 (см. Прилагаемый файл журнала и Java программу ниже).

Я создал свой publi c & закрытый ключ, используя следующие процедуры: https://docs.snowflake.net/manuals/user-guide/data-load-snowpipe-rest-gs.html#step -3-configure-security-per-user

Также обратите внимание, что я могу успешно загрузить файл, вручную введя команду COPY, связанную с каналом. Тем не менее, я предпочитаю использовать пакет ingest SDK, чтобы лучше отследить ответ.

Я подозреваю, что это проблема с разрешениями, но я не уверен. Мы будем благодарны за любую помощь.

03: 11: 06.275 [main] INFO com.pardi.snowpipetest.IngestTest - успешно загруженный закрытый ключ из файла keys / rsa_key.p8

[ main] WARN net .snowflake.ingest.connection.RequestBuilder - Не удалось прочитать информацию о версии: java .nio.file.FileSystemNotFoundException

[main] INFO net .snowflake.ingest.connection. SecurityManager - создание токена с темой WX11111.JPARDI

[main] INFO net .snowflake.ingest.connection.SecurityManager - создание токена с эмитентом WX11111.JPARDI.SHA256: xxxxxxxxxxxxxxxxx * ] INFO net .snowflake.ingest.connection.SecurityManager - Создан новый JWT - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx * , Пользователь: JPARDI, Схема: https, Хост: .us-east-1.snowflakecomputing.com, Порт: 443 * 1 023 *

[main] INFO net .snowflake.ingest.SimpleIngestManager - UUID запроса на отправку -

[main] INFO net .snowflake.ingest.connection.RequestBuilder - Создан запрос на вставку: https://.us-east-1.snowflakecomputing.com: 443 / v1 / data / pipe / DEMO_DB.PUBLI C .COVETRUS_SNOWFLAKE_BATCH_SINK_SNOWPIPETEST_PIPE_H_CLIENT_0 / insertFiles? RequestId = 39a2c1e4-c637- * * * 10 * 1063.b2 *. .SimpleIngestManager - Попытка аннулировать ответ вставки - HttpResponseProxy {HTTP / 1.1 404 не найден [Content-Type: application / json, Дата: Вт, 25 февраля 2020 08:11:10 GMT, Сервер: nginx, Строгий Transport-Security: max-age = 31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Connection: keep-alive] net .snowflake.ingest.internal. apache .http. client.entity. DecompressingEntity@29c80149}

[main] WARN net .snowflake.ingest.connection.ServiceResponseHandler - Исключительный код состояния, найденный в ответе unmarshallInsert - 404

[main] ERROR net .snowflake.ingest.connection.S erviceResponseHandler - Код состояния 404, найденный в ответе службы

03: 11: 11.943 [main] INFO com.pardi.snowpipetest.IngestTest - Исключение службы:

Статус HTTP: 404

{

Сообщение: указанный объект не существует или не авторизован. Труба не найдена,

Данные: ноль

}

package com.pardi.snowpipetest;

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryResponse;
import net.snowflake.ingest.connection.IngestResponse;
import net.snowflake.ingest.connection.IngestResponseException;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;

import java.io.*;
import java.security.PrivateKey;
import java.security.Security;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.*;

public class IngestTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(IngestTest.class);
    private static String host = "<xxxx>.us-east-1.snowflakecomputing.com";
    private static String account = "<xxxxx>";
    private static String user = "xxxxxx";
    private static String passPhrase = "xxxxxxxxx";
    private static int port = 443;
    private static String database = "DEMO_DB";
    private static String schema = "PUBLIC";
    private static String pipe = "COVETRUS_SNOWFLAKE_BATCH_SINK_SNOWPIPETEST_PIPE_H_CLIENT_0";
    private static String fqPipe = database + "." + schema + "." + pipe;

    private static PrivateKey privateKey;
    private static SimpleIngestManager manager;

    public static void main(String[] args) throws Exception {
        String privateKeyStr = loadPrivateKey();
        privateKey = parseEncryptedPrivateKey(privateKeyStr, passPhrase);

        manager = new SimpleIngestManager(account, user, fqPipe, privateKey, "https", host, port);

        Set<String> files = new TreeSet<>();
        files.add("covetrus_snowflake_batch_sink_snowpipetest_stage_h_client/h_client.csv");

        try {
            IngestResponse response = manager.ingestFiles(manager.wrapFilepaths(files), null);
            LOGGER.info("response=" + response.toString());

            HistoryResponse history = waitForFilesHistory(files);
            LOGGER.info("Received history response: " + history.toString());
        } catch (IngestResponseException e) {
            LOGGER.info("Service exception: " + e.toString());
        } catch (Exception e) {
            LOGGER.info("Exception: " + e.getMessage());
        }
    }

    private static String loadPrivateKey() throws IOException {
        byte[] keyBytes;

        String filename = "keys/rsa_key.p8";

        File privateKeyFile = null;

        try {
            privateKeyFile = new ClassPathResource(filename).getFile();
            FileInputStream fis = new FileInputStream(privateKeyFile);
            DataInputStream dis = new DataInputStream(fis);
            keyBytes = new byte[(int) privateKeyFile.length()];
            dis.readFully(keyBytes);
            dis.close();
        } catch (IOException e) {
            LOGGER.info("FATAL: error loading private key from file " + filename + ", exception=" + e.getMessage());
            e.printStackTrace();
            throw e;
        }

        String privateKeyStr = new String(keyBytes);
        LOGGER.info("successfully loaded private key from file " + filename);
        return privateKeyStr;
    }

    public static PrivateKey parseEncryptedPrivateKey(String key, String passphrase) {
        Security.addProvider(new BouncyCastleFipsProvider());
        try {
            PEMParser pemParser = new PEMParser(new StringReader(key));
            PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemParser.readObject();
            pemParser.close();
            InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
            JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleFipsProvider.PROVIDER_NAME);
            PrivateKeyInfo decryptedPrivateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
            return converter.getPrivateKey(decryptedPrivateKeyInfo);
        } catch (Exception e) {
            throw new RuntimeException("Invalid encrypted private key or passphrase");
        }
    }

    private static HistoryResponse waitForFilesHistory(Set<String> files)
            throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();

        class GetHistory implements
                Callable<HistoryResponse> {
            private Set<String> filesWatchList;

            GetHistory(Set<String> files) {
                this.filesWatchList = files;
            }

            String beginMark = null;

            public HistoryResponse call()
                    throws Exception {
                HistoryResponse filesHistory = null;
                while (true) {
                    Thread.sleep(500);
                    HistoryResponse response = manager.getHistory(null, null, beginMark);
                    if (response.getNextBeginMark() != null) {
                        beginMark = response.getNextBeginMark();
                    }
                    if (response != null && response.files != null) {
                        for (HistoryResponse.FileEntry entry : response.files) {
                            //if we have a complete file that we've
                            // loaded with the same name..
                            String filename = entry.getPath();
                            if (entry.getPath() != null && entry.isComplete() &&
                                    filesWatchList.contains(filename)) {
                                if (filesHistory == null) {
                                    filesHistory = new HistoryResponse();
                                    filesHistory.setPipe(response.getPipe());
                                }
                                filesHistory.files.add(entry);
                                filesWatchList.remove(filename);
                                //we can return true!
                                if (filesWatchList.isEmpty()) {
                                    return filesHistory;
                                }
                            }
                        }
                    }
                }
            }
        }

        GetHistory historyCaller = new GetHistory(files);
        //fork off waiting for a load to the service
        Future<HistoryResponse> result = service.submit(historyCaller);

        HistoryResponse response = result.get(2, TimeUnit.MINUTES);
        return response;
    }
}
...